change data capture service
cdc_notifier в текущей реализации не является унифицированным средством для захвата и ретрансляции изменений данных. Текущая реализация предоставляет только функциональность отслеживания обновлений и смены состояний жизненного цикла процессов, реализованных на основе progressor (https://github.com/valitydev/progressor). Захват изменений реализован на основе механизма WAL PostgreSQL. Приложение функционирует как логическая реплика для одного или нескольких экземпляров PostgreSQL. Захваченные обновления сериализуются в thrift в соответствии со спецификацией mg_proto (https://github.com/valitydev/machinegun-proto/blob/master/proto/event_sink.thrift, https://github.com/valitydev/machinegun-proto/blob/master/proto/lifecycle_sink.thrift) и передаются в Kafka.
PostgreSQL должен быть сконфигурирован с поддержкой логической репликации (wal_level=logical), должно быть сконфигурировано необходимой количество wal_senders с учётом всех потребителей (max_wal_senders >= max_replication_slots). У роли пользователя, от имени которого подключается CDC, должны быть необходимые права на работу с функциями репликации (ALTER ROLE $user WITH REPLICATION;).
Приложения-зависимости (brod, epg_connector) должны быть сконфигурированы в соответствии с собственной документацией. Пример минимально необходимоой конфигурации приведен в config/sys.config
cdc_notifier является umbrella приложением и может содержать в своём составе специфические реализации для захвата и трансляции обновлений. В текущей версии оно содержит только cdc_progressor.
Описание конфигурации cdc_progressor
[
{cdc_progressor, [
{streams, #{
%% db_ref
example_db => #{
%%%% replication slot name
"cdc_slot_example" =>
%%%%%%% publications config (kafka client, source namespace, destionation topics)
#{
%% source namespace_id = example
example => #{
%% kafka client config for publication
kafka_client => default_kafka_client,
eventsink_topic => <<"example_eventsink_topic">>,
lifecycle_topic => <<"example_lifecycle_topic">>
},
%% source namespace_id = invoice
invoice => #{
kafka_client => default_kafka_client,
eventsink_topic => <<"invoice_eventsink_topic">>,
lifecycle_topic => <<"invoice_lifecycle_topic">>
}
}
}
}},
%% optional parameters
{resend_timeout, 3000},
{max_retries, 3},
{reconnect_timeout, 5000}
]}
]
Здесь параметр streams описывает к каким БД какие слоты репликации необходимо создать (db_ref - не имя БД, а имя подключения (или иначе - ссылка на БД) в epg_connector, подробнее см. в секции databases https://github.com/valitydev/epg_connector?tab=readme-ov-file#database-and-pool-configuration). Каждому слоту соответствует свой набор неймспейсов (ключ = ID неймспейса progressor`а), а каждому неймспейсу соотвествует свой стрим: клиент kafka (задается в конфиге brod, может быть переиспользован), топик событий жизенного цикла lifecycle_topic, топик обновлений eventsink_topic (ДОЛЖНЫ быть заданы оба топика).
Также cdc_progressor имеет опциональные параметры:
- resend_timeout - таймаут повторной попытки отправки сообщения в kafka в случае неудачной попытки передачи
- max_retries - максимальное количество попыток переотправки в kafka, по достижению этого значения захват обновлений будет пристановлен (соеднинение с БД закрыто, репликация остановлена, восстановление через reconnect_timeout)
- reconnect_timeout - таймаут возобновления репликации после потери соединения или после достижения max_retries Значения по умолчанию приведены в примере выше.
- создание необходимых публикаций в БД, если еще не созданы (однократно, при первом запуске после добавления стрима в конфиг)
- создание персистентного слота репликации, если еще не создан (однократно, при первом запуске после добавления слота в конфиг)
- получение текущего LSN в PostgreSQL ("0/0" при первом запуске)
- старт репликации с соответствующим LSN
Создание слота и старт репликации осуществляется посредством epg_wal_reader:subscribe. Создаваемый слот является персистентным, что позволяет хранить подтвержденные LSN (Log Sequence Number) на стороне PostgreSQL.
- получение сообщения репликации от wal_reader
- парсинг - конвертация в сообщение kafka
- синхронная отправка сообщения в kafka
- получение подтверждения успешной отправки сообщения в kafka
- отправка подтверждения в wal_reader
- wal_reader возвращает в PostgreSQL значение обработанного LSN
- ожидание следующего сообщения репликации (в начало)
- получение сообщения репликации от wal_reader
- парсинг - конвертация в сообщение kafka
- синхронная отправка сообщения в kafka
- сбой отправки сообщения в kafka
- достижение max_retries
- остановка репликации
- ожидание reconnect_timeout
- старт репликации со значения LSN, следующего за последним подтверждённым
- ожидание сообщения репликации (в начало)
ВАЖНО: CDC полагается, что требуемые БД и таблицы существуют. Таблицы progressor создаются в ходе миграций при старте соответствующего сервиса-процессора, поэтому настройка неймспейса ОБЯЗАТЕЛЬНО должна быть выполнена ДО настройки стримов на cdc_progressor.
Основное допущение: сервисы-потребители топиков kafka корректно обрабатывают дубликаты сообщений
Порядок запуска:
- настроить параметры стриминга в cdc_progressor для требуемого неймспейса
- рестартануть cdc_notifier (с этого момента сообщения в соответствующих топиках "задваиваются")
- отключить механизмы нотификации progressor для неймспейса в соответствующем сервисе-процессоре
- рестартануть сервис-процессор (с этого момента остаётся единственный источник нотификций)
Порядок запуска:
- остановить трафик неймспейса (чтобы избежать неконсистентности данных в даунстриме)
- настроить параметры стриминга в cdc_progressor для требуемого неймспейса
- рестартануть cdc_notifier
- подать трафик нового неймспейса
Порядок запуска:
- трафик нового неймспейса не подавать до завершения настройки
- настроить неймспейс в progressor соответствующего сервиса-процессора
- рестартануть сервис-процессор
- настроить параметры стриминга для неймспейса в cdc_progressor
- рестартануть cdc_progressor
- подать трафик нового неймспейса