Skip to content

valitydev/cdc_notifier

cdc_notifier

change data capture service

Общие сведения

cdc_notifier в текущей реализации не является унифицированным средством для захвата и ретрансляции изменений данных. Текущая реализация предоставляет только функциональность отслеживания обновлений и смены состояний жизненного цикла процессов, реализованных на основе progressor (https://github.com/valitydev/progressor). Захват изменений реализован на основе механизма WAL PostgreSQL. Приложение функционирует как логическая реплика для одного или нескольких экземпляров PostgreSQL. Захваченные обновления сериализуются в thrift в соответствии со спецификацией mg_proto (https://github.com/valitydev/machinegun-proto/blob/master/proto/event_sink.thrift, https://github.com/valitydev/machinegun-proto/blob/master/proto/lifecycle_sink.thrift) и передаются в Kafka.

Конфигурирование

PostgreSQL должен быть сконфигурирован с поддержкой логической репликации (wal_level=logical), должно быть сконфигурировано необходимой количество wal_senders с учётом всех потребителей (max_wal_senders >= max_replication_slots). У роли пользователя, от имени которого подключается CDC, должны быть необходимые права на работу с функциями репликации (ALTER ROLE $user WITH REPLICATION;).

Приложения-зависимости (brod, epg_connector) должны быть сконфигурированы в соответствии с собственной документацией. Пример минимально необходимоой конфигурации приведен в config/sys.config

cdc_notifier является umbrella приложением и может содержать в своём составе специфические реализации для захвата и трансляции обновлений. В текущей версии оно содержит только cdc_progressor.

Описание конфигурации cdc_progressor

[
  {cdc_progressor, [
          {streams, #{
              %% db_ref
              example_db => #{
              %%%% replication slot name
                  "cdc_slot_example" =>
              %%%%%%% publications config (kafka client, source namespace, destionation topics)
                      #{
                          %% source namespace_id = example
                          example => #{
                              %% kafka client config for publication
                              kafka_client => default_kafka_client,
                              eventsink_topic => <<"example_eventsink_topic">>,
                              lifecycle_topic => <<"example_lifecycle_topic">>
                          },
                          %% source namespace_id = invoice
                          invoice => #{
                              kafka_client => default_kafka_client,
                              eventsink_topic => <<"invoice_eventsink_topic">>,
                              lifecycle_topic => <<"invoice_lifecycle_topic">>
                          }
                      }
              }
          }},
        %% optional parameters
        {resend_timeout, 3000},
        {max_retries, 3},
        {reconnect_timeout, 5000}
  ]}
]

Здесь параметр streams описывает к каким БД какие слоты репликации необходимо создать (db_ref - не имя БД, а имя подключения (или иначе - ссылка на БД) в epg_connector, подробнее см. в секции databases https://github.com/valitydev/epg_connector?tab=readme-ov-file#database-and-pool-configuration). Каждому слоту соответствует свой набор неймспейсов (ключ = ID неймспейса progressor`а), а каждому неймспейсу соотвествует свой стрим: клиент kafka (задается в конфиге brod, может быть переиспользован), топик событий жизенного цикла lifecycle_topic, топик обновлений eventsink_topic (ДОЛЖНЫ быть заданы оба топика).

Также cdc_progressor имеет опциональные параметры:

  • resend_timeout - таймаут повторной попытки отправки сообщения в kafka в случае неудачной попытки передачи
  • max_retries - максимальное количество попыток переотправки в kafka, по достижению этого значения захват обновлений будет пристановлен (соеднинение с БД закрыто, репликация остановлена, восстановление через reconnect_timeout)
  • reconnect_timeout - таймаут возобновления репликации после потери соединения или после достижения max_retries Значения по умолчанию приведены в примере выше.

Функционирование

Старт

  • создание необходимых публикаций в БД, если еще не созданы (однократно, при первом запуске после добавления стрима в конфиг)
  • создание персистентного слота репликации, если еще не создан (однократно, при первом запуске после добавления слота в конфиг)
  • получение текущего LSN в PostgreSQL ("0/0" при первом запуске)
  • старт репликации с соответствующим LSN

Создание слота и старт репликации осуществляется посредством epg_wal_reader:subscribe. Создаваемый слот является персистентным, что позволяет хранить подтвержденные LSN (Log Sequence Number) на стороне PostgreSQL.

Нормальный цикл работы

  • получение сообщения репликации от wal_reader
  • парсинг - конвертация в сообщение kafka
  • синхронная отправка сообщения в kafka
  • получение подтверждения успешной отправки сообщения в kafka
  • отправка подтверждения в wal_reader
  • wal_reader возвращает в PostgreSQL значение обработанного LSN
  • ожидание следующего сообщения репликации (в начало)

Сбой отправки сообщения в kafka

  • получение сообщения репликации от wal_reader
  • парсинг - конвертация в сообщение kafka
  • синхронная отправка сообщения в kafka
  • сбой отправки сообщения в kafka
  • достижение max_retries
  • остановка репликации
  • ожидание reconnect_timeout
  • старт репликации со значения LSN, следующего за последним подтверждённым
  • ожидание сообщения репликации (в начало)

Запуск стримов (деплой и эксплуатация)

ВАЖНО: CDC полагается, что требуемые БД и таблицы существуют. Таблицы progressor создаются в ходе миграций при старте соответствующего сервиса-процессора, поэтому настройка неймспейса ОБЯЗАТЕЛЬНО должна быть выполнена ДО настройки стримов на cdc_progressor.

Запуск стрима на существующем неймспейсе, в котором включены механизмы нотификации progressor

Основное допущение: сервисы-потребители топиков kafka корректно обрабатывают дубликаты сообщений

Порядок запуска:

  • настроить параметры стриминга в cdc_progressor для требуемого неймспейса
  • рестартануть cdc_notifier (с этого момента сообщения в соответствующих топиках "задваиваются")
  • отключить механизмы нотификации progressor для неймспейса в соответствующем сервисе-процессоре
  • рестартануть сервис-процессор (с этого момента остаётся единственный источник нотификций)

Запуск стрима на существующем неймспейсе с отключенными механимзмами нотификации progressor

Порядок запуска:

  • остановить трафик неймспейса (чтобы избежать неконсистентности данных в даунстриме)
  • настроить параметры стриминга в cdc_progressor для требуемого неймспейса
  • рестартануть cdc_notifier
  • подать трафик нового неймспейса

Запуск стрима на вновь создаваемом неймспейсе

Порядок запуска:

  • трафик нового неймспейса не подавать до завершения настройки
  • настроить неймспейс в progressor соответствующего сервиса-процессора
  • рестартануть сервис-процессор
  • настроить параметры стриминга для неймспейса в cdc_progressor
  • рестартануть cdc_progressor
  • подать трафик нового неймспейса

About

change data capture service

Resources

License

Code of conduct

Contributing

Security policy

Stars

Watchers

Forks

Packages

 
 
 

Contributors 2

  •  
  •