-
Notifications
You must be signed in to change notification settings - Fork 0
[WIP] feature : Replace Notify with mpsc channel for producer-consumer communication, memory in batch. #47
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
…unication Major Changes: - Replace tokio::sync::Notify with bounded mpsc::channel (capacity: 100) - Add backpressure control: producer blocks when consumer is slow - Simplify consumer: use channel.recv() instead of Mutex lock/pop loop - Move LSN updates into pre_commit_hook for atomic transaction commitment - Add ReplicationState module with ordered transaction queue (BinaryHeap) - Clean up transaction_manager: remove file-based workflow methods Architecture Improvements: - Producer sends committed transactions directly to channel via try_send_ordered() - Consumer receives from channel without manual polling or notifications - BinaryHeap ensures transactions are sent in commit_lsn order - Channel provides natural backpressure (prevents unbounded memory growth) Benefits: - Better performance: less lock contention, efficient wakeup mechanism - Safer: atomic LSN update with transaction commit (no partial commits) - Cleaner code: removed 600+ lines of dead/redundant code - More reliable: bounded channel prevents memory exhaustion on slow consumers Testing: - All unit tests pass (47 tests) - All integration tests pass - Chaos testing passes (5/5 scenarios including 2M row transaction) - No breaking changes to public API
Summary of ChangesHello @isdaniel, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request fundamentally refactors the core producer-consumer architecture, moving from a file-based transaction persistence model to an in-memory, channel-driven approach. The change aims to improve performance, reduce latency, and enhance the reliability and protocol compliance of transaction processing by centralizing state management and ensuring strict LSN ordering for transaction application. It streamlines the data flow, making the system more robust and efficient. Highlights
🧠 New Feature in Public Preview: You can now enable Memory to help Gemini Code Assist learn from your team's feedback. This makes future code reviews more consistent and personalized to your project's style. Click here to enable Memory in your admin console. Using Gemini Code AssistThe full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips. Invoking Gemini You can request assistance from Gemini at any point by creating a comment using either
Customization To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a Limitations & Feedback Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here. You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension. Footnotes
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Code Review
This pull request represents a significant and positive architectural refactoring, replacing the file-based producer-consumer communication with a more performant in-memory mpsc channel. This change simplifies the codebase, reduces I/O overhead, and improves overall performance. The introduction of a pre_commit_hook for atomic LSN updates is an excellent enhancement for data consistency and reliability. However, I've identified a critical issue in the transaction ordering logic that undermines one of the key benefits of this refactor. There are also a few areas where code clarity and documentation could be improved. My review focuses on ensuring the new architecture correctly implements its stated goals.
|
/gemini review |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Code Review
This pull request is a significant and positive architectural refactoring, moving from a file-based system to a more performant in-memory pipeline using an mpsc channel. The introduction of atomic LSN updates via a pre_commit_hook is a major improvement for data consistency. However, my review has identified two critical issues that could compromise the correctness guarantees of the new design: one related to transaction ordering and another to LSN persistence. There are also a couple of medium-severity issues regarding code duplication and documentation that should be addressed. Once these issues are resolved, this will be an excellent enhancement to the project.
|
|
||
| let duration = start_time.elapsed(); | ||
| info!( | ||
| "Successfully executed all {} remaining commands ({} total) in {} batches in {:?} (tx_id: {}, avg: {:?}/batch)", | ||
| command_count, | ||
| total_commands, | ||
| batch_count, | ||
| duration, | ||
| tx_id, | ||
| duration / batch_count.max(1) as u32 | ||
| "Applied transaction {} with {} commands in {:?}", | ||
| xid, | ||
| sql_commands.len(), | ||
| duration | ||
| ); | ||
|
|
||
| // Finalize: update LSN, record metrics, delete file | ||
| Self::finalize_transaction_file( | ||
| pending_tx, | ||
| file_manager, | ||
| lsn_tracker, | ||
| metrics_collector, | ||
| total_commands, | ||
| shared_lsn_feedback, | ||
| ) | ||
| .await?; | ||
|
|
||
| Ok(()) | ||
| } | ||
|
|
||
| /// Core logic for finalizing transaction file processing | ||
| async fn finalize_transaction_file( | ||
| pending_tx: &PendingTransactionFile, | ||
| file_manager: &TransactionManager, | ||
| lsn_tracker: &Arc<LsnTracker>, | ||
| metrics_collector: &Arc<MetricsCollector>, | ||
| total_commands: usize, | ||
| shared_lsn_feedback: &Arc<SharedLsnFeedback>, | ||
| ) -> Result<()> { | ||
| let tx_id = pending_tx.metadata.transaction_id; | ||
|
|
||
| // Update LSN tracking (both lsn_tracker and shared_lsn_feedback) | ||
| if let Some(commit_lsn) = pending_tx.metadata.commit_lsn { | ||
| debug!("Transaction {} commit LSN: {}", tx_id, commit_lsn); | ||
|
|
||
| // Update flush LSN (last committed to destination) | ||
| lsn_tracker.commit_lsn(commit_lsn.0); | ||
|
|
||
| // Update shared LSN feedback for PostgreSQL replication protocol | ||
| shared_lsn_feedback.update_flushed_lsn(commit_lsn.0); | ||
| } | ||
|
|
||
| // Record metrics - create a transaction object for metrics recording | ||
| let destination_type_str = pending_tx.metadata.destination_type.to_string(); | ||
|
|
||
| // Create a transaction object for metrics (events are already executed, so we use empty vec) | ||
| // The event_count is derived from the number of SQL commands executed | ||
| let mut transaction = crate::types::Transaction::new( | ||
| pending_tx.metadata.transaction_id, | ||
| pending_tx.metadata.commit_timestamp, | ||
| ); | ||
| transaction.commit_lsn = pending_tx.metadata.commit_lsn; | ||
|
|
||
| // Record transaction processed metrics | ||
| metrics_collector.record_transaction_processed(&transaction, &destination_type_str); | ||
|
|
||
| // Since file-based processing always processes complete transactions, | ||
| // we also record this as a full transaction | ||
| metrics_collector.record_full_transaction_processed(&transaction, &destination_type_str); | ||
|
|
||
| debug!( | ||
| "Successfully processed transaction file with {} commands and recorded metrics", | ||
| total_commands | ||
| "Transaction {} committed with LSN {} updated atomically", | ||
| xid, commit_lsn | ||
| ); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There's a critical data durability risk here. The pre_commit_hook correctly updates the confirmed_flush_lsn in memory, and it gets committed to the destination database atomically. However, the LsnTracker's state is not persisted to its file after the transaction successfully completes.
If the application crashes after the database transaction commits but before a graceful shutdown, the in-memory LSN update will be lost. On restart, the application will read the old LSN from the file and re-process transactions that have already been applied, leading to data duplication.
You should call lsn_tracker.persist_async().await after the execute_sql_batch_with_hook call succeeds to ensure the new LSN is durable.
| let duration = start_time.elapsed(); | |
| info!( | |
| "Successfully executed all {} remaining commands ({} total) in {} batches in {:?} (tx_id: {}, avg: {:?}/batch)", | |
| command_count, | |
| total_commands, | |
| batch_count, | |
| duration, | |
| tx_id, | |
| duration / batch_count.max(1) as u32 | |
| "Applied transaction {} with {} commands in {:?}", | |
| xid, | |
| sql_commands.len(), | |
| duration | |
| ); | |
| // Finalize: update LSN, record metrics, delete file | |
| Self::finalize_transaction_file( | |
| pending_tx, | |
| file_manager, | |
| lsn_tracker, | |
| metrics_collector, | |
| total_commands, | |
| shared_lsn_feedback, | |
| ) | |
| .await?; | |
| Ok(()) | |
| } | |
| /// Core logic for finalizing transaction file processing | |
| async fn finalize_transaction_file( | |
| pending_tx: &PendingTransactionFile, | |
| file_manager: &TransactionManager, | |
| lsn_tracker: &Arc<LsnTracker>, | |
| metrics_collector: &Arc<MetricsCollector>, | |
| total_commands: usize, | |
| shared_lsn_feedback: &Arc<SharedLsnFeedback>, | |
| ) -> Result<()> { | |
| let tx_id = pending_tx.metadata.transaction_id; | |
| // Update LSN tracking (both lsn_tracker and shared_lsn_feedback) | |
| if let Some(commit_lsn) = pending_tx.metadata.commit_lsn { | |
| debug!("Transaction {} commit LSN: {}", tx_id, commit_lsn); | |
| // Update flush LSN (last committed to destination) | |
| lsn_tracker.commit_lsn(commit_lsn.0); | |
| // Update shared LSN feedback for PostgreSQL replication protocol | |
| shared_lsn_feedback.update_flushed_lsn(commit_lsn.0); | |
| } | |
| // Record metrics - create a transaction object for metrics recording | |
| let destination_type_str = pending_tx.metadata.destination_type.to_string(); | |
| // Create a transaction object for metrics (events are already executed, so we use empty vec) | |
| // The event_count is derived from the number of SQL commands executed | |
| let mut transaction = crate::types::Transaction::new( | |
| pending_tx.metadata.transaction_id, | |
| pending_tx.metadata.commit_timestamp, | |
| ); | |
| transaction.commit_lsn = pending_tx.metadata.commit_lsn; | |
| // Record transaction processed metrics | |
| metrics_collector.record_transaction_processed(&transaction, &destination_type_str); | |
| // Since file-based processing always processes complete transactions, | |
| // we also record this as a full transaction | |
| metrics_collector.record_full_transaction_processed(&transaction, &destination_type_str); | |
| debug!( | |
| "Successfully processed transaction file with {} commands and recorded metrics", | |
| total_commands | |
| "Transaction {} committed with LSN {} updated atomically", | |
| xid, commit_lsn | |
| ); | |
| let duration = start_time.elapsed(); | |
| info!( | |
| "Applied transaction {} with {} commands in {:?}", | |
| xid, | |
| sql_commands.len(), | |
| duration | |
| ); | |
| // Persist the LSN to disk to ensure durability in case of a crash. | |
| if let Err(e) = lsn_tracker.persist_async().await { | |
| error!("Failed to persist LSN tracker state: {}", e); | |
| // This is a critical error, as it could lead to reprocessing on restart. | |
| // Depending on desired guarantees, you might want to terminate. | |
| return Err(e.into()); | |
| } | |
| debug!( | |
| "Transaction {} committed with LSN {} updated atomically", | |
| xid, commit_lsn | |
| ); |
| fn test_commit_lsn_ordering() { | ||
| let (commit_tx, mut commit_rx) = mpsc::channel(10); | ||
| let mut state = ReplicationState::new(commit_tx); | ||
|
|
||
| // Add multiple transactions out of order | ||
| // In the new channel design, transactions are sent immediately as they commit | ||
| // They will be received in the order they're sent (commit order), not LSN order | ||
| // The ordering is now handled by the BinaryHeap BEFORE sending to channel | ||
|
|
||
| // First transaction with LSN 3000 | ||
| state.handle_begin(1, Utc::now()).unwrap(); | ||
| state.handle_commit(Lsn::new(3000), Utc::now()).unwrap(); | ||
| // Sent immediately: LSN 3000 | ||
|
|
||
| // Second transaction with LSN 1000 (smaller) | ||
| state.handle_begin(2, Utc::now()).unwrap(); | ||
| state.handle_commit(Lsn::new(1000), Utc::now()).unwrap(); | ||
| // Sent immediately: LSN 1000 | ||
|
|
||
| // Third transaction with LSN 2000 | ||
| state.handle_begin(3, Utc::now()).unwrap(); | ||
| state.handle_commit(Lsn::new(2000), Utc::now()).unwrap(); | ||
|
|
||
| // Now call try_send_ordered() to send transactions in commit_lsn order | ||
| state.try_send_ordered(); | ||
| // Sent immediately: LSN 2000 | ||
|
|
||
| // Receive in the order they were sent (commit order) | ||
| assert_eq!(commit_rx.try_recv().unwrap().commit_lsn, Lsn::new(3000)); | ||
| assert_eq!(commit_rx.try_recv().unwrap().commit_lsn, Lsn::new(1000)); | ||
| assert_eq!(commit_rx.try_recv().unwrap().commit_lsn, Lsn::new(2000)); | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This test case is flawed and, more importantly, it reveals a critical bug in the transaction ordering logic.
-
Incorrect Assertions: The test asserts that transactions are received in the order
3000, 1000, 2000. This is neither commit order nor LSN order. The correct LSN order should be1000, 2000, 3000. -
Incorrect Test Structure: The test doesn't model the real application flow. In the application,
try_send_ordered()is called insidehandle_commit()after each commit. This test calls it only once at the end, which hides the underlying issue. -
Critical Ordering Bug: The core issue is that
try_send_ordered()is called on every commit and attempts to drain theBinaryHeapimmediately. This causes transactions to be sent to the consumer in commit order, notcommit_lsnorder, defeating the purpose of the heap. This violates the architectural goal of applying transactions in strictcommit_lsnorder.
A possible solution is to have the consumer perform the reordering. The producer could send transactions as they commit, and the consumer would buffer them in its own BinaryHeap before processing.
| pg2any uses a three-directory structure for reliable transaction processing: | ||
| - **Producer Task**: Reads PostgreSQL logical replication stream, buffers transactions in memory | ||
| - **Consumer Task**: Applies transactions to destination database in `commit_lsn` order | ||
| - **Notification**: `tokio::sync::Notify` for immediate consumer wakeup on new commits (microsecond latency) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The documentation here mentions tokio::sync::Notify, but this pull request replaces Notify with an mpsc channel for producer-consumer communication. This line should be updated to reflect the new architecture.
| - **Notification**: `tokio::sync::Notify` for immediate consumer wakeup on new commits (microsecond latency) | |
| - **Notification**: `tokio::sync::mpsc::channel` for backpressure-aware producer-consumer communication |
| //! SQL Generator for Change Data Capture | ||
| //! | ||
| //! This module implements SQL generation for change events from PostgreSQL logical replication. | ||
| //! It handles schema mappings and generates SQL statements for INSERT, UPDATE, DELETE, and TRUNCATE operations. | ||
| //! | ||
| //! ## Protocol-Compliant Architecture | ||
| //! | ||
| //! In the new architecture, transactions are buffered in-memory (ReplicationState) and applied | ||
| //! directly from the commit queue. This SqlGenerator is used solely for SQL generation, | ||
| //! not for file-based persistence. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This new SqlGenerator module is a great refactoring to separate SQL generation logic. However, it appears there's significant code duplication. The pg2any-lib/src/transaction_manager.rs file has been modified to contain almost identical logic, but with the struct named TransactionManager.
Since client.rs and lib.rs now correctly reference SqlGenerator from this new module, the transaction_manager.rs file seems redundant and should probably be removed to avoid confusion and maintainability issues.
Major Changes:
Architecture Improvements:
Benefits: