-
Notifications
You must be signed in to change notification settings - Fork 298
feat(elixir-client): Add Move-In/Out Support for Subqueries in Elixir Client #3699
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
✅ Deploy Preview for electric-next ready!
To edit notification comments on pull requests, go to your Netlify project configuration. |
Codecov Report❌ Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #3699 +/- ##
==========================================
- Coverage 87.36% 84.15% -3.22%
==========================================
Files 23 44 +21
Lines 2011 2834 +823
Branches 535 533 -2
==========================================
+ Hits 1757 2385 +628
- Misses 252 447 +195
Partials 2 2
Flags with carried forward coverage won't be shown. Click here to find out more. ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
861bce7 to
fe74421
Compare
📝 WalkthroughWalkthroughAdds move-out support for subqueries in the Elixir client, introducing tagged rows, move-out event handling, synthetic delete generation when dependencies become inactive, and tag-based row tracking. Includes new message types, stream state management, integration tests, and testing infrastructure. Changes
Sequence DiagramssequenceDiagram
participant Server as Electric Server
participant Parser as Message.parse
participant Headers as Headers
participant MoveOut as MoveOutMessage
participant Stream as Electric.Client.Stream
participant Index as Tag Index
Server->>Parser: Move-out event (patterns)
Parser->>MoveOut: from_message/2
MoveOut->>MoveOut: normalize_patterns
MoveOut-->>Stream: MoveOutMessage
Stream->>Index: find rows matching patterns
Index-->>Stream: rows to delete
Stream->>Stream: generate_synthetic_deletes
Stream-->>Stream: buffer ChangeMessages (delete ops)
sequenceDiagram
participant Server as Electric Server
participant Parser as Message.parse
participant CM as ChangeMessage
participant Headers as Headers
participant Stream as Electric.Client.Stream
participant TagIndex as Tag Index Utilities
Server->>Parser: Change event with tags/removed_tags
Parser->>CM: from_message (with request_timestamp)
Parser->>Headers: from_message (extracts tags)
CM-->>Stream: ChangeMessage {operation, tags, removed_tags}
Stream->>TagIndex: update_tag_index
TagIndex->>TagIndex: add_key_to_tags / remove_key_from_tags
TagIndex-->>Stream: updated tag_to_keys, key_data
Stream-->>Stream: store for resume state
Estimated code review effort🎯 4 (Complex) | ⏱️ ~50 minutes Poem
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing touches
Comment |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 9
🤖 Fix all issues with AI agents
In `@packages/elixir-client/lib/electric/client/ecto_adapter/array_decoder.ex`:
- Around line 64-75: The decode_quoted_elem function doesn't handle escaped
backslashes, so pairs of backslashes in quoted PostgreSQL array elements are
decoded as two backslashes; add a specific clause in decode_quoted_elem to match
an escaped backslash (e.g., pattern matching "\\" at the head) that pushes a
single backslash into the accumulator and recurses, and insert this clause
before the generic utf8 clause (the existing defp decode_quoted_elem(<<c::utf8,
rest::bitstring>>, acc, state)). Update tests to include a quoted element like
{"a\\b"} that asserts the result is ["a\b"].
In `@packages/elixir-client/lib/electric/client/message.ex`:
- Around line 208-215: The struct field request_timestamp is declared as
DateTime.t() but from_message/2 never sets it, leaving nil and violating the
typespec; update the from_message/2 function(s) to populate request_timestamp
with a DateTime (e.g., DateTime.utc_now()) when building the %__MODULE__{} or,
alternatively, change the t type spec to DateTime.t() | nil if it may
legitimately be unset; modify the clause(s) of from_message/2 that construct the
struct and the `@type` t accordingly so the implementation and spec remain
consistent.
- Around line 217-234: The two from_message/2 clauses produce inconsistent
pattern shapes: the string-key clause maps patterns into atom-keyed maps
(parsed_patterns) while the atom-key clause forwards patterns unchanged; update
the atom-key clause (the def from_message(%{headers: %{event: "move-out",
patterns: patterns}}, handle) clause) to normalize patterns the same way as the
string-key clause (e.g., map each pattern to %{pos: pos, value: value}) or
delegate both clauses to a shared helper (normalize_patterns/1) before building
the %__MODULE__{patterns: ..., handle: handle} to ensure consistent pattern
structure regardless of input key style.
- Around line 185-193: The key_data type currently uses msg: any(), which
weakens type safety; change it to msg: ChangeMessage.t() (or the correct
fully-qualified ChangeMessage module if namespaced) in the `@type` t for the
struct and update any necessary alias or import (e.g., alias
Electric.ChangeMessage or add the module prefix) so the spec compiles; keep the
defstruct unchanged but make sure the ChangeMessage type exists or is
defined/aliased in the same module to satisfy Dialyzer and compile-time checks.
In `@packages/sync-service/test/integration/subquery_move_out_test.exs`:
- Around line 351-352: Replace the brittle Process.sleep(100) with a
deterministic wait: poll the observable condition the server should reach (e.g.,
query the DB or call the public API/function that reflects the change) in a
short loop with a timeout and backoff, or use assert_receive if the code under
test can send a message; implement a helper like wait_until/2 that repeatedly
checks the condition and raises on timeout, and call that helper instead of
Process.sleep(100) so the test waits only as long as needed and is resilient on
CI.
- Around line 186-218: The assertion in the test method that checks tag updates
uses a compound condition that may be hard to debug if it fails. Modify the
assertion in the test "update that changes parent reference updates tags" inside
the block consuming the stream to include a descriptive failure message
indicating the expected tag change or removal, specifying the initial tags, new
tags, and removed tags values. This will clarify test failures related to tag
updates.
In `@packages/sync-service/test/test_helper.exs`:
- Around line 10-14: The direct call to Electric.Client.Application.start/2 is
pattern-matched to {:ok, _} and will crash if the app is already started; change
the invocation to handle both {:ok, _} and {:error, {:already_started, _}}
(e.g., use a case or with to accept either result and ignore the already-started
error) so tests don’t fail when the application is already running; update the
call site Electric.Client.Application.start(:normal, []) accordingly and
optionally log or comment that the already-started case is acceptable.
📜 Review details
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
📒 Files selected for processing (16)
.changeset/beige-ladybugs-give.mdpackages/elixir-client/lib/electric/client/ecto_adapter/array_decoder.expackages/elixir-client/lib/electric/client/message.expackages/elixir-client/lib/electric/client/stream.expackages/elixir-client/test/electric/client/message_test.exspackages/elixir-client/test/electric/client_test.exspackages/sync-service/mix.exspackages/sync-service/test/electric/postgres/configuration_test.exspackages/sync-service/test/electric/postgres/lock_breaker_connection_test.exspackages/sync-service/test/electric/shapes/partitioned_tables_test.exspackages/sync-service/test/integration/streaming_test.exspackages/sync-service/test/integration/subquery_move_out_test.exspackages/sync-service/test/support/db_setup.expackages/sync-service/test/support/integration_setup.expackages/sync-service/test/support/stream_consumer.expackages/sync-service/test/test_helper.exs
🧰 Additional context used
🧬 Code graph analysis (7)
packages/elixir-client/lib/electric/client/ecto_adapter/array_decoder.ex (2)
packages/elixir-client/lib/electric/client/fetch/response.ex (1)
decode!(28-34)packages/elixir-client/lib/electric/client/ecto_adapter.ex (3)
cast_to(186-193)cast_to(195-226)cast_to(228-233)
packages/sync-service/test/support/integration_setup.ex (1)
packages/sync-service/test/support/component_setup.ex (1)
build_router_opts(487-506)
packages/sync-service/test/integration/subquery_move_out_test.exs (1)
packages/sync-service/test/support/stream_consumer.ex (5)
assert_insert(56-70)assert_up_to_date(124-135)assert_delete(105-119)await_count(153-158)assert_resume(140-143)
packages/elixir-client/test/electric/client/message_test.exs (1)
packages/elixir-client/lib/electric/client/message.ex (6)
parse(239-241)parse(243-245)parse(247-249)parse(251-253)parse(255-257)parse(259-261)
packages/sync-service/test/test_helper.exs (2)
packages/sync-service/lib/electric/application.ex (1)
start(6-20)packages/sync-service/lib/electric/postgres/replication_client/connection_setup.ex (1)
start(28-31)
packages/sync-service/test/integration/streaming_test.exs (2)
packages/sync-service/test/support/stream_consumer.ex (2)
assert_insert(56-70)assert_up_to_date(124-135)packages/sync-service/lib/electric/postgres/snapshot_query.ex (1)
query!(90-105)
packages/elixir-client/lib/electric/client/message.ex (1)
packages/elixir-client/lib/electric/client/stream.ex (1)
shape_handle(346-348)
🪛 markdownlint-cli2 (0.18.1)
.changeset/beige-ladybugs-give.md
6-6: First line in a file should be a top-level heading
(MD041, first-line-heading, first-line-h1)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (10)
- GitHub Check: Test packages/typescript-client w/ sync-service
- GitHub Check: Test packages/experimental w/ sync-service
- GitHub Check: Test packages/react-hooks w/ sync-service
- GitHub Check: Test packages/y-electric w/ sync-service
- GitHub Check: Test packages/start w/ sync-service
- GitHub Check: Run Lux integration tests
- GitHub Check: Build and test sync-service, pg15
- GitHub Check: Build and test sync-service, pg18
- GitHub Check: Build and test sync-service, pg14
- GitHub Check: Build and test sync-service, pg17
🔇 Additional comments (41)
packages/sync-service/test/support/db_setup.ex (1)
4-9: LGTM — pool_size reduction is reasonable for test defaults.Reducing the default
pool_sizefrom 2 to 1 is sensible for minimizing resource usage and avoiding connection contention in tests. Individual tests that require concurrent database access can override this viaconnection_opt_overridesin the test context.packages/sync-service/test/electric/shapes/partitioned_tables_test.exs (1)
2-2: Clarify the reason for disabling async test execution.Switching from
async: truetoasync: falsewill serialize these tests, which impacts overall test suite performance. Given that the setup already useswith_unique_dbfor database isolation, the contention point isn't immediately obvious.Consider adding a brief comment explaining why async execution was disabled (e.g., registry conflicts, shared state with new integration tests, or observed flakiness). This helps future maintainers understand the constraint and avoid inadvertently re-enabling async.
packages/sync-service/test/electric/postgres/lock_breaker_connection_test.exs (1)
16-17: LGTM!The
pool_size: 2override is appropriate here. The test requires concurrent database connections: one for the Task holding the advisory lock (line 28) and another for theLockBreakerConnectionto terminate it. This ensures the test has sufficient connections to avoid pool contention.packages/sync-service/test/electric/postgres/configuration_test.exs (1)
107-108: LGTM!The
pool_size: 2override correctly addresses the concurrent connection requirement for this test. One connection is held by the Task acquiring the table lock (line 128), while another is needed for theadd_table_to_publicationcall (line 144) that's expected to timeout. This follows the established pattern already used in this file (line 315).packages/sync-service/mix.exs (1)
122-123: LGTM!The test-only dependency on
electric_clientis correctly configured withruntime: falseto prevent automatic OTP application startup (which is handled explicitly in test_helper.exs). The relative path reference is appropriate for monorepo structure.packages/elixir-client/lib/electric/client/ecto_adapter/array_decoder.ex (2)
1-2: LGTM!Wrapping the module in
Code.ensure_loaded?(Ecto)is the correct pattern for making this functionality optional when Ecto is not a dependency.
79-92: LGTM!The distinction between
castandcast_quotedcorrectly handles NULL semantics — in PostgreSQL array format, NULL is always unquoted, so quoted elements can skip the NULL check.packages/sync-service/test/support/stream_consumer.ex (4)
23-43: LGTM!Clean implementation of the stream consumer with proper task lifecycle management. The message forwarding pattern is appropriate for test assertions.
189-208: Strict assertion semantics may cause flaky tests.
do_assert_receive_message(used byassert_insert,assert_update,assert_delete) flunks immediately on any non-matching message. This means if aControlMessage(like:up_to_date) arrives before an expectedChangeMessage, the test fails.This differs from
do_assert_receive_matching(used byassert_resume) which skips non-matching messages.If this strict ordering is intentional for enforcement, consider adding a note in the docstrings. Otherwise, consider using the lenient approach for change assertions too.
293-303: LGTM!The
with_consumermacro provides clean ergonomics with proper resource cleanup viatry/after. Usingvar!(consumer)to inject the binding is appropriate for this DSL pattern.
256-277: LGTM!The
collect_messagesutility with a short default timeout (100ms) is useful for "collect what's available" scenarios without blocking tests for too long..changeset/beige-ladybugs-give.md (1)
1-6: LGTM!The changeset correctly specifies patch bumps for both affected packages. The MD041 linting warning is a false positive — changeset files require YAML frontmatter as the first content, not a markdown heading.
packages/elixir-client/test/electric/client/message_test.exs (3)
33-76: Good test coverage for MoveOutMessage parsing.The tests cover essential parsing scenarios including string/atom key variants and multiple patterns. However, consider adding a negative test case for malformed move-out messages (e.g., missing
patternsfield) to ensure graceful error handling.
78-124: Solid test coverage for tags and removed_tags in headers.Tests cover the key scenarios: tags only, removed_tags only, both present, and defaults to empty lists. This ensures backward compatibility with untagged shapes.
126-167: Good ChangeMessage parsing tests with comprehensive assertions.The tests verify key, value, headers, old_value, and handle propagation. Coverage for insert/update/delete operations is complete.
packages/elixir-client/test/electric/client_test.exs (7)
38-47: Well-designed helper for testing internal stream state.The recursive
drain_stream/1function properly drains the stream and returns the final state for inspection. This is essential for verifying tag index state after processing.
1378-1429: Good backward compatibility test for untagged shapes.This test correctly verifies that the tag index remains empty when messages don't include tags, ensuring backward compatibility with existing shapes that don't use subquery-based membership.
1431-1492: Core move-out test validates the happy path correctly.This test verifies the fundamental move-out flow: insert with tag → move-out event → synthetic delete with the original row data. The assertion that the delete contains the full value (
"name" => "test") is important for downstream consumers.
1777-1860: Test documents edge case for duplicate tag entries.This test validates that updates with the same tag but without
removed_tagsdon't cause duplicate synthetic deletes. The assertion on Line 1858 expects exactly 1 delete, which aligns with the implementation instream.exthat useskey_datato track per-key state and avoid duplicates.
1862-1926: Critical edge case: partial move-out with multiple tags.This test verifies that a row with multiple tags (
["tag-a", "tag-b"]) is NOT deleted when only one tag is moved out. The implementation ingenerate_synthetic_deletes/2correctly checks ifremaining_tagsis empty before adding tokeys_to_delete.
2169-2253: Resume test validates state persistence across stream restarts.This test is crucial for ensuring move-out functionality works correctly when a client resumes from a checkpoint. The
ResumeMessagenow includestag_to_keysandkey_datafields (Lines 260-261 in stream.ex), which are restored on resume (Lines 387-395 in stream.ex).
2075-2167: Test documents important edge case for tag removal.This test validates that when a row is updated to remove ALL its tags (via
removed_tagswithout newtags), subsequent move-out events are no-ops. The implementation handles this inupdate_tag_index(Lines 451-461 in stream.ex) by clearing the key from both data structures whenupdated_tagsis empty.packages/sync-service/test/integration/streaming_test.exs (2)
1-70: Solid integration test structure with good coverage of streaming basics.The test module properly:
- Uses async: false for integration tests that share resources
- Tags tests with
@tmp_dirfor isolated temp directories- Uses the expressive
with_consumerDSL for assertions- Tests both snapshot and live streaming modes
The test setup chain is well-organized and reusable.
43-61: Potential race condition in live changes test.The test inserts a row via
Postgrex.query!and immediately asserts on the stream. While this likely works due to Electric's replication mechanism, consider adding a brief assertion or using a more robust synchronization mechanism if this test becomes flaky.packages/elixir-client/lib/electric/client/stream.ex (7)
22-27: Clear documentation for new move-out data structures.The inline comments explain the purpose of
tag_to_keysandkey_datawell. These structures enable efficient tag lookup and deduplication.
233-249: Move-out handling with clear assumptions documented.The comment on Lines 234-235 documents the assumption that move-out events only occur after the initial snapshot. This is important context for understanding when synthetic deletes are generated.
The implementation correctly:
- Generates synthetic deletes for matching patterns
- Updates both
tag_to_keysandkey_dataatomically- Buffers synthetic deletes in order
423-480: Comprehensive tag index update logic with proper edge case handling.The implementation correctly handles:
- Initial insert (no current_data)
- Updates with added/removed tags
- Delete operations (full cleanup)
- Updates that result in zero tags (cleanup)
The use of
MapSetoperations ensures no duplicate tags per key.
511-574: Two-pass algorithm correctly handles multi-tag rows.The algorithm properly:
- First pass: collects all matching keys and tracks which tags are being removed
- Second pass: checks remaining tags before deciding to delete
This ensures a row with tags
["tag-a", "tag-b"]won't be deleted when onlytag-ais moved out.
385-402: Resume logic uses Map.get with defaults for backward compatibility.Using
Map.get(resume, :tag_to_keys, %{})ensures backward compatibility withResumeMessagestructs that don't have these new fields (e.g., from persisted state before this change).
267-270: Error unwrapping handles various error payload formats.The
unwrap_error/1function handles empty lists, single-element lists, multi-element lists, and non-list values. This provides flexibility for different error formats from the server.
514-515: Assumption about move-out pattern matching is accurate.The current implementation only matches patterns using the
valuefield and completely ignores theposfield, confirming the comment's statement. Across all test cases and the pattern-matching logic ingenerate_synthetic_deletes, no positional matching is performed. Theposfield is transmitted in the protocol and parsed but never used, suggesting it was included for future extensibility if composite tag support becomes necessary.packages/sync-service/test/support/integration_setup.ex (1)
17-41: Well-structured integration test helper.The helper properly:
- Waits for the Electric stack to be active before starting the server
- Uses port 0 for dynamic allocation, avoiding port conflicts
- Uses
start_supervisedfor automatic cleanup- Returns a comprehensive context map for test use
The 2000ms timeout is appropriately balanced for this helper's complexity and aligns with timeout patterns used across the test suite.
packages/sync-service/test/integration/subquery_move_out_test.exs (7)
1-26: LGTM - Well-structured test module setup.The module definition, imports, and shape definition are clear and well-organized. The
@subquery_wheremodule attribute provides good reusability across tests.
42-49: LGTM - Tag presence verification test.Correctly verifies that change messages for subquery-matched rows include tags in headers.
55-73: LGTM - Core move-out test.This test properly validates the end-to-end flow: initial snapshot, parent deactivation, and synthetic delete generation.
80-107: LGTM - Multi-child synthetic delete test.Good use of
await_countto collect multiple deletes without relying on order, then sorting for comparison.
133-167: LGTM - Move-in after parent change test.This test validates an important edge case where a child row moves between parents, covering both move-out and move-in flows.
365-385: LGTM - Clean helper function.The helper properly creates the parent/child table structure needed for subquery tests with appropriate constraints.
258-269:collect_messages/2helper exists in StreamConsumer atpackages/sync-service/test/support/stream_consumer.ex:256.packages/elixir-client/lib/electric/client/message.ex (2)
8-48: LGTM - Headers struct extended correctly.The
tagsandremoved_tagsfields are properly added with sensible defaults and correct type definitions. Thefrom_message/2function correctly extracts these fields with fallbacks to empty lists.
251-257: LGTM - Parse clauses for move-out events.The parse clauses correctly match both string-key and atom-key formats and delegate to
MoveOutMessage.from_message/2, consistent with the pattern used forControlMessage.
✏️ Tip: You can disable this entire section by setting review_details to false in your review settings.
packages/elixir-client/lib/electric/client/ecto_adapter/array_decoder.ex
Show resolved
Hide resolved
packages/sync-service/test/integration/subquery_move_out_test.exs
Outdated
Show resolved
Hide resolved
packages/sync-service/test/integration/subquery_move_out_test.exs
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 2
🤖 Fix all issues with AI agents
In `@packages/elixir-client/lib/electric/client/stream.ex`:
- Around line 562-575: Synthetic delete messages are using DateTime.utc_now()
instead of the move-out/request timestamp; change the synthetic_deletes
construction so the Message.ChangeMessage.request_timestamp is set to the
move-out/request timestamp (propagate the move-out event's request_timestamp
through keys_to_delete rather than calling DateTime.utc_now()), and update the
caller in handle_msg to include/pass that timestamp into keys_to_delete so
synthetic_deletes can use it (refer to synthetic_deletes, keys_to_delete,
Message.ChangeMessage.request_timestamp, and handle_msg).
- Around line 237-253: handle_msg/2 currently processes Message.MoveOutMessage
without verifying the snapshot is complete; add a defensive check at the top of
the Message.MoveOutMessage clause to ensure stream.up_to_date? (or the
equivalent boolean on the stream) is true before calling
generate_synthetic_deletes, and if it's false do not apply deletes: instead log
or emit a protocol error and return {:cont, stream} (or otherwise ignore/buffer
the message) to avoid deleting rows that were never observed. Ensure you
reference the existing symbols handle_msg, Message.MoveOutMessage,
stream.up_to_date?, and generate_synthetic_deletes when making the change.
♻️ Duplicate comments (3)
packages/elixir-client/test/electric/client/message_test.exs (1)
73-78: Use pattern matching for clearer assertions.Using
Enum.atwith separate assertions is less idiomatic. Pattern matching directly on the list makes the test more readable and concise.♻️ Suggested refactor
- assert [%MoveOutMessage{patterns: patterns}] = Message.parse(msg, "handle", & &1, `@ts`) - assert length(patterns) == 3 - assert Enum.at(patterns, 0) == %{pos: 0, value: "tag-1"} - assert Enum.at(patterns, 1) == %{pos: 1, value: "tag-2"} - assert Enum.at(patterns, 2) == %{pos: 2, value: "tag-3"} + assert [%MoveOutMessage{patterns: patterns}] = Message.parse(msg, "handle", & &1, `@ts`) + assert [ + %{pos: 0, value: "tag-1"}, + %{pos: 1, value: "tag-2"}, + %{pos: 2, value: "tag-3"} + ] = patternspackages/elixir-client/lib/electric/client/message.ex (2)
190-198: Consider using a more specific type formsginkey_data.The
msg: any()type reduces type safety. Since this always stores aChangeMessage, the type should reflect that.- key_data: %{String.t() => %{tags: MapSet.t(String.t()), msg: any()}} + key_data: %{String.t() => %{tags: MapSet.t(String.t()), msg: ChangeMessage.t()}}
222-249: Inconsistent pattern normalization betweenfrom_message/3clauses.The string-key version (lines 222-237) transforms patterns by extracting
"pos"and"value"into atom-keyed maps, but the atom-key version (lines 239-249) passespatternsthrough unchanged. This can cause inconsistent pattern structures depending on input format.🐛 Proposed fix to normalize patterns consistently
def from_message( %{headers: %{event: "move-out", patterns: patterns}}, handle, request_timestamp ) do + # Normalize patterns to ensure consistent structure + parsed_patterns = + Enum.map(patterns, fn + %{pos: pos, value: value} -> %{pos: pos, value: value} + %{"pos" => pos, "value" => value} -> %{pos: pos, value: value} + end) + %__MODULE__{ - patterns: patterns, + patterns: parsed_patterns, handle: handle, request_timestamp: request_timestamp } end
📜 Review details
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
📒 Files selected for processing (3)
packages/elixir-client/lib/electric/client/message.expackages/elixir-client/lib/electric/client/stream.expackages/elixir-client/test/electric/client/message_test.exs
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (5)
- GitHub Check: Build and test sync-service, pg14
- GitHub Check: Build and test sync-service, pg15
- GitHub Check: Build and test sync-service, pg17
- GitHub Check: Build and test sync-service, pg18
- GitHub Check: Run Lux integration tests
🔇 Additional comments (13)
packages/elixir-client/test/electric/client/message_test.exs (4)
1-9: LGTM!Module setup with aliases and test timestamp constant is well-structured.
10-34: LGTM!ControlMessage tests cover all three control types (
up-to-date,must-refetch,snapshot-end) with both string and atom key variants, ensuring robust parsing.
81-127: LGTM!Good coverage for tag-related header parsing including edge cases: tags only, removed_tags only, both present, and defaults when missing.
129-170: LGTM!ChangeMessage tests appropriately validate parsing of insert, update with old_value, and delete operations, with proper header assertions.
packages/elixir-client/lib/electric/client/stream.ex (5)
22-27: LGTM!Clear documentation of the new state fields for move-out tracking. The data structure design with
tag_to_keysfor reverse lookups andkey_datafor per-key state is well thought out.
260-266: LGTM!ResumeMessage correctly includes tag tracking state, enabling proper resume behavior for shapes with move-out support.
335-338: LGTM!Reset correctly clears all tag tracking state alongside other stream state.
389-406: LGTM!Resume properly restores tag state from the ResumeMessage with safe defaults for backward compatibility with older resume data.
427-484: LGTM!Tag index update logic correctly handles all cases: inserts (add tags), updates (add/remove tags), and deletes (remove key entirely). The use of MapSet for efficient set operations is appropriate.
packages/elixir-client/lib/electric/client/message.ex (4)
7-58: LGTM!Headers struct properly extended with
tagsandremoved_tagsfields, with appropriate type definitions and defaults.
60-112: LGTM!ControlMessage correctly propagates request_timestamp through both string-key and atom-key parsing paths.
114-166: LGTM!ChangeMessage parsing properly includes request_timestamp propagation.
254-291: LGTM!Parse function dispatch correctly routes to appropriate message constructors for all message types: ChangeMessage, ControlMessage, MoveOutMessage, and empty strings.
✏️ Tip: You can disable this entire section by setting review_details to false in your review settings.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
packages/elixir-client/lib/electric/client/stream.ex (1)
96-108: Type spec is missing the newtag_to_keysandkey_datafields.The
@type tspecification doesn't include the newly added fields, which reduces type safety and IDE support.🔧 Suggested fix
`@type` t :: %__MODULE__{ client: Client.t(), schema: Client.schema(), value_mapper_fun: Client.ValueMapper.mapper_fun(), parser: nil | {module(), term()}, buffer: :queue.queue(), up_to_date?: boolean(), offset: Client.offset(), replica: Client.replica(), shape_handle: nil | Client.shape_handle(), state: :init | :stream | :done, - opts: opts() + opts: opts(), + tag_to_keys: %{optional(String.t()) => MapSet.t(String.t())}, + key_data: %{optional(String.t()) => %{tags: MapSet.t(String.t()), msg: Message.ChangeMessage.t()}} }
🤖 Fix all issues with AI agents
In `@packages/elixir-client/lib/electric/client/stream.ex`:
- Around line 489-516: The Enum.reduce calls unnecessarily call MapSet.to_list,
causing allocations; update remove_key_from_tags and add_key_to_tags to pass the
MapSet `tags` directly to Enum.reduce (i.e., Enum.reduce(tags, ...)) and remove
the .to_list() call, and apply the same change to the Enum.reduce usages inside
update_tag_index so all reductions iterate the MapSet directly.
♻️ Duplicate comments (1)
packages/elixir-client/lib/electric/client/stream.ex (1)
237-256: Move-out handling lacks defensive validation for snapshot completion.The comment on lines 241-242 states an assumption that move-out events only arrive after the snapshot is complete, but the code doesn't validate this. If the server sends a move-out during snapshot sync (protocol violation or bug), this could delete rows that were never observed by the client.
Consider adding a defensive check or at minimum logging when this invariant is violated.
📜 Review details
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
📒 Files selected for processing (1)
packages/elixir-client/lib/electric/client/stream.ex
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (11)
- GitHub Check: Test packages/react-hooks w/ sync-service
- GitHub Check: Test packages/typescript-client w/ sync-service
- GitHub Check: Test packages/experimental w/ sync-service
- GitHub Check: Test packages/start w/ sync-service
- GitHub Check: Test packages/y-electric w/ sync-service
- GitHub Check: Build and test sync-service, pg14
- GitHub Check: Build and test sync-service, pg17
- GitHub Check: Build and test sync-service, pg18
- GitHub Check: Build and test sync-service, pg15
- GitHub Check: Run Lux integration tests
- GitHub Check: Build and test elixir-client
🔇 Additional comments (8)
packages/elixir-client/lib/electric/client/stream.ex (8)
188-192: LGTM!Correctly propagates
request_timestampthrough the message parsing chain, enabling synthetic deletes to use the original timestamp.
232-235: LGTM!The tag index is correctly updated before buffering the message, ensuring consistent state tracking.
262-272: LGTM!Tag state is correctly persisted in the
ResumeMessage, enabling proper state restoration on stream resume.
274-277: LGTM!The
unwrap_errorfunction properly normalizes various error payload shapes with sensible defaults.
330-342: LGTM!Tag state is correctly cleared on reset, ensuring a clean slate when the shape handle changes.
392-410: LGTM!The use of
Map.getwith default empty maps ensures backward compatibility withResumeMessagestructs that don't have tag state fields.
430-487: LGTM!The tag index update logic correctly handles all operation types:
- Deletes properly clean up all tracking state
- Updates efficiently compute tag differences using
MapSet.differenceandMapSet.union- Empty tag sets trigger cleanup to prevent memory leaks
564-578: The implementation of synthetic deletes withold_value: nilappears intentional and is consistent with how the codebase handles delete messages. The delete message test does not assert any expectation forold_value, and no code in the repository accessesold_valuein the context of delete operations. Synthetic deletes are client-side tag-based removals, distinct from server-initiated deletes, and settingold_valuetonilis the current design.Likely an incorrect or invalid review comment.
✏️ Tip: You can disable this entire section by setting review_details to false in your review settings.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@packages/elixir-client/test/electric/client_test.exs`:
- Around line 1777-1860: Rename the test whose description string is "update
without removed_tags causes duplicate synthetic deletes" to a name that reflects
the expected (fixed) behavior, e.g. "update without removed_tags should still
generate single synthetic delete" (or "deduplicates synthetic deletes when same
tag appears in multiple updates"), and update any top-line comment text that
claims a BUG to instead state the expected behavior so the test name and
comments align with the assertion and intent.
📜 Review details
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
📒 Files selected for processing (1)
packages/elixir-client/test/electric/client_test.exs
🧰 Additional context used
🧬 Code graph analysis (1)
packages/elixir-client/test/electric/client_test.exs (4)
packages/elixir-client/lib/electric/client/stream.ex (1)
next(144-149)packages/elixir-client/test/electric/client/embedded_test.exs (1)
stream(11-13)packages/elixir-client/lib/electric/client.ex (3)
stream(493-495)stream(503-505)stream(543-547)packages/elixir-client/lib/electric/client/mock.ex (1)
up_to_date(157-160)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
- GitHub Check: Test packages/y-electric w/ sync-service
- GitHub Check: Test packages/typescript-client w/ sync-service
- GitHub Check: Run Lux integration tests
🔇 Additional comments (12)
packages/elixir-client/test/electric/client_test.exs (12)
38-47: LGTM! Clean recursive helper for draining streams.The helper correctly recurses until
:haltand returns the final stream struct for state inspection.Minor style note: the
aliason line 41 could be moved to the module-level aliases (around line 7-10) for consistency, though keeping it local is also acceptable since this is a private test helper.
898-898: LGTM! Retry count appropriately increased to handle test scenario.The test simulates multiple error conditions (initial 500, Bypass down/up cycle, 8 retry status codes) before success. Increasing to 15 retries provides sufficient headroom to prevent test flakiness.
1378-1429: LGTM! Good backward compatibility test.This test verifies that shapes without tags don't pollute the tag index, which is essential for backward compatibility with existing shapes that don't use subquery-based move-out semantics.
1431-1492: LGTM! Core move-out functionality test.Good coverage of the primary use case: insert with tag → move-out event → synthetic delete generation with correct value data.
1494-1560: LGTM! Multi-row move-out scenario.Good test for verifying that a single move-out pattern correctly generates synthetic deletes for all matching rows.
1562-1622: LGTM! Non-matching pattern test.Essential negative test verifying that move-out patterns only affect rows with matching tags.
1624-1699: LGTM! Tag transition test.Important coverage for the tag replacement scenario where a row moves from one tag to another via
removed_tags. Correctly verifies no synthetic delete occurs since the row remains in the shape via the new tag.
1701-1775: LGTM! Delete cleanup test.Correctly verifies that explicit deletes clear the row from the tag index, preventing duplicate deletes when a subsequent move-out event arrives.
1862-1926: LGTM! Critical partial move-out test.This tests an important semantic: rows with multiple tags should only be deleted when ALL their tags are moved out. The assertion correctly expects no synthetic deletes when the row still belongs via
tag-b.
1928-2009: LGTM! Value freshness test.Critical test ensuring synthetic deletes contain the most recent row value, not stale data from initial insert. This is important for downstream consumers that may need accurate final state.
2011-2073: LGTM! Pattern deduplication test.Good edge case coverage: when a single move-out event contains multiple patterns that all match the same row, only one synthetic delete should be generated.
2075-2167: LGTM! Complete tag removal test.Well-documented edge case: when an update removes all tags from a row (via
removed_tagswith no newtags), the row should be cleared from the tag index. Subsequent move-out events for the removed tag should be a no-op.
✏️ Tip: You can disable this entire section by setting review_details to false in your review settings.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@packages/sync-service/test/integration/subquery_move_out_test.exs`:
- Around line 306-312: The current conditional allows the assertion to be
skipped if tags are nil; instead assert that tags are present and equal: fetch
tags1 and tags2 from msg1.headers and msg2.headers (Map.get(msg1.headers, :tags)
and Map.get(msg2.headers, :tags)), add a non-nil check like assert not
is_nil(tags1) (or assert tags1 != nil) and then assert tags1 == tags2 so the
test fails if tags are missing or mismatched.
♻️ Duplicate comments (1)
packages/sync-service/test/integration/subquery_move_out_test.exs (1)
214-217: Improve assertion message for better debuggability.The compound condition is correct but could benefit from a descriptive failure message.
♻️ Suggested improvement
- # Either we have explicit removed_tags, or tags have changed - assert new_tags != initial_tags or length(removed_tags) > 0 + # Either we have explicit removed_tags, or tags have changed + assert new_tags != initial_tags or length(removed_tags) > 0, + "Expected tags to change or removed_tags to be present. " <> + "initial_tags: #{inspect(initial_tags)}, new_tags: #{inspect(new_tags)}, " <> + "removed_tags: #{inspect(removed_tags)}"
📜 Review details
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
📒 Files selected for processing (1)
packages/sync-service/test/integration/subquery_move_out_test.exs
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
- GitHub Check: Test packages/experimental w/ sync-service
- GitHub Check: Test packages/typescript-client w/ sync-service
- GitHub Check: Run Lux integration tests
🔇 Additional comments (6)
packages/sync-service/test/integration/subquery_move_out_test.exs (6)
27-49: Well-structured test setup and initial test case.The setup chain is correctly ordered, and the first test properly validates that tags are present on change messages for subquery-matched rows. The pattern
%{headers: %{tags: [_]}}correctly asserts a non-empty tags list.
55-73: Good coverage of the core move-out scenario.This test correctly validates the primary use case: deactivating a parent triggers a synthetic delete for the child. The test properly waits for
up_to_datebefore making mutations, ensuring deterministic behavior.
75-107: Correct use ofawait_countfor multiple deletes.Using
await_countwith a match function is the right approach for collecting multiple synthetic deletes without assuming order. Sorting before comparison ensures deterministic assertions.
225-270: Excellent edge case test for tag index consistency.This test validates critical behavior: when a child row switches parents, the tag index must be updated to prevent spurious deletes when the old parent is deactivated. The inline documentation clearly explains the scenario being tested.
331-358: Well-designed resume scenario test.This test correctly simulates a client that syncs initial data, disconnects, has the underlying data change (parent deactivated), and then resumes. Using
live: falseto get aResumeMessageand then resuming with it is the right approach to test resume semantics.
361-382: Clean helper setup for parent/child relationship testing.The schema correctly models the subquery scenario with
ON DELETE CASCADEmaintaining referential integrity. Returning the tables list follows the established pattern for context propagation.
✏️ Tip: You can disable this entire section by setting review_details to false in your review settings.
packages/sync-service/test/integration/subquery_move_out_test.exs
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 2
🤖 Fix all issues with AI agents
In `@packages/elixir-client/lib/electric/client/message.ex`:
- Around line 246-251: The atom-key clause in normalize_patterns returns the
original map allowing extra keys, whereas the string-key clause constructs a new
map with only pos and value; update normalize_patterns to explicitly extract and
return only %{pos: pos, value: value} in both clauses (i.e., pattern-match the
atom-key clause like %{pos: pos, value: value} -> %{pos: pos, value: value}) so
both branches normalize to the same sanitized shape.
- Around line 254-293: The parse/4 function lacks a catch-all clause and will
raise FunctionClauseError on unrecognized message shapes; update the parse/4
clauses (the parse function handling maps and strings) to include a final
fallback clause that matches any input (e.g., parse(_, _shape_handle,
_value_mapper_fun, _request_timestamp)) and returns [] to mirror the
empty-string case, or alternatively add a `@doc` on the parse/4 function stating
the intentional fail-fast behavior if you prefer to keep it without a fallback;
reference the parse/4 function and the
ChangeMessage/ControlMessage/MoveOutMessage dispatch points when making the
change.
📜 Review details
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
📒 Files selected for processing (1)
packages/elixir-client/lib/electric/client/message.ex
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: Run Lux integration tests
🔇 Additional comments (4)
packages/elixir-client/lib/electric/client/message.ex (4)
7-58: LGTM!The
Headersmodule additions are well-structured. The newtagsandremoved_tagsfields have appropriate default values, type definitions are accurate, andfrom_message/2gracefully handles missing keys withMap.get/3defaults.
60-112: LGTM!The
ControlMessagemodule correctly integratesrequest_timestampinto both the struct and type definition. Bothfrom_message/3clauses consistently handle string-key and atom-key message formats.
114-166: LGTM!The
ChangeMessagemodule correctly propagatesrequest_timestampthrough the struct. Thefrom_message/4signature change is appropriately reflected in the type definition.
168-199: LGTM!The
ResumeMessagemodule correctly adds move-state tracking fields with appropriate default values. Themsgfield inkey_datanow properly usesChangeMessage.t()for type safety, addressing the previous review concern.
✏️ Tip: You can disable this entire section by setting review_details to false in your review settings.
b80ceb4 to
c6fb79f
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 5
🤖 Fix all issues with AI agents
In @.github/workflows/sync_service_tests.yml:
- Line 45: The workflow uses the unsupported "command" key on a GitHub Actions
service container (the line `command: postgres -c max_connections=200`), so
remove that key and instead set up Postgres with the desired max_connections via
one of the supported approaches: use a Docker image already built with the
Postgres CMD configured for `-c max_connections=200`, start Postgres in a run
step with `docker run` and the appropriate CLI flags, or run docker-compose in a
step to control entrypoint/command; delete the `command` entry from the service
definition and implement one of those alternatives to ensure
`max_connections=200` is applied.
In `@packages/sync-service/test/electric/shapes/partitioned_tables_test.exs`:
- Line 2: Add a brief comment above the "use ExUnit.Case, async: false" line in
partitioned_tables_test.exs explaining that async is disabled due to shared DB
state (publications, partitions) to prevent test interference; reference the
"use ExUnit.Case, async: false" line so maintainers know this is intentional and
not an oversight.
In `@packages/sync-service/test/support/integration_setup.ex`:
- Around line 17-41: The hardcoded 2000ms in with_electric_client is brittle;
make the wait timeout configurable by reading a :wait_timeout from the opts (or
an env var fallback) and pass that value to
Electric.StatusMonitor.wait_until_active(ctx.stack_id, timeout: timeout). Update
with_electric_client to compute timeout = Keyword.get(opts, :wait_timeout,
String.to_integer(System.get_env("ELECTRIC_WAIT_TIMEOUT") || "2000")) (or
similar) and use that timeout when calling
Electric.StatusMonitor.wait_until_active so CI can override it.
In `@packages/sync-service/test/support/stream_consumer.ex`:
- Around line 56-119: The assertion helpers assert_insert, assert_update, and
assert_delete currently call assert_receive_message (which fails on the first
non-matching message) while assert_resume uses assert_receive_matching (which
skips non-matching messages), causing inconsistent and surprising test behavior;
make the behavior consistent by switching assert_insert, assert_update, and
assert_delete to use assert_receive_matching (or another skipping helper used by
assert_resume) and update their docstrings to note that these assertions will
skip non-matching messages, referencing the functions assert_insert,
assert_update, assert_delete and assert_resume so reviewers can locate and
verify the change.
♻️ Duplicate comments (1)
packages/elixir-client/lib/electric/client/message.ex (1)
201-252: LGTM!The
MoveOutMessagemodule is well-documented and properly structured. The dualfrom_message/3clauses handle both string-key and atom-key input formats consistently, with properrequest_timestamppropagation.Note on
normalize_patterns/1: The atom-key clause at line 249 still returns the pattern as-is rather than explicitly extracting onlyposandvaluekeys. This was previously flagged as a nitpick to strip potential extra keys.
📜 Review details
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
📒 Files selected for processing (19)
.changeset/beige-ladybugs-give.md.github/workflows/sync_service_tests.ymlpackages/elixir-client/lib/electric/client/ecto_adapter/array_decoder.expackages/elixir-client/lib/electric/client/message.expackages/elixir-client/lib/electric/client/stream.expackages/elixir-client/test/electric/client/message_test.exspackages/elixir-client/test/electric/client_test.exspackages/sync-service/dev/postgres.confpackages/sync-service/dev/postgres2.confpackages/sync-service/mix.exspackages/sync-service/test/electric/postgres/configuration_test.exspackages/sync-service/test/electric/postgres/lock_breaker_connection_test.exspackages/sync-service/test/electric/shapes/partitioned_tables_test.exspackages/sync-service/test/integration/streaming_test.exspackages/sync-service/test/integration/subquery_move_out_test.exspackages/sync-service/test/support/db_setup.expackages/sync-service/test/support/integration_setup.expackages/sync-service/test/support/stream_consumer.expackages/sync-service/test/test_helper.exs
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2026-01-14T14:45:05.838Z
Learnt from: CR
Repo: electric-sql/electric PR: 0
File: AGENTS.md:0-0
Timestamp: 2026-01-14T14:45:05.838Z
Learning: Avoid old Electric patterns (bidirectional SQLite sync, `electrify()` API) – use Electric HTTP streaming with TanStack DB collections instead
Applied to files:
packages/sync-service/test/integration/streaming_test.exs
🧬 Code graph analysis (6)
packages/sync-service/test/integration/streaming_test.exs (2)
packages/sync-service/test/support/stream_consumer.ex (2)
assert_insert(56-70)assert_up_to_date(124-135)packages/sync-service/lib/electric/postgres/snapshot_query.ex (1)
query!(90-105)
packages/elixir-client/test/electric/client_test.exs (4)
packages/elixir-client/lib/electric/client/stream.ex (1)
next(144-149)packages/elixir-client/test/electric/client/embedded_test.exs (1)
stream(11-13)packages/elixir-client/lib/electric/client.ex (3)
stream(493-495)stream(503-505)stream(543-547)packages/elixir-client/lib/electric/client/mock.ex (1)
up_to_date(157-160)
packages/sync-service/test/support/integration_setup.ex (1)
packages/sync-service/test/support/component_setup.ex (1)
build_router_opts(487-506)
packages/elixir-client/test/electric/client/message_test.exs (1)
packages/elixir-client/lib/electric/client/message.ex (6)
parse(256-258)parse(260-267)parse(269-271)parse(273-280)parse(282-289)parse(291-293)
packages/elixir-client/lib/electric/client/ecto_adapter/array_decoder.ex (1)
packages/elixir-client/lib/electric/client/ecto_adapter.ex (3)
cast_to(186-193)cast_to(195-226)cast_to(228-233)
packages/elixir-client/lib/electric/client/stream.ex (1)
packages/elixir-client/lib/electric/client/message.ex (6)
parse(256-258)parse(260-267)parse(269-271)parse(273-280)parse(282-289)parse(291-293)
🪛 actionlint (1.7.10)
.github/workflows/sync_service_tests.yml
45-45: unexpected key "command" for "services" section. expected one of "credentials", "env", "image", "options", "ports", "volumes"
(syntax-check)
🪛 markdownlint-cli2 (0.18.1)
.changeset/beige-ladybugs-give.md
6-6: First line in a file should be a top-level heading
(MD041, first-line-heading, first-line-h1)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
- GitHub Check: Test packages/typescript-client w/ sync-service
- GitHub Check: Test packages/experimental w/ sync-service
- GitHub Check: Run Lux integration tests
🔇 Additional comments (43)
packages/sync-service/test/electric/postgres/lock_breaker_connection_test.exs (1)
16-17: LGTM! Necessary pool size increase for concurrent connection usage.This tag is required because the test spawns a Task (lines 25-40) that holds a database connection indefinitely via
DBConnection.runwithProcess.sleep(:infinity), while the main test process also needs to queryctx.db_connto verify lock state. With pool_size of 1, this would cause a deadlock.packages/sync-service/mix.exs (1)
122-123: LGTM!The addition of
electric_clientas a test-only dependency is appropriate for enabling the integration tests. The configuration is correct:only: [:test]restricts it to the test environment, andruntime: falseensures it won't be started as an application dependency. The path reference to../elixir-clientis valid and the package name matches.packages/elixir-client/lib/electric/client/ecto_adapter/array_decoder.ex (2)
64-75: Pre-existing escaped backslash issue already tracked.The escaped backslash handling gap in
decode_quoted_elem/3(where\\should decode to a single\) was identified in a previous review and is tracked in issue#3708. Since this PR only adds the conditional wrapper and doesn't modify the decoding logic, no action is needed here.
1-2: Conditional Ecto dependency guard is consistent and correct.The
Code.ensure_loaded?(Ecto)pattern wrapping theArrayDecodermodule is idiomatic for optional dependencies and is consistent across all related modules:
- Parent
EctoAdaptermodule uses the same conditional guard- Sibling
Postgresmodule also uses the same patternThis ensures the module is only defined when Ecto is available, which is appropriate since the module depends on
Ecto.CastErrorandEcto.Type.packages/sync-service/dev/postgres.conf (1)
4-4: LGTM!The
max_connections = 200setting aligns with the CI workflow andpostgres2.confchanges, providing consistent connection limits across dev and CI environments for the expanded test suite.packages/sync-service/test/support/db_setup.ex (1)
7-7: LGTM!Reducing
pool_sizeto 1 for the utility pool is sensible since database creation/teardown operations are sequential by nature and don't benefit from parallelism. This also helps manage overall connection usage with the expanded test suite.packages/sync-service/dev/postgres2.conf (1)
4-4: LGTM!Consistent with
postgres.confand CI workflow changes, maintaining uniform connection limits across all dev postgres instances..changeset/beige-ladybugs-give.md (1)
1-6: LGTM!The changeset correctly declares patch versions for both affected packages. The description accurately summarizes the feature. The markdownlint warning about MD041 is a false positive—changeset files intentionally use YAML frontmatter format.
packages/sync-service/test/electric/postgres/configuration_test.exs (1)
107-107: LGTM! Pool size override ensures test reliability.The
pool_size: 2tag is necessary here since the test requires one connection to hold theACCESS EXCLUSIVElock while another connection attempts the publication operation. This prevents potential deadlocks in the test.packages/sync-service/test/test_helper.exs (1)
10-14: LGTM! Well-documented workaround for circular dependency.The comment clearly explains the circular dependency issue and the rationale for the direct application start. The pattern match
{:ok, _}is appropriate for standard test runs.packages/sync-service/test/support/stream_consumer.ex (2)
210-223: Clean recursive pattern matching implementation.The
pattern_matches?/2helper correctly handles nested maps and missing keys.
293-303: Well-designed macro with proper cleanup handling.The
with_consumermacro correctly usestry/afterto ensure the consumer is stopped even when the test block raises. Good use ofvar!(consumer)for variable hygiene.packages/sync-service/test/integration/streaming_test.exs (1)
1-70: LGTM! Well-structured integration test suite for basic streaming.The test module provides good coverage of fundamental streaming scenarios:
- Initial snapshot with pre-existing data
- Live change propagation
- Empty table handling
The setup chain (
with_unique_db,with_basic_tables,with_sql_execute,with_complete_stack,with_electric_client) establishes a clean test environment. Usingasync: falseis correct for integration tests that share database state.packages/sync-service/test/integration/subquery_move_out_test.exs (7)
1-49: LGTM! Comprehensive integration test setup for subquery move-out.The test module is well-organized with:
- Clear moduledoc explaining test scope
- Proper parent/child table schema for subquery testing
- Shape definition with
@subquery_wherefiltering by active parentsThe first test correctly validates that change messages include tags for subquery-matched rows, which is foundational for move-out support.
55-73: LGTM! Move-out control message test validates core functionality.This test correctly verifies the expected behavior when a parent is deactivated: the child row receives a synthetic delete. The test sequence (insert → up_to_date → deactivate parent → delete) matches the expected flow.
80-107: LGTM! Multiple synthetic deletes test validates batch move-out.Good use of
await_countto collect multiple delete messages and verify both children are removed when their shared parent is deactivated.
225-270: Excellent edge case coverage for stale tag index entries.This test (lines 225-270) validates a critical edge case: when a child changes parents, deactivating the old parent should not generate a spurious delete. The detailed comment explains the expected behavior clearly. This protects against the bug where stale tag-index entries would cause incorrect synthetic deletes.
288-312: LGTM! Tag consistency test properly validates deterministic behavior.The test now correctly asserts that tags are present (lines 310-311), addressing the previous review feedback about conditional assertions silently passing when tags are nil.
330-357: LGTM! Resume test validates state preservation across disconnects.This test correctly simulates the client disconnect/reconnect scenario and verifies that move-out events processed while disconnected are properly reflected when resuming.
360-381: LGTM! Helper function creates appropriate test schema.The
with_parent_child_tableshelper creates a clean parent/child relationship with proper foreign key constraints and returns the table metadata for test setup.packages/elixir-client/test/electric/client_test.exs (7)
38-47: LGTM!drain_streamhelper enables internal state inspection.This helper is well-designed for testing:
- Recursively consumes the stream until halt
- Returns the final stream struct for state inspection
- Enables tests to verify
tag_to_keysandkey_datastate
1375-1429: LGTM! Tag index empty test validates backward compatibility.This test ensures that when messages have no tags (non-subquery shapes), the tag index remains empty and doesn't unnecessarily track rows.
1431-1492: LGTM! Core move-out synthetic delete test.This test validates the fundamental move-out flow:
- Insert with tag tracked in index
- Move-out pattern matches the tag
- Synthetic delete generated with correct value
1777-1858: Test name accurately reflects expected behavior after past feedback.The test "update without removed_tags deduplicates synthetic deletes" correctly describes the expected deduplication behavior. This addresses the past review comment about misleading test names.
1860-1924: Test documents expected multi-tag behavior.This test correctly asserts that a row with multiple tags should NOT be deleted when only one tag is moved out. The comment on lines 1914-1915 and assertion on line 1922-1923 clearly document the expected behavior.
2073-2165: Important edge case: update removing all tags.This test validates that when an update removes all tags from a row (via
removed_tagswith no newtags), the tag index is properly cleared. This prevents stale entries from causing incorrect synthetic deletes.The detailed comments (lines 2073-2081) clearly explain the expected behavior and why this matters.
2167-2251: LGTM! Resume state preservation test.This test validates that after resuming a stream, the tag index state is properly restored so move-out events can still generate synthetic deletes. This is critical for correct behavior across client disconnects.
packages/elixir-client/test/electric/client/message_test.exs (4)
5-8: LGTM! Test setup updated for new message types.The alias now includes
MoveOutMessageand the module-level@tstimestamp provides consistent test data.
36-83: LGTM! Comprehensive MoveOutMessage parsing tests.Good coverage of:
- String keys (from JSON)
- Atom keys (from internal representation)
- Multiple patterns with different positions
The single-assert pattern matching style (lines 73-81) is clean and idiomatic, addressing past feedback.
85-131: LGTM! Tag header parsing tests.Tests cover all tag-related scenarios:
- Tags only
- Removed_tags only
- Both tags and removed_tags
- Defaults to empty lists when absent
133-173: LGTM! ChangeMessage parsing tests.Good coverage of insert, update (with old_value), and delete operations. Tests validate key, value, headers, and handle propagation.
packages/elixir-client/lib/electric/client/stream.ex (7)
22-27: LGTM! Tag index fields added to stream struct.The comments clearly explain the dual data structure:
tag_to_keys: efficient lookup of keys by tag valuekey_data: tracks each key's tags and latest messageThis design enables O(1) lookups in both directions.
242-261: Move-out handling is well-implemented.The handler correctly:
- Generates synthetic deletes for matching patterns
- Updates tag indices
- Buffers synthetic deletes for emission
The comment on lines 246-247 documents the assumption about move-out timing, which was discussed in past review comments.
267-274: Resume message correctly includes tag state.The
ResumeMessagenow carriestag_to_keysandkey_datafor state preservation across client disconnects. This is essential for correct move-out handling after resume.
397-416: Resume correctly restores tag state with safe defaults.Using
Map.get(resume, :tag_to_keys, %{})provides backward compatibility with resume messages that don't have tag state (e.g., from older clients or non-subquery shapes).
436-493: Tag index update logic is comprehensive and correct.The implementation correctly handles:
- Insert: add key to tag_to_keys, store in key_data
- Update: remove old tags, add new tags, update message
- Delete: remove key from all its tags, delete from key_data
- No tags: don't track the key at all
The logic for handling
removed_tagsproperly removes stale entries, preventing the bug where old parent deactivation would incorrectly delete a re-parented child.
524-587: Synthetic delete generation handles edge cases correctly.Key implementation details:
- First pass collects all matching keys and removes tags from
tag_to_keys- Second pass checks if keys have remaining tags before generating deletes
- Only keys with no remaining tags get synthetic deletes
- Synthetic deletes use the latest stored message value
The deduplication is implicit: each key appears only once in
key_data, so even if multiple patterns match the same key, only one delete is generated.
279-282: LGTM! Error unwrapping handles various response shapes.The
unwrap_errorhelper correctly handles:
- Empty list → "Unknown error"
- Single-element list → extract message
- Multi-element list → return as-is
- Other values → return as-is
packages/elixir-client/lib/electric/client/message.ex (5)
7-58: LGTM!The Headers module properly integrates the new
tagsandremoved_tagsfields with appropriate defaults and type definitions. Thefrom_message/2function correctly handles optional fields withMap.get/3fallbacks.
60-112: LGTM!The
request_timestamppropagation is consistent across bothfrom_message/3clauses, and the type definition correctly reflects the required field.
114-166: LGTM!The
from_message/4function correctly accepts and setsrequest_timestamp, maintaining consistency with other message types. The value mapping error handling is appropriate.
168-199: LGTM!The
tag_to_keysandkey_datafields are properly typed for move-out state tracking. Themsgfield type has been correctly updated toChangeMessage.t()as previously requested.
273-289: Potential pattern mismatch betweenparse/4andMoveOutMessage.from_message/3.The
parse/4clauses match move-out events without requiring thepatternskey:def parse(%{"headers" => %{"event" => "move-out"}} = msg, ...)However,
MoveOutMessage.from_message/3requirespatternsto be present:def from_message(%{"headers" => %{"event" => "move-out", "patterns" => patterns}}, ...)If the server ever sends a move-out event without
patterns, this will raise aFunctionClauseError.Consider aligning the pattern match in
parse/4to requirepatterns:🛡️ Proposed defensive fix
def parse( - %{"headers" => %{"event" => "move-out"}} = msg, + %{"headers" => %{"event" => "move-out", "patterns" => _}} = msg, shape_handle, _value_mapper_fun, request_timestamp ) do [MoveOutMessage.from_message(msg, shape_handle, request_timestamp)] end def parse( - %{headers: %{event: "move-out"}} = msg, + %{headers: %{event: "move-out", patterns: _}} = msg, shape_handle, _value_mapper_fun, request_timestamp ) do [MoveOutMessage.from_message(msg, shape_handle, request_timestamp)] endPlease confirm that the Electric server protocol guarantees
patternsis always present in move-out events. If not guaranteed, the defensive fix above would prevent the crash and let such messages fall through (or add explicit error handling).
✏️ Tip: You can disable this entire section by setting review_details to false in your review settings.
packages/sync-service/test/electric/shapes/partitioned_tables_test.exs
Outdated
Show resolved
Hide resolved
c6fb79f to
2d76f1c
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 2
🤖 Fix all issues with AI agents
In `@packages/elixir-client/lib/electric/client/stream.ex`:
- Around line 246-251: Add explicit documentation about the assumption that
move-out pattern matching uses non-positional (whole-tag) matching for composite
tags by updating the module-level docs or the MoveOutMessage struct docs;
mention that generate_synthetic_deletes relies on this and that positional
matching for composite tags is intentionally not supported. Reference the
generate_synthetic_deletes function and the MoveOutMessage type so readers know
where the assumption is enforced, and briefly state why this simplifies applying
move-out events immediately (no buffering for later inserts).
In `@packages/sync-service/test/integration/subquery_move_out_test.exs`:
- Around line 109-131: The test comment is misleading about the delete
mechanism: in the "deleting parent row triggers move-out" test (which calls
Client.stream and asserts via assert_delete), clarify whether the child is
removed by Postgres ON DELETE CASCADE or by Electric's synthetic move-out;
update the inline comment that currently reads "Should receive a synthetic
delete for the child" to accurately state the expected behavior (e.g., "Should
receive a delete for the child (cascade delete)" or explicitly note that
Electric emits a synthetic delete if that is the expected behavior) so the
assertion via assert_delete matches the documented expectation.
♻️ Duplicate comments (2)
packages/elixir-client/lib/electric/client/message.ex (1)
246-251: Pattern normalization now consistently extracts only required fields.The atom-key clause uses
= patternwhich preserves any extra keys. For complete consistency with the string-key clause, consider explicit extraction.♻️ Suggested fix for complete normalization
defp normalize_patterns(patterns) do Enum.map(patterns, fn %{"pos" => pos, "value" => value} -> %{pos: pos, value: value} - %{pos: _, value: _} = pattern -> pattern + %{pos: pos, value: value} -> %{pos: pos, value: value} end) endpackages/sync-service/test/support/stream_consumer.ex (1)
197-216: Strict ordering assertion may cause confusing test failures.The
do_assert_receive_messagefunction callsflunkon the first non-matching message (line 210). This means if anup_to_datemessage arrives before an expectedinsert, the test fails with "Received unexpected message" rather than waiting for the insert.This is actually the desired behavior for these tests (strict ordering), but consider documenting this in the module doc or function docs. The past review flagged this as inconsistent with
assert_resume, but looking at the current code, all assertions now use the sameassert_receive_messagehelper.📝 Suggested documentation addition
+ `@doc` """ + Assert an insert message is received with matching value fields. + + **Note:** This assertion is strict - any non-matching message received first + will cause the test to fail. Ensure message ordering is deterministic. + """ def assert_insert(%__MODULE__{} = consumer, value_pattern, timeout \\ nil) do
📜 Review details
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
📒 Files selected for processing (12)
.changeset/beige-ladybugs-give.mdpackages/elixir-client/lib/electric/client/ecto_adapter/array_decoder.expackages/elixir-client/lib/electric/client/message.expackages/elixir-client/lib/electric/client/stream.expackages/elixir-client/test/electric/client/message_test.exspackages/elixir-client/test/electric/client_test.exspackages/sync-service/mix.exspackages/sync-service/test/integration/streaming_test.exspackages/sync-service/test/integration/subquery_move_out_test.exspackages/sync-service/test/support/integration_setup.expackages/sync-service/test/support/stream_consumer.expackages/sync-service/test/test_helper.exs
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2026-01-14T14:45:05.838Z
Learnt from: CR
Repo: electric-sql/electric PR: 0
File: AGENTS.md:0-0
Timestamp: 2026-01-14T14:45:05.838Z
Learning: Avoid old Electric patterns (bidirectional SQLite sync, `electrify()` API) – use Electric HTTP streaming with TanStack DB collections instead
Applied to files:
packages/sync-service/test/integration/streaming_test.exs
🧬 Code graph analysis (6)
packages/sync-service/test/support/integration_setup.ex (1)
packages/sync-service/test/support/component_setup.ex (1)
build_router_opts(487-506)
packages/sync-service/test/integration/streaming_test.exs (2)
packages/sync-service/test/support/stream_consumer.ex (2)
assert_insert(56-70)assert_up_to_date(124-135)packages/sync-service/lib/electric/postgres/snapshot_query.ex (1)
query!(90-105)
packages/sync-service/test/test_helper.exs (1)
packages/sync-service/lib/electric/application.ex (1)
start(6-20)
packages/elixir-client/lib/electric/client/ecto_adapter/array_decoder.ex (1)
packages/elixir-client/lib/electric/client/ecto_adapter.ex (3)
cast_to(186-193)cast_to(195-226)cast_to(228-233)
packages/sync-service/test/integration/subquery_move_out_test.exs (1)
packages/sync-service/test/support/stream_consumer.ex (7)
assert_insert(56-70)assert_up_to_date(124-135)assert_delete(105-119)await_count(161-166)assert_update(76-100)collect_messages(242-247)assert_resume(140-151)
packages/elixir-client/test/electric/client/message_test.exs (1)
packages/elixir-client/lib/electric/client/message.ex (6)
parse(256-258)parse(260-267)parse(269-271)parse(273-280)parse(282-289)parse(291-293)
🪛 markdownlint-cli2 (0.18.1)
.changeset/beige-ladybugs-give.md
6-6: First line in a file should be a top-level heading
(MD041, first-line-heading, first-line-h1)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
- GitHub Check: Test packages/typescript-client w/ sync-service
- GitHub Check: Run Lux integration tests
🔇 Additional comments (33)
packages/elixir-client/lib/electric/client/ecto_adapter/array_decoder.ex (1)
1-94: LGTM! Conditional guard correctly makes the module optional.The
Code.ensure_loaded?(Ecto)pattern is the standard Elixir idiom for conditionally defining modules that depend on optional packages. This correctly prevents compilation errors when Ecto is not available.The parent
EctoAdaptermodule is similarly guarded, ensuring consistency throughout the dependency chain—callers ofArrayDecoder.decode!/2viaEctoAdapter.cast_to({:array, type})will never encounter undefined module errors..changeset/beige-ladybugs-give.md (1)
2-3: Thepatchversion bump is appropriate for this PR. The project's established convention usespatchfor feature additions (e.g., "Support multiple subqueries on the same level" was marked aspatch, and "Add function to dynamically re-enable consumer suspension" was also marked aspatch). Theminordesignation is reserved for major architectural changes, such as database storage replacements. This changeset aligns with the project's versioning patterns.Likely an incorrect or invalid review comment.
packages/sync-service/mix.exs (1)
122-124: LGTM!The test-scoped path dependency for
electric_clientcorrectly enables sync-service integration tests to exercise the Elixir client against the local HTTP server. Usingruntime: falseis appropriate since the client is only needed at test time for assertions and streaming, not as a runtime dependency.packages/sync-service/test/test_helper.exs (1)
10-14: LGTM!The workaround is well-documented and addresses a real circular dependency issue. The direct call to
Electric.Client.Application.start/2bypasses OTP's dependency resolution, preventing the deadlock that would occur when OTP tries to start both:electricand:electric_clientapplications.packages/elixir-client/test/electric/client/message_test.exs (4)
10-34: LGTM!The ControlMessage tests are correctly updated to pass the
request_timestampparameter, maintaining coverage for all control message types with both string and atom key formats.
36-83: LGTM!Comprehensive tests for
MoveOutMessageparsing covering both key formats and multiple patterns. The consolidated pattern matching approach on lines 73-81 is clean and idiomatic.
85-131: LGTM!Thorough coverage of tag-related header parsing, including the important edge case of verifying defaults to empty lists for backward compatibility with untagged shapes.
133-174: LGTM!Well-structured tests for
ChangeMessageparsing covering all three operation types (insert, update, delete) with appropriate field validation includingold_valuehandling for updates.packages/sync-service/test/integration/streaming_test.exs (3)
1-22: LGTM!Well-structured integration test module with clear documentation. The
async: falsesetting is appropriate for tests that interact with a shared database and HTTP server. The modular setup chain keeps the test configuration clean and reusable.
24-61: LGTM!Solid integration tests covering both streaming modes. The first test verifies snapshot retrieval with
live: false, while the second demonstrates the complete live streaming flow—receiving the initial snapshot, then observing real-time changes from aPostgrex.query!insert. Thewith_consumermacro provides a clean assertion interface.
63-69: LGTM!Important edge case coverage ensuring the client correctly handles empty tables by receiving an
up-to-datecontrol message without any preceding inserts.packages/sync-service/test/support/integration_setup.ex (1)
1-42: LGTM!Well-designed test helper with clear documentation. The implementation follows good practices:
- Ephemeral port binding (
port: 0) avoids port conflicts in parallel test runsnum_acceptors: 1is appropriate for test scenarios- Proper use of
ExUnit.Callbacks.start_supervisedensures cleanup- Returns a comprehensive context map for test assertions
packages/elixir-client/test/electric/client_test.exs (6)
38-47: LGTM! Clean helper for inspecting stream internals.The recursive
drain_stream/1helper correctly iterates through the stream until:haltand returns the final stream struct. The local alias keeps it self-contained.
898-898:max_retriesincrease is appropriate.The increase from 10 to 15 accommodates the 8 retry statuses plus connection retry and initial 500 retry in the test scenario (lines 852-862). This ensures the test doesn't flake due to insufficient retries.
1375-1429: Solid test for backward compatibility with untagged shapes.The test correctly verifies that when messages lack tags, the tag index remains empty, ensuring no interference with shapes that don't use move-out semantics.
1431-1492: Good coverage of basic move-out → synthetic delete flow.The test validates the core functionality: insert with tag → move-out matching tag → synthetic delete with correct value.
1777-1858: Test name and comments now align with behavior.The test "update without removed_tags deduplicates synthetic deletes" correctly documents expected behavior (deduplication), and the assertion verifies only 1 synthetic delete is generated.
2167-2251: Test implementation is correct - tag state is properly persisted during resume.The ResumeMessage struct properly stores
tag_to_keysandkey_datawhen created at up-to-date (lines 270-274), and the resume function correctly restores these fields (lines 399-407). The test logic is sound and should work as expected.packages/sync-service/test/integration/subquery_move_out_test.exs (3)
1-36: Well-structured integration test module.Good moduledoc explaining the three key aspects being tested. The setup chain properly wires database, stack, and client components.
330-357: Resume test scenario may have timing issues.The test deactivates the parent "while disconnected" (between
stream1completing andstream2starting). However, the move-out event is generated server-side when the UPDATE happens. Ifstream2resumes and the server already processed the move-out, the client should receive it. But if the resume offset is past the move-out event, the client may never see it.Consider adding a comment clarifying whether this tests:
- Server-side buffering of move-out events for resumed clients, or
- Client-side tag state restoration enabling local synthetic delete generation
360-381: LGTM! Clean table setup helper.The helper creates a proper parent/child relationship with foreign key and returns table metadata for test assertions.
packages/elixir-client/lib/electric/client/stream.ex (5)
22-27: Clear documentation of the dual-index design.The comments explain the purpose of both
tag_to_keys(tag → keys lookup) andkey_data(key → tags + message lookup). This bidirectional index enables efficient move-out processing.
242-261: Move-out handling looks correct.The implementation correctly:
- Generates synthetic deletes for matching patterns
- Buffers them for emission
- Updates both index structures
The assumption comment about move-out timing after snapshot is appropriate.
279-282: Good defensive handling for empty error arrays.The
unwrap_error([])clause prevents crashes when the server returns an empty error array, providing a fallback message.
436-493: Tag index update logic handles edge cases well.The function properly:
- Handles deletes by clearing the key from both structures
- Handles updates with
removed_tagsby computing set differences- Clears keys entirely when they have no remaining tags
One minor observation: when
updated_tagsis empty butcurrent_tagswas non-empty, tags are removed fromtag_to_keyscorrectly (lines 468-474).
524-587: Synthetic delete generation correctly handles multi-tag scenarios.The two-pass approach (collect matched keys → filter by remaining tags) ensures:
- Multiple patterns don't create duplicate deletes
- Keys with remaining tags aren't deleted
The
request_timestampis properly propagated from the move-out event.packages/sync-service/test/support/stream_consumer.ex (2)
1-52: Clean consumer lifecycle management.The
start/2function properly captures the test PID and spawns a task that forwards messages. Thestop/1usesbrutal_killwhich is appropriate for test cleanup.
265-289: Ergonomic macro for test cleanup.The
with_consumermacro correctly:
- Starts the consumer
- Binds
consumervariable for use in the block- Ensures cleanup in
afterclauseUsing
var!(consumer)makes the binding available in the caller's scope.packages/elixir-client/lib/electric/client/message.ex (5)
8-48: Headers struct properly extended for tag support.The additions of
tagsandremoved_tagsfields with sensible defaults ([]) maintain backward compatibility. The type definitions are clear.
71-95: ControlMessage now properly propagates request_timestamp.Both
from_messageclauses (string and atom keys) consistently accept and store the timestamp.
190-198: ResumeMessage type correctly uses ChangeMessage.t() for msg field.This addresses the past review comment about type safety. The
key_datamap value type is now properly typed.
201-252: MoveOutMessage module is well-designed.Good moduledoc explaining the purpose. The
normalize_patternshelper handles both string and atom key formats.
254-293: Parse function correctly threads request_timestamp through all branches.All message type branches now receive and propagate the timestamp consistently. The empty string case returns
[]as expected.
✏️ Tip: You can disable this entire section by setting review_details to false in your review settings.
Rename unwrap to unwrap_error and add handling for empty error arrays from the server. Previously an empty array would return [] as the error message; now it returns "Unknown error". 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
When resuming a stream, the client was making non-live requests (live: false) because up_to_date? defaulted to false. This caused the server to return immediately with up-to-date instead of long-polling for new data. Since a ResumeMessage is only created when the client is already up-to-date, resuming should set up_to_date?: true so the first request uses long-polling to wait for any changes that occurred while disconnected. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Set max_connections = 200 in local dev configs and CI workflow to support integration tests that open multiple database connections. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Reduce default pool_size from 2 to 1 in test setup to minimize connection usage. Tests that require multiple connections now use explicit @tag connection_opt_overrides to request a larger pool. Also set partitioned_tables_test to async: false to avoid connection contention. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Add test helpers for running integration tests with Electric.Client against a real HTTP server: - Add electric_client as test dependency in mix.exs - Start electric_client application in test_helper.exs - Add Support.IntegrationSetup with with_electric_client/2 - Add Support.StreamConsumer for consuming streams in tests - Add basic streaming integration tests to demonstrate usage 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Wrap ArrayDecoder module in Code.ensure_loaded?(Ecto) guard to prevent compile errors when the elixir-client is used as a dependency in projects that don't have Ecto available. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Add client-side support for handling move-out events from the server when using shapes with subquery filters. Changes to message handling: - Add tags and removed_tags fields to Headers struct - Add request_timestamp parameter to Message.parse/4 - Add MoveOutMessage type for parsing move-out control messages - Add tag_to_keys and key_data fields to ResumeMessage for state preservation Changes to stream processing: - Track row tags in tag_to_keys and key_data indices - Handle MoveOutMessage by generating synthetic delete messages - Reset tag indices on shape reset - Preserve and restore tag state when resuming streams The move-out feature enables the server to notify clients when rows should be removed from their view because they no longer match the shape's subquery filter. The client generates synthetic delete messages to cleanly remove these rows from local state. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2d76f1c to
0631be3
Compare
|
Found 44 test failures on Blacksmith runners: Failures
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In @.github/workflows/sync_service_tests.yml:
- Line 45: Add a brief inline comment above the existing command key to explain
that the service container "command" key (used on the postgres service) is an
undocumented but working way to override the container CMD in GitHub Actions;
reference the 'command' key so maintainers know why it's present and that
actionlint/docs may not show it.
♻️ Duplicate comments (2)
packages/sync-service/test/support/integration_setup.ex (1)
17-41: Well-structured integration test helper.The implementation correctly uses ephemeral ports, proper supervised processes, and follows ExUnit conventions. One minor consideration: the hardcoded 2000ms timeout on line 18 may be tight in slower CI environments.
♻️ Optional: Make timeout configurable
def with_electric_client(ctx, opts \\ []) do - :ok = Electric.StatusMonitor.wait_until_active(ctx.stack_id, timeout: 2000) + status_timeout = Keyword.get(opts, :status_timeout, 2000) + :ok = Electric.StatusMonitor.wait_until_active(ctx.stack_id, timeout: status_timeout)packages/sync-service/test/support/stream_consumer.ex (1)
56-151: Assertion functions have consistent strict-ordering behavior.All assertion helpers (
assert_insert,assert_update,assert_delete,assert_up_to_date,assert_resume) now useassert_receive_messagewhich fails immediately on the first non-matching message. This provides strict ordering enforcement, which is appropriate for precise integration test scenarios.
📜 Review details
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
📒 Files selected for processing (18)
.changeset/beige-ladybugs-give.md.github/workflows/sync_service_tests.ymlpackages/elixir-client/lib/electric/client/ecto_adapter/array_decoder.expackages/elixir-client/lib/electric/client/message.expackages/elixir-client/lib/electric/client/stream.expackages/elixir-client/test/electric/client/message_test.exspackages/elixir-client/test/electric/client_test.exspackages/sync-service/dev/postgres.confpackages/sync-service/dev/postgres2.confpackages/sync-service/mix.exspackages/sync-service/test/electric/postgres/configuration_test.exspackages/sync-service/test/electric/postgres/lock_breaker_connection_test.exspackages/sync-service/test/integration/streaming_test.exspackages/sync-service/test/integration/subquery_move_out_test.exspackages/sync-service/test/support/db_setup.expackages/sync-service/test/support/integration_setup.expackages/sync-service/test/support/stream_consumer.expackages/sync-service/test/test_helper.exs
🧰 Additional context used
🧠 Learnings (2)
📚 Learning: 2026-01-15T13:37:13.685Z
Learnt from: robacourt
Repo: electric-sql/electric PR: 3699
File: .github/workflows/sync_service_tests.yml:45-45
Timestamp: 2026-01-15T13:37:13.685Z
Learning: In GitHub Actions workflow files, the service container for a job may support a command option to pass startup arguments (e.g., command: postgres -c max_connections=200), even if this usage isn’t clearly documented in the official workflow syntax. This pattern has been observed in the electric-sql/electric repository CI. Treat this as an undocumented but usable convention: verify in the runner version you target, test in CI, and avoid relying on it in public docs. If you use it, consider adding a comment in the workflow explaining the rationale and note that it’s an undocumented capability that may vary across environments.
Applied to files:
.github/workflows/sync_service_tests.yml
📚 Learning: 2026-01-14T14:45:05.838Z
Learnt from: CR
Repo: electric-sql/electric PR: 0
File: AGENTS.md:0-0
Timestamp: 2026-01-14T14:45:05.838Z
Learning: Avoid old Electric patterns (bidirectional SQLite sync, `electrify()` API) – use Electric HTTP streaming with TanStack DB collections instead
Applied to files:
packages/sync-service/test/integration/streaming_test.exs
🧬 Code graph analysis (5)
packages/sync-service/test/integration/streaming_test.exs (2)
packages/sync-service/test/support/stream_consumer.ex (2)
assert_insert(56-70)assert_up_to_date(124-135)packages/sync-service/lib/electric/postgres/snapshot_query.ex (1)
query!(90-105)
packages/sync-service/test/support/integration_setup.ex (1)
packages/sync-service/test/support/component_setup.ex (1)
build_router_opts(487-506)
packages/elixir-client/lib/electric/client/ecto_adapter/array_decoder.ex (2)
packages/elixir-client/lib/electric/client/fetch/response.ex (1)
decode!(28-34)packages/elixir-client/lib/electric/client/ecto_adapter.ex (3)
cast_to(186-193)cast_to(195-226)cast_to(228-233)
packages/sync-service/test/test_helper.exs (1)
packages/sync-service/lib/electric/application.ex (1)
start(6-20)
packages/elixir-client/lib/electric/client/stream.ex (1)
packages/elixir-client/lib/electric/client/message.ex (6)
parse(256-258)parse(260-267)parse(269-271)parse(273-280)parse(282-289)parse(291-293)
🪛 actionlint (1.7.10)
.github/workflows/sync_service_tests.yml
45-45: unexpected key "command" for "services" section. expected one of "credentials", "env", "image", "options", "ports", "volumes"
(syntax-check)
🪛 markdownlint-cli2 (0.18.1)
.changeset/beige-ladybugs-give.md
6-6: First line in a file should be a top-level heading
(MD041, first-line-heading, first-line-h1)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
- GitHub Check: Test packages/typescript-client w/ sync-service
- GitHub Check: Test packages/experimental w/ sync-service
- GitHub Check: Run Lux integration tests
🔇 Additional comments (43)
packages/sync-service/test/support/db_setup.ex (1)
7-7: LGTM! Reducing default pool size aligns with the broader test infrastructure changes.The reduction from
pool_size: 2topool_size: 1is consistent with the PR's approach of increasingmax_connectionssystem-wide (to 200) while reducing per-pool concurrency. Tests requiring multiple concurrent connections (like the lock timeout test inconfiguration_test.exs) explicitly override this default.packages/sync-service/dev/postgres2.conf (1)
4-4: LGTM! Consistent with the other Postgres configuration changes.The
max_connections = 200setting aligns withpostgres.confand the CI workflow, ensuring consistent behavior across development and test environments.packages/sync-service/test/electric/postgres/configuration_test.exs (1)
107-107: LGTM! Necessary override for test requiring concurrent connections.This test spawns a Task to hold a table lock while simultaneously attempting
add_table_to_publicationfrom the test process. With the defaultpool_size: 1, both operations would contend for the same connection, causing a deadlock. The explicitpool_size: 2override correctly documents and enables this test's concurrency requirements..changeset/beige-ladybugs-give.md (1)
1-6: LGTM! Changeset correctly documents the patch releases.The changeset follows the standard format for
@changesets/cli. The MD041 linting warning about first-line headings is a false positive—changeset files are expected to begin with YAML frontmatter, not markdown headings.packages/sync-service/mix.exs (1)
122-123: LGTM!The test dependency is correctly configured with
only: [:test]andruntime: false, using a path reference appropriate for monorepo sibling packages.packages/sync-service/dev/postgres.conf (1)
4-4: LGTM!Increasing
max_connectionsto 200 aligns with CI workflow settings and supports higher test concurrency requirements introduced by integration tests.packages/sync-service/test/electric/postgres/lock_breaker_connection_test.exs (1)
16-17: LGTM!The
pool_size: 2override is appropriate here since this test requires concurrent connections—one to hold the advisory lock (via the Task) and another for the lock breaker operations.packages/elixir-client/lib/electric/client/ecto_adapter/array_decoder.ex (1)
1-2: LGTM! Parent module has matching guard.The
Code.ensure_loaded?(Ecto)guard is idiomatic for optional dependencies and is consistently applied to both this module and its parentElectric.Client.EctoAdapter, preventing compile errors when Ecto is unavailable.packages/sync-service/test/test_helper.exs (1)
10-14: LGTM - Circular dependency workaround is well-documented.The direct application start bypassing OTP dependency resolution is appropriate for the test environment. The comment clearly explains the rationale, and as discussed in prior review, the
{:ok, _}pattern match is acceptable sincetest_helper.exsruns once per fresh VM in standard test runs.packages/sync-service/test/support/stream_consumer.ex (4)
23-43: LGTM - Clean stream consumer initialization.The implementation correctly captures the test PID and spawns a Task that forwards stream messages. Using
Task.asyncis appropriate here since the task will be explicitly shut down viastop/1.
48-51: LGTM.
161-193: LGTM - Correct timeout handling.
await_countproperly tracks elapsed time using monotonic timestamps and skips non-matching messages (unlike the strictassert_*functions). This is appropriate for collecting a specific count of matching messages regardless of interleaved control messages.
279-289: LGTM - Clean test DSL macro.The
with_consumermacro provides ergonomic test setup/teardown with proper cleanup in theafterblock. Usingvar!(consumer)to inject the binding is the standard Elixir approach for test DSL macros.packages/elixir-client/lib/electric/client/message.ex (6)
7-48: LGTM - Headers struct extended for tag tracking.The additions of
tagsandremoved_tagsfields with sensible defaults ([]) correctly support the move-out semantics. Thefrom_message/2function properly extracts these from the message payload with fallback to empty lists.
71-95: LGTM -request_timestamppropagation implemented.Both string-key and atom-key clauses of
from_messagenow accept and assignrequest_timestamp, addressing the prior review feedback.
129-144: LGTM.
188-198: LGTM - ResumeMessage extended for move-out state preservation.The
tag_to_keysandkey_datafields enable resuming streams with preserved tag index state. The type formsginkey_datais now properly specified asChangeMessage.t()(addressing prior feedback).
201-252: LGTM - MoveOutMessage module is well-structured.The module correctly handles both string-key and atom-key input formats with proper normalization. The
normalize_patternshelper ensures consistent pattern structure for downstream processing.
256-293: LGTM -parse/4correctly routes all message types with timestamp.All branches consistently propagate
request_timestamp:
ChangeMessageviafrom_message/4ControlMessageviafrom_message/3MoveOutMessageviafrom_message/3The move-out event branches correctly handle both string-key (
"headers" => %{"event" => "move-out"}) and atom-key formats.packages/elixir-client/lib/electric/client/stream.ex (9)
22-27: LGTM - Move-out state fields added with clear documentation.The inline comments clearly explain the dual data structure approach:
tag_to_keys: reverse index for efficient pattern matchingkey_data: per-key state including tags and latest message
242-261: LGTM - Move-out handling correctly generates synthetic deletes.The implementation:
- Generates synthetic deletes for matching patterns
- Adds deletes to the buffer
- Updates tag state atomically
The documented assumption about move-out timing (only after snapshot) is appropriate given the protocol guarantees.
267-277: LGTM - ResumeMessage correctly captures move-out state.Including
tag_to_keysandkey_datain the ResumeMessage ensures that resumed streams can continue to process move-out events correctly.
279-282: LGTM - Robust error unwrapping.Handles all edge cases: empty arrays (returns sensible default), single-element arrays (unwraps), multi-element arrays (preserves), and non-arrays (passthrough).
335-347: LGTM - Reset correctly clears move-out state.On
must-refetch, clearingtag_to_keysandkey_datais correct since the client will receive a fresh snapshot.
397-416: LGTM - Resume correctly restores move-out state with backward compatibility.Using
Map.get(resume, :tag_to_keys, %{})ensures backward compatibility with ResumeMessages that don't have these fields (from older client versions or untagged shapes).
436-493: LGTM - Tag index management is correctly implemented.The logic handles all cases:
- Delete operations: Removes key from all associated tags and clears
key_data- Insert/Update with no tags: Clears tracking for the key
- Insert/Update with tags: Computes tag diffs and updates both indexes
The use of
MapSet.differenceandMapSet.unionfor calculating tag changes is correct. The approach of tracking the diff (tags_to_remove,tags_to_add) minimizes unnecessary operations.
495-522: LGTM - Tag manipulation helpers are efficient and correct.The helpers properly:
- Iterate directly over MapSets (addressing prior feedback about unnecessary
to_list)- Clean up empty tag entries from
tag_to_keysto prevent unbounded growth
524-587: LGTM - Synthetic delete generation is well-designed.The two-pass algorithm is correct:
- First pass: Atomically removes matching tags from
tag_to_keysand tracks affected keys- Second pass: For each affected key, checks remaining tags and only generates delete if no tags remain
Key correctness properties:
- Uses
Map.popfor atomic removal from tag index- Preserves rows that still have other tags (partial move-out)
- Synthetic deletes correctly propagate
request_timestampfrom the move-out event- Uses the latest
msgvalue fromkey_datafor the delete payloadpackages/elixir-client/test/electric/client_test.exs (9)
38-47: LGTM -drain_streamhelper for inspecting final stream state.The helper correctly uses tail recursion to consume the stream until
{:halt, stream}, returning the final stream struct for state inspection. This is useful for verifying internal state liketag_to_keysandkey_data.
1375-1429: LGTM - Test verifies tag index stays empty for untagged shapes.This test confirms backward compatibility: when messages have no tags, the tag index structures remain empty, ensuring no overhead for shapes that don't use move-out semantics.
1431-1492: LGTM - Core move-out test case.Correctly verifies the complete flow: insert with tag → move-out pattern match → synthetic delete generated with correct value.
1624-1699: LGTM - Testsremoved_tagshandling correctly.This test verifies that when a row's tag is changed (old tag removed, new tag added), a subsequent move-out for the old tag doesn't generate a spurious delete.
1777-1858: LGTM - Deduplication test.Verifies that updates with the same tag don't cause duplicate synthetic deletes - important for correct tag index management.
1860-1924: Good coverage of multi-tag partial move-out scenario.This test verifies the important invariant: a row with multiple tags should not be deleted when only one of its tags is moved out. The row should remain visible as long as it has any remaining tags.
2009-2071: Good test for deduplication when multiple patterns match same row.Verifies that when a single move-out event contains multiple patterns that all match the same row, only one synthetic delete is generated.
2073-2165: Excellent edge case coverage for tag removal.This test documents the expected behavior when a row's tags are completely removed via an update (with
removed_tagsbut no newtags). The tag index should be cleared so subsequent move-outs are no-ops.
2167-2251: LGTM - Resume state preservation test.Verifies the critical requirement that move-out state is preserved across stream resume, enabling synthetic deletes to work correctly after resumption.
packages/sync-service/test/integration/streaming_test.exs (1)
1-70: Well-structured integration test module.The test file properly exercises the Electric.Client streaming functionality with clear test cases covering:
- Initial snapshot with pre-existing data
- Live streaming with subsequent inserts
- Empty table handling
Good use of the
with_consumermacro and assertion helpers. Theasync: falsesetting is appropriate for integration tests with database state dependencies.packages/elixir-client/test/electric/client/message_test.exs (1)
1-174: Comprehensive test coverage for message parsing.The test file thoroughly covers:
- All
ControlMessagetypes with both string and atom key formatsMoveOutMessageparsing with single and multiple patternsHeaderswith tags, removed_tags, and default empty list behaviorChangeMessagefor all operations (insert, update, delete) includingold_valuehandlingGood refactoring of the multiple patterns test to use a single pattern-matching assertion (lines 73-81).
packages/sync-service/test/integration/subquery_move_out_test.exs (4)
109-131: Clarify comment: this may be a CASCADE delete, not a synthetic move-out delete.The test comment at line 128 says "Should receive a synthetic delete for the child", but the child table has
ON DELETE CASCADE(line 372). When the parent row is deleted, PostgreSQL cascades the delete to the child row directly. This is different from the deactivation tests (lines 55-73, 80-107) whereUPDATE active = falsetriggers move-out and synthetic deletes.Consider clarifying whether:
- The test expects a real CASCADE delete from Postgres, or
- Electric intercepts and emits a synthetic delete despite the cascade
This distinction matters for understanding the test's intent.
225-270: Excellent edge case test for tag index updates.This test verifies critical behavior: when a child changes parents, the tag index must be updated so that deactivating the old parent doesn't incorrectly generate a synthetic delete. The detailed comments explain the expected behavior clearly, and the assertion message provides good debugging context.
330-357: Good coverage of resume with move-out state.This test properly simulates a disconnect/reconnect scenario where changes occur while the client is "offline". Using
resume_msgfrom the first stream to resume ensures the client correctly processes move-out events that happened during disconnection.
360-381: Helper function is well-structured.The
with_parent_child_tables/1helper cleanly creates the schema needed for subquery tests. Returning%{tables: [...]}follows the setup context pattern used elsewhere.
✏️ Tip: You can disable this entire section by setting review_details to false in your review settings.
| --health-retries 5 | ||
| ports: | ||
| - 54321:5432 | ||
| command: postgres -c max_connections=200 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🧹 Nitpick | 🔵 Trivial
Consider adding a comment explaining this undocumented capability.
Based on learnings, the command key for GitHub Actions service containers is an undocumented but functional feature. Since actionlint and official docs don't recognize it, future maintainers may be confused. A brief inline comment would help:
# Note: 'command' is undocumented but works to override container CMD
command: postgres -c max_connections=200🧰 Tools
🪛 actionlint (1.7.10)
45-45: unexpected key "command" for "services" section. expected one of "credentials", "env", "image", "options", "ports", "volumes"
(syntax-check)
🤖 Prompt for AI Agents
In @.github/workflows/sync_service_tests.yml at line 45, Add a brief inline
comment above the existing command key to explain that the service container
"command" key (used on the postgres service) is an undocumented but working way
to override the container CMD in GitHub Actions; reference the 'command' key so
maintainers know why it's present and that actionlint/docs may not show it.
Fixes #3672
Summary
The Problem
When a shape contains a subquery like WHERE parent_id IN (SELECT id FROM parent WHERE active = true), rows can dynamically enter or exit the shape based on changes to dependency tables. For example, deactivating a parent row should cause all its children to be removed from the shape (move-out), even though the child rows themselves weren't modified.
The Solution
The Elixir client now:
Testing Approach
A key aspect of this PR is the integration test design. Rather than relying solely on unit tests with mocks, these tests spin up the full Electric stack and exercise real scenarios:
Benefits of this approach:
Changes
Test Plan
Additional changes separated from the main feature:
Bug Fixes (candidates for backporting to main)
- Renamed unwrap → unwrap_error and added handling for empty error arrays from the server
- Previously [] would be returned as the error message; now returns "Unknown error"
- When resuming a stream, the client was making non-live requests because up_to_date? defaulted to false
- Since a ResumeMessage is only created when already up-to-date, resuming should set up_to_date?: true so requests use long-polling
Infrastructure (supporting integration tests)
- Set max_connections = 200 in dev configs and CI to support integration tests with multiple concurrent connections
- Changed default pool_size from 2 to 1 in test setup
- Tests needing more connections use explicit @tag connection_opt_overrides: [pool_size: 2]
- Wrapped ArrayDecoder module in Code.ensure_loaded?(Ecto) to prevent compile errors when Ecto is unavailable
Note: The commit history has been carefully organized to separate bug fixes, infrastructure changes, and the core feature implementation (by claude!). The two bug fixes in commits 1-2 are independent improvements discovered during development that could be backported to main.
Summary by CodeRabbit
Release Notes
New Features
Tests
✏️ Tip: You can customize this high-level summary in your review settings.