Change Data Capture

Mikalai Saskavets

Change Data Capture



снимаем нагрузку с базы данных, упрощаем код основного приложения

Mikalai Saskavets
Group Manager
2021

Mikalai Saskavets

Group Manager


ex. Full Stack Web Developer


• профессионально разрабоатываю под web с 2006
• пишу на Python c 2013
• рассказываю про подходы к разработке с 2018

Change Data Capture



Приложение



Приложение





и много пользователей



Приложение





и много пользователей
и много данных в большой БД



Приложение





и много пользователей
и много данных в большой БД
и сложная, запутанная бизнес логика


            один
       пользовательский
           запрос
             |
             ▼
         Приложение 
             |
             |-> 1-10 подзапросов к БД
             |-> 2-3-5 подзапросов к внешнему API
             |-> 1-2 подзапроса на обновление поискового индекса
             |-> тысячи их
        
        

Так ли плохо?

Проблемы

@api_view(["POST"])
def mark_as_featured(request, id):
    article = Articles.objects.get(id=id)
    article.featured_start_date=now()
    article.featured_end_date=__caclulate(article)
    article.save()
    return JsonResponse(article.as_json()) 
        
        

А что там с поиском?

А что там с поиском?



ElasticSearch, Solr, Sphinx, ...?

@api_view(["POST"])
def mark_as_featured(request, id):
    article = Articles.objects.get(id=id)
    article.featured_start_date=now()
    article.featured_end_date=__caclulate(article)
    article.save()
    return JsonResponse(article.as_json()) 
        
        
@api_view(["POST"])
def mark_as_featured(request, id):
    article = Articles.objects.get(id=id)
    article.featured_start_date=now()
    article.featured_end_date=__caclulate(article)
    article.save()
    search_engine.update(article)
    return JsonResponse(article.as_json()) 
        
        
@api_view(["POST"])
def mark_as_featured(request, id):
    article = Articles.objects.get(id=id)
    article.featured_start_date=now()
    article.featured_end_date=__caclulate(article)
    article.save()
    search_engine.update(article)
    mail_sender.send(to=article.author,
                     message=__congrats(article))
    return JsonResponse(article.as_json()) 
        
        
@api_view(["POST"])
def mark_as_featured(request, id):
    article = Articles.objects.get(id=id)
    article.featured_start_date=now()
    article.featured_end_date=__caclulate(article)
    article.save()
    search_engine.update(article)
    mail_sender.send(to=article.author,
                     message=__congrats(article))
    push_notifications.send(to=Users.object.all(),
                     message=__promote(article))
    return JsonResponse(article.as_json()) 
        
        
@api_view(["POST"])
def mark_as_featured(request, id):
    article = Articles.objects.get(id=id)
    article.featured_start_date=now()
    article.featured_end_date=__caclulate(article)
    article.save()
    search_engine.update(article)
    mail_sender.send(to=article.author,
                     message=__congrats(article))
    push_notifications.send(to=Users.object.all(),
                     message=__promote(article))
    payment_gateway.charge(article.author,
                           __details(article))
    return JsonResponse(article.as_json()) 
        
        
@api_view(["POST"])
def mark_as_featured(request, id):
    article = Articles.objects.get(id=id)
    article.featured_start_date=now()
    article.featured_end_date=__caclulate(article)
    article.save()
    return JsonResponse(article.as_json()) 
        
        
@api_view(["POST"])
def mark_as_featured(request, id):
    article = Articles.objects.get(id=id)
    article.featured_start_date=now()
    article.featured_end_date=__caclulate(article)
    article.save()
    __do_some_ancient_magick()
    return JsonResponse(article.as_json()) 
        
        
@api_view(["POST"])
def mark_as_featured(request, id):
    article = Articles.objects.get(id=id)
    article.featured_start_date=now()
    article.featured_end_date=__caclulate(article)
    article.save()
    __do_some_ancient_magick(
        object=article,
        update_index=YES,
        send_email_notification=YES,
        send_push_notification=YES,
    )
    return JsonResponse(article.as_json()) 
        
        
@api_view(["POST"])
def mark_as_featured(request, id):
    article = Articles.objects.get(id=id)
    article.featured_start_date=now()
    article.featured_end_date=__caclulate(article)
    article.save()
    # the magick is happening somewhere under the hood
    return JsonResponse(article.as_json()) 
        
        

            
            
            
            

         ⇄  Захватчик изменений для поискового индекса
         ⇄  Захватчик изменений для пушей
         ⇄  Захватчик изменений для рассылки писем
        
        

Способы детектирования изменений в базе

Детектирование изменений через метаданные

id ... last_modified
1 ... 2021-08-25 11:12:01
2 ... 2021-08-25 11:12:23
3 ... 2021-08-25 11:12:43
4 ... 2021-08-25 11:12:54
5 ... 2021-08-25 11:12:26
6 ... 2021-08-25 11:12:41

Детектирование изменений через метаданные

id ... last_modified
1 ... 2021-08-25 11:12:01
2 ... 2021-08-25 11:12:23
3 ... 2021-08-26 19:12:43
4 ... 2021-08-25 11:12:54
5 ... 2021-08-25 11:12:26
6 ... 2021-08-25 11:12:41
7 ... 2021-08-26 19:12:55

Детектирование изменений через метаданные

id ... last_modified
1 ... 2021-08-25 11:12:01
2 ... 2021-08-25 11:12:23
3 ... 2021-08-26 19:12:43
4 ... 2021-08-25 11:12:54
 
6 ... 2021-08-25 11:12:41
7 ... 2021-08-26 19:12:55

Захват изменений через триггеры

Вешаем триггеры на операции изменения:

Захват изменений через триггеры


            
CREATE TRIGGER log_article_changes_trigger
    BEFORE UPDATE OR INSERT OR DELETE
    ON articles
    FOR EACH ROW
    EXECUTE PROCEDURE log_article_changes();
        

Захват изменений через триггеры


            
CREATE OR REPLACE FUNCTION log_article_changes()
  RETURNS TRIGGER 
  LANGUAGE PLPGSQL
  AS
$$
BEGIN
    IF NEW.featured_start_date <> OLD.featured_start_date THEN
        INSERT INTO articles_featured_log
        (
            article_id, featured_start_date, changed_on
        )
        VALUES
        (
            OLD.id, NEW.featured_start_date, now()
            /* TODO: fix for "AFTER INSERT" cases */
        );
	END IF;
	RETURN NEW;
END;
$$
        

Транзакционный лог
Журнал изменений

Транзакционный лог в случае PostgreSQL

WAL (Write Ahead Log)

WAL PostgreSQL как механизм репликации

Схема использования WAL для репликации PostgreSQL

Схема использования WAL для репликации PostgreSQL

Debezium как вариант реализации CDC для PostgreSQL

Debezium как вариант реализации CDC

Debezium без Kafka (экспериментальный режим!)

Встраивание Debezium в ваше Java приложение

Используя Embedding Debezium Connectors / Debezium Engine API можно отказаться от слоя с очередью

Встраивание Debezium в ваше Java приложение

Используя Embedding Debezium Connectors / Debezium Engine API можно отказаться от слоя с очередью

PgSync: синхронизация PostgreSQL и Elasticsearch

https://github.com/topics/change-data-capture

Supabase Realtime: изменения из PostgreSQL в вебсокеты

Огромное количество возможностей

Минусы реализации CDC через WAL механизмы

Debezium как вариант реализации CDC

Плюсы и минусы CDC

Минусы:

Плюсы и минусы CDC

Ещё минусы:

Плюсы и минусы CDC

Плюсы:

Плюсы и минусы CDC

Ещё плюсы:

Плюсы и минусы CDC

Минусы:
  • Инфраструктура и архитектура усложняются
  • Тяжело дебажить
  • Непросто "перенакатить" действия в случае выявленных ошибок
  • Нужен мониторинг
Плюсы:
  • Упрощается кодовая база основного приложения
  • Легко добавлять новые обработчики
  • Большая свобода выбора подходящий инструментов под задачу
  • Снижается нагрузка на основную БД

В каких случаях выбирать CDC подход

В каких случаях выбирать CDC подход

В каких случаях выбирать CDC подход

В каких случаях выбирать CDC подход

Итого

Вопросы?