Skip to content

Conversation

@bukk530
Copy link

@bukk530 bukk530 commented Dec 11, 2025

Key Changes

Checkpoint Handling Fixes:

  • Fixed FileCheckpointStore and MongoCheckpointStore to enforce monotonic checkpoint advancement - checkpoints can no longer move backwards within a run
  • Modified Replicator.Flush() to rely on the checkpoint store's in-memory state rather than computing a checkpoint from the reader's position
  • Added ReplicationRun helper to generate a unique RunId per replication execution for log correlation

Partition Sequencing Improvements:

  • Changed PartitionChannel to use global commit position (SourceLogPosition.EventPosition) for ordering instead of per-stream SequenceNumber, fixing false sequence errors when new streams start at event number 0
  • Added IsMetadata computed property to BaseOriginalEvent and BaseProposedEvent to distinguish metadata/system events ($>, $@) from domain events
  • Added IgnoreMetadataEventsForPartitioning configuration flag to optionally bypass sequence checks for metadata events

Diagnostic Instrumentation:

  • Added ReplicationMessageId (GUID) to all event models for end-to-end correlation from reader → partitioner → Kafka writer
  • Added DebugPartitionSequences configuration flag to enable high-verbosity sequencing logs
  • Enhanced PartitionChannel with detailed logging:
    • Sequence transitions with delta values
    • Buffer dumps on sequence errors (last 32 events)
    • Last successfully written event before errors
    • Per-event Seq OK logs when debug mode is enabled
  • Enhanced checkpoint logging with RunId, source (InMemory/File/Default), and previous/new values on flush
  • Added event classification logging in GrpcEventReader and TcpEventReader

Configuration:

  • Added Replicator.DebugPartitionSequences setting for enabling verbose diagnostic logs
  • Added Sink.IgnoreMetadataEventsForPartitioning setting for metadata handling control
  • Both settings are propagated through SinkPipeOptions and logged at startup when enabled

Files Modified

  • src/Kurrent.Replicator/Replicator.cs
  • src/Kurrent.Replicator/FileCheckpointStore.cs
  • src/Kurrent.Replicator/Partitioning/PartitionChannel.cs
  • src/Kurrent.Replicator/Partitioning/HashPartitioner.cs
  • src/Kurrent.Replicator/Partitioning/ValuePartitioner.cs
  • src/Kurrent.Replicator/Sink/SinkPipe.cs
  • src/Kurrent.Replicator/Sink/SinkPipelineOptions.cs
  • src/Kurrent.Replicator.Shared/Contracts/OriginalEvent.cs
  • src/Kurrent.Replicator.Shared/Contracts/ProposedEvent.cs
  • src/Kurrent.Replicator.Shared/Observe/ReplicationRun.cs (new)
  • src/Kurrent.Replicator.Shared/Observe/ReplicationDebugOptions.cs (new)
  • src/Kurrent.Replicator.KurrentDb/GrpcEventReader.cs
  • src/Kurrent.Replicator.EventStore/TcpEventReader.cs
  • src/Kurrent.Replicator.Kafka/KafkaWriter.cs
  • src/Kurrent.Replicator.Mongo/MongoCheckpointStore.cs
  • src/replicator/Settings/ReplicatorSettings.cs
  • src/replicator/Startup.cs

Testing

  • All existing tests pass
  • Kafka writer tests (10 tests) verified working correctly
  • Manual testing confirmed checkpoint advances correctly and sequence errors are properly detected

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant