Skip to content

Conversation

@robacourt
Copy link
Contributor

@robacourt robacourt commented Jan 13, 2026

Fixes #3672

Summary

  • Implements tagged row and move-out event support in the Elixir client for shapes with subqueries in WHERE clauses
  • Introduces end-to-end integration tests that validate the full client/server behavior
  • Adds StreamConsumer test helper for expressive, human-readable test assertions

The Problem

When a shape contains a subquery like WHERE parent_id IN (SELECT id FROM parent WHERE active = true), rows can dynamically enter or exit the shape based on changes to dependency tables. For example, deactivating a parent row should cause all its children to be removed from the shape (move-out), even though the child rows themselves weren't modified.

The Solution

The Elixir client now:

  1. Tracks tags on change messages (MD5 hashes indicating why rows belong to the shape)
  2. Processes move-out control messages from the server
  3. Generates synthetic delete messages when move-outs occur
  4. Maintains a tag index to correctly handle edge cases (e.g., a row changing parents)

Testing Approach

A key aspect of this PR is the integration test design. Rather than relying solely on unit tests with mocks, these tests spin up the full Electric stack and exercise real scenarios:

  test "receives move-out when parent is deactivated", %{client: client, shape: shape, db_conn: db_conn} do
    stream = Client.stream(client, shape, live: true)

    with_consumer stream do
      # Wait for initial snapshot
      assert_insert(consumer, %{"id" => "child-1"})
      assert_up_to_date(consumer)

      # Deactivate the parent - this should trigger a move-out
      Postgrex.query!(db_conn, "UPDATE parent SET active = false WHERE id = 'parent-1'", [])

      # Should receive a synthetic delete for child-1
      assert_delete(consumer, %{"id" => "child-1"})
    end
  end

Benefits of this approach:

  • Readable as specifications: Tests describe expected behavior in plain terms that humans can verify
  • Oracle for AI development: Tests serve as ground truth that AI tools can use to drive implementation and verify correctness
  • Easier review: Reviewers can easily understand that the system is doing what it's supposed to at a high level without diving into implementation details
  • Confidence: Tests exercise the actual code paths through the full system, not just isolated units

Changes

Area Changes
electric/client/stream.ex Move-out processing, tag index tracking, synthetic delete generation
electric/client/message.ex Tag and event message parsing
Integration tests 8 end-to-end scenarios covering move-outs, tag updates, resume, etc.
StreamConsumer helper Expressive test DSL with assert_insert, assert_delete, assert_up_to_date

Test Plan

  • Unit tests for message parsing and tag operations
  • Integration tests for move-out scenarios
  • Resume behavior after move-out
  • Edge case: changing parent doesn't generate spurious deletes
  • Backward compatibility with untagged shapes

Additional changes separated from the main feature:

Bug Fixes (candidates for backporting to main)

  1. Fix empty error array handling (stream.ex)
    - Renamed unwrap → unwrap_error and added handling for empty error arrays from the server
    - Previously [] would be returned as the error message; now returns "Unknown error"
  2. Fix resume to use live requests for long-polling (stream.ex)
    - When resuming a stream, the client was making non-live requests because up_to_date? defaulted to false
    - Since a ResumeMessage is only created when already up-to-date, resuming should set up_to_date?: true so requests use long-polling

Infrastructure (supporting integration tests)

  1. Increase Postgres max_connections
    - Set max_connections = 200 in dev configs and CI to support integration tests with multiple concurrent connections
  2. Reduce default connection pool size
    - Changed default pool_size from 2 to 1 in test setup
    - Tests needing more connections use explicit @tag connection_opt_overrides: [pool_size: 2]
  3. Guard Ecto-dependent code
    - Wrapped ArrayDecoder module in Code.ensure_loaded?(Ecto) to prevent compile errors when Ecto is unavailable

Note: The commit history has been carefully organized to separate bug fixes, infrastructure changes, and the core feature implementation (by claude!). The two bug fixes in commits 1-2 are independent improvements discovered during development that could be backported to main.

Summary by CodeRabbit

Release Notes

  • New Features

    • Added move-out support for subqueries with automatic synthetic delete generation when dependencies become inactive
    • Enhanced message headers with tag tracking and removal support for improved message filtering
    • Improved stream state management for dependency tracking and tag-based message organization
  • Tests

    • Added comprehensive integration tests for streaming and move-out scenarios

✏️ Tip: You can customize this high-level summary in your review settings.

@netlify
Copy link

netlify bot commented Jan 13, 2026

Deploy Preview for electric-next ready!

Name Link
🔨 Latest commit 0631be3
🔍 Latest deploy log https://app.netlify.com/projects/electric-next/deploys/6968eccce634fc00080de71b
😎 Deploy Preview https://deploy-preview-3699--electric-next.netlify.app
📱 Preview on mobile
Toggle QR Code...

QR Code

Use your smartphone camera to open QR code link.

To edit notification comments on pull requests, go to your Netlify project configuration.

@codecov
Copy link

codecov bot commented Jan 13, 2026

Codecov Report

❌ Patch coverage is 95.32710% with 5 lines in your changes missing coverage. Please review.
✅ Project coverage is 84.15%. Comparing base (c25526c) to head (0631be3).
✅ All tests successful. No failed tests found.

Files with missing lines Patch % Lines
...ckages/elixir-client/lib/electric/client/stream.ex 92.95% 5 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #3699      +/-   ##
==========================================
- Coverage   87.36%   84.15%   -3.22%     
==========================================
  Files          23       44      +21     
  Lines        2011     2834     +823     
  Branches      535      533       -2     
==========================================
+ Hits         1757     2385     +628     
- Misses        252      447     +195     
  Partials        2        2              
Flag Coverage Δ
elixir 76.30% <95.32%> (?)
elixir-client 76.30% <95.32%> (?)
packages/experimental 87.73% <ø> (ø)
packages/react-hooks 86.48% <ø> (ø)
packages/start 82.83% <ø> (ø)
packages/typescript-client 93.47% <ø> (ø)
packages/y-electric 56.05% <ø> (ø)
typescript 87.36% <ø> (ø)
unit-tests 84.15% <95.32%> (-3.22%) ⬇️

Flags with carried forward coverage won't be shown. Click here to find out more.

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

@robacourt robacourt marked this pull request as ready for review January 13, 2026 18:05
@robacourt robacourt force-pushed the rob/elixir-client-move-outs branch from 861bce7 to fe74421 Compare January 14, 2026 14:35
@coderabbitai
Copy link

coderabbitai bot commented Jan 14, 2026

📝 Walkthrough

Walkthrough

Adds move-out support for subqueries in the Elixir client, introducing tagged rows, move-out event handling, synthetic delete generation when dependencies become inactive, and tag-based row tracking. Includes new message types, stream state management, integration tests, and testing infrastructure.

Changes

Cohort / File(s) Summary
Core Message Protocol
packages/elixir-client/lib/electric/client/message.ex
Introduces new MoveOutMessage struct with patterns, handle, and request_timestamp fields. Extends Headers with tags and removed_tags fields. Extends ResumeMessage with tag_to_keys and key_data for tag state preservation. Adds request_timestamp propagation through all parse clauses and message types.
Stream State & Processing
packages/elixir-client/lib/electric/client/stream.ex
Adds tag_to_keys and key_data struct fields for move-out tracking. Implements move-out message handling with synthetic delete generation. Adds tag index utilities (update_tag_index, remove_key_from_tags, etc.) to maintain tag state. Integrates synthetic deletes into message processing pipeline. Updates reset logic to clear tag-related state.
Ecto Adapter Guard
packages/elixir-client/lib/electric/client/ecto_adapter/array_decoder.ex
Wraps ArrayDecoder module declaration in conditional guard (if Code.ensure_loaded?(Ecto)), making module existence conditional when Ecto is not loaded.
Message Parsing Tests
packages/elixir-client/test/electric/client/message_test.exs
Adds comprehensive tests for new message types: MoveOutMessage parsing with atom and string keys, multiple patterns. Extends Headers tests with tags and removed_tags handling. Updates ControlMessage tests to pass request_timestamp. Expands ChangeMessage tests with tag and old_value scenarios.
Client Stream Tests
packages/elixir-client/test/electric/client_test.exs
Adds extensive move-out handling test suite covering tag indexing, synthetic deletes, deduplication, multi-tag edge cases, resume behavior, and state inspection. Increases max_retries from 10 to 15 in fetch configurations. Adds drain_stream helper.
Integration & Streaming Tests
packages/sync-service/test/integration/streaming_test.exs, packages/sync-service/test/integration/subquery_move_out_test.exs
Adds HTTP-based integration tests for streaming with and without live updates. Introduces comprehensive subquery move-out test suite covering tag propagation, move-out events, synthetic deletes, move-in semantics, and resume state preservation. Includes with_parent_child_tables helper for parent/child relationship testing.
Test Support Infrastructure
packages/sync-service/test/support/stream_consumer.ex, packages/sync-service/test/support/integration_setup.ex
New StreamConsumer module providing start/stop lifecycle, message assertion macros (assert_insert, assert_update, assert_delete, assert_up_to_date), timeout handling, and pattern matching for testing streams. New IntegrationSetup module with with_electric_client for HTTP server startup and Electric.Client initialization.
Configuration & Setup
.changeset/beige-ladybugs-give.md, packages/sync-service/mix.exs, packages/sync-service/test/support/db_setup.ex, packages/sync-service/test/test_helper.exs, .github/workflows/sync_service_tests.yml, packages/sync-service/dev/postgres*.conf
Adds version bump changeset for elixir-client and sync-service. Adds electric_client test dependency to mix.exs. Reduces db_setup pool_size from 2 to 1. Adds Electric.Client.Application startup in test_helper to avoid circular dependency. Increases Postgres max_connections to 200 in CI and dev configs.
Test Configuration Tags
packages/sync-service/test/electric/postgres/configuration_test.exs, packages/sync-service/test/electric/postgres/lock_breaker_connection_test.exs
Adds ExUnit tag @tag connection_opt_overrides: [pool_size: 2] to two tests requiring isolated connection pool configuration.

Sequence Diagrams

sequenceDiagram
    participant Server as Electric Server
    participant Parser as Message.parse
    participant Headers as Headers
    participant MoveOut as MoveOutMessage
    participant Stream as Electric.Client.Stream
    participant Index as Tag Index

    Server->>Parser: Move-out event (patterns)
    Parser->>MoveOut: from_message/2
    MoveOut->>MoveOut: normalize_patterns
    MoveOut-->>Stream: MoveOutMessage
    Stream->>Index: find rows matching patterns
    Index-->>Stream: rows to delete
    Stream->>Stream: generate_synthetic_deletes
    Stream-->>Stream: buffer ChangeMessages (delete ops)
Loading
sequenceDiagram
    participant Server as Electric Server
    participant Parser as Message.parse
    participant CM as ChangeMessage
    participant Headers as Headers
    participant Stream as Electric.Client.Stream
    participant TagIndex as Tag Index Utilities

    Server->>Parser: Change event with tags/removed_tags
    Parser->>CM: from_message (with request_timestamp)
    Parser->>Headers: from_message (extracts tags)
    CM-->>Stream: ChangeMessage {operation, tags, removed_tags}
    Stream->>TagIndex: update_tag_index
    TagIndex->>TagIndex: add_key_to_tags / remove_key_from_tags
    TagIndex-->>Stream: updated tag_to_keys, key_data
    Stream-->>Stream: store for resume state
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~50 minutes

Poem

🐰 Whisker-twitching joy! 🎉
Subqueries now dance with tags so bright,
Move-outs handled, synthetic deletes take flight,
Stream state tracked with care and might—
The Elixir client shines ever more light!

🚥 Pre-merge checks | ✅ 4 | ❌ 1
❌ Failed checks (1 warning)
Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 44.00% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (4 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The PR title accurately describes the main feature being added: move-in/out support for subqueries in the Elixir client, which aligns with the comprehensive changes throughout the codebase.
Linked Issues check ✅ Passed The PR fully implements the requirements from #3672: adds tag parsing to message headers, creates message types for move-out events, implements TagIndex and MoveState modules for tracking row tags, processes move-out events with synthetic deletes, buffers moves during sync, and includes comprehensive unit and integration tests.
Out of Scope Changes check ✅ Passed All changes are scoped to implementing move-in/out support. Infrastructure changes (Postgres max_connections, pool_size adjustments, test setup) and Ecto-guarded code are supportive of the main feature and not out-of-scope additions.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing touches
  • 📝 Generate docstrings

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 9

🤖 Fix all issues with AI agents
In `@packages/elixir-client/lib/electric/client/ecto_adapter/array_decoder.ex`:
- Around line 64-75: The decode_quoted_elem function doesn't handle escaped
backslashes, so pairs of backslashes in quoted PostgreSQL array elements are
decoded as two backslashes; add a specific clause in decode_quoted_elem to match
an escaped backslash (e.g., pattern matching "\\" at the head) that pushes a
single backslash into the accumulator and recurses, and insert this clause
before the generic utf8 clause (the existing defp decode_quoted_elem(<<c::utf8,
rest::bitstring>>, acc, state)). Update tests to include a quoted element like
{"a\\b"} that asserts the result is ["a\b"].

In `@packages/elixir-client/lib/electric/client/message.ex`:
- Around line 208-215: The struct field request_timestamp is declared as
DateTime.t() but from_message/2 never sets it, leaving nil and violating the
typespec; update the from_message/2 function(s) to populate request_timestamp
with a DateTime (e.g., DateTime.utc_now()) when building the %__MODULE__{} or,
alternatively, change the t type spec to DateTime.t() | nil if it may
legitimately be unset; modify the clause(s) of from_message/2 that construct the
struct and the `@type` t accordingly so the implementation and spec remain
consistent.
- Around line 217-234: The two from_message/2 clauses produce inconsistent
pattern shapes: the string-key clause maps patterns into atom-keyed maps
(parsed_patterns) while the atom-key clause forwards patterns unchanged; update
the atom-key clause (the def from_message(%{headers: %{event: "move-out",
patterns: patterns}}, handle) clause) to normalize patterns the same way as the
string-key clause (e.g., map each pattern to %{pos: pos, value: value}) or
delegate both clauses to a shared helper (normalize_patterns/1) before building
the %__MODULE__{patterns: ..., handle: handle} to ensure consistent pattern
structure regardless of input key style.
- Around line 185-193: The key_data type currently uses msg: any(), which
weakens type safety; change it to msg: ChangeMessage.t() (or the correct
fully-qualified ChangeMessage module if namespaced) in the `@type` t for the
struct and update any necessary alias or import (e.g., alias
Electric.ChangeMessage or add the module prefix) so the spec compiles; keep the
defstruct unchanged but make sure the ChangeMessage type exists or is
defined/aliased in the same module to satisfy Dialyzer and compile-time checks.

In `@packages/sync-service/test/integration/subquery_move_out_test.exs`:
- Around line 351-352: Replace the brittle Process.sleep(100) with a
deterministic wait: poll the observable condition the server should reach (e.g.,
query the DB or call the public API/function that reflects the change) in a
short loop with a timeout and backoff, or use assert_receive if the code under
test can send a message; implement a helper like wait_until/2 that repeatedly
checks the condition and raises on timeout, and call that helper instead of
Process.sleep(100) so the test waits only as long as needed and is resilient on
CI.
- Around line 186-218: The assertion in the test method that checks tag updates
uses a compound condition that may be hard to debug if it fails. Modify the
assertion in the test "update that changes parent reference updates tags" inside
the block consuming the stream to include a descriptive failure message
indicating the expected tag change or removal, specifying the initial tags, new
tags, and removed tags values. This will clarify test failures related to tag
updates.

In `@packages/sync-service/test/test_helper.exs`:
- Around line 10-14: The direct call to Electric.Client.Application.start/2 is
pattern-matched to {:ok, _} and will crash if the app is already started; change
the invocation to handle both {:ok, _} and {:error, {:already_started, _}}
(e.g., use a case or with to accept either result and ignore the already-started
error) so tests don’t fail when the application is already running; update the
call site Electric.Client.Application.start(:normal, []) accordingly and
optionally log or comment that the already-started case is acceptable.
📜 Review details

Configuration used: Organization UI

Review profile: ASSERTIVE

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between ed6e304 and fe74421.

📒 Files selected for processing (16)
  • .changeset/beige-ladybugs-give.md
  • packages/elixir-client/lib/electric/client/ecto_adapter/array_decoder.ex
  • packages/elixir-client/lib/electric/client/message.ex
  • packages/elixir-client/lib/electric/client/stream.ex
  • packages/elixir-client/test/electric/client/message_test.exs
  • packages/elixir-client/test/electric/client_test.exs
  • packages/sync-service/mix.exs
  • packages/sync-service/test/electric/postgres/configuration_test.exs
  • packages/sync-service/test/electric/postgres/lock_breaker_connection_test.exs
  • packages/sync-service/test/electric/shapes/partitioned_tables_test.exs
  • packages/sync-service/test/integration/streaming_test.exs
  • packages/sync-service/test/integration/subquery_move_out_test.exs
  • packages/sync-service/test/support/db_setup.ex
  • packages/sync-service/test/support/integration_setup.ex
  • packages/sync-service/test/support/stream_consumer.ex
  • packages/sync-service/test/test_helper.exs
🧰 Additional context used
🧬 Code graph analysis (7)
packages/elixir-client/lib/electric/client/ecto_adapter/array_decoder.ex (2)
packages/elixir-client/lib/electric/client/fetch/response.ex (1)
  • decode! (28-34)
packages/elixir-client/lib/electric/client/ecto_adapter.ex (3)
  • cast_to (186-193)
  • cast_to (195-226)
  • cast_to (228-233)
packages/sync-service/test/support/integration_setup.ex (1)
packages/sync-service/test/support/component_setup.ex (1)
  • build_router_opts (487-506)
packages/sync-service/test/integration/subquery_move_out_test.exs (1)
packages/sync-service/test/support/stream_consumer.ex (5)
  • assert_insert (56-70)
  • assert_up_to_date (124-135)
  • assert_delete (105-119)
  • await_count (153-158)
  • assert_resume (140-143)
packages/elixir-client/test/electric/client/message_test.exs (1)
packages/elixir-client/lib/electric/client/message.ex (6)
  • parse (239-241)
  • parse (243-245)
  • parse (247-249)
  • parse (251-253)
  • parse (255-257)
  • parse (259-261)
packages/sync-service/test/test_helper.exs (2)
packages/sync-service/lib/electric/application.ex (1)
  • start (6-20)
packages/sync-service/lib/electric/postgres/replication_client/connection_setup.ex (1)
  • start (28-31)
packages/sync-service/test/integration/streaming_test.exs (2)
packages/sync-service/test/support/stream_consumer.ex (2)
  • assert_insert (56-70)
  • assert_up_to_date (124-135)
packages/sync-service/lib/electric/postgres/snapshot_query.ex (1)
  • query! (90-105)
packages/elixir-client/lib/electric/client/message.ex (1)
packages/elixir-client/lib/electric/client/stream.ex (1)
  • shape_handle (346-348)
🪛 markdownlint-cli2 (0.18.1)
.changeset/beige-ladybugs-give.md

6-6: First line in a file should be a top-level heading

(MD041, first-line-heading, first-line-h1)

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (10)
  • GitHub Check: Test packages/typescript-client w/ sync-service
  • GitHub Check: Test packages/experimental w/ sync-service
  • GitHub Check: Test packages/react-hooks w/ sync-service
  • GitHub Check: Test packages/y-electric w/ sync-service
  • GitHub Check: Test packages/start w/ sync-service
  • GitHub Check: Run Lux integration tests
  • GitHub Check: Build and test sync-service, pg15
  • GitHub Check: Build and test sync-service, pg18
  • GitHub Check: Build and test sync-service, pg14
  • GitHub Check: Build and test sync-service, pg17
🔇 Additional comments (41)
packages/sync-service/test/support/db_setup.ex (1)

4-9: LGTM — pool_size reduction is reasonable for test defaults.

Reducing the default pool_size from 2 to 1 is sensible for minimizing resource usage and avoiding connection contention in tests. Individual tests that require concurrent database access can override this via connection_opt_overrides in the test context.

packages/sync-service/test/electric/shapes/partitioned_tables_test.exs (1)

2-2: Clarify the reason for disabling async test execution.

Switching from async: true to async: false will serialize these tests, which impacts overall test suite performance. Given that the setup already uses with_unique_db for database isolation, the contention point isn't immediately obvious.

Consider adding a brief comment explaining why async execution was disabled (e.g., registry conflicts, shared state with new integration tests, or observed flakiness). This helps future maintainers understand the constraint and avoid inadvertently re-enabling async.

packages/sync-service/test/electric/postgres/lock_breaker_connection_test.exs (1)

16-17: LGTM!

The pool_size: 2 override is appropriate here. The test requires concurrent database connections: one for the Task holding the advisory lock (line 28) and another for the LockBreakerConnection to terminate it. This ensures the test has sufficient connections to avoid pool contention.

packages/sync-service/test/electric/postgres/configuration_test.exs (1)

107-108: LGTM!

The pool_size: 2 override correctly addresses the concurrent connection requirement for this test. One connection is held by the Task acquiring the table lock (line 128), while another is needed for the add_table_to_publication call (line 144) that's expected to timeout. This follows the established pattern already used in this file (line 315).

packages/sync-service/mix.exs (1)

122-123: LGTM!

The test-only dependency on electric_client is correctly configured with runtime: false to prevent automatic OTP application startup (which is handled explicitly in test_helper.exs). The relative path reference is appropriate for monorepo structure.

packages/elixir-client/lib/electric/client/ecto_adapter/array_decoder.ex (2)

1-2: LGTM!

Wrapping the module in Code.ensure_loaded?(Ecto) is the correct pattern for making this functionality optional when Ecto is not a dependency.


79-92: LGTM!

The distinction between cast and cast_quoted correctly handles NULL semantics — in PostgreSQL array format, NULL is always unquoted, so quoted elements can skip the NULL check.

packages/sync-service/test/support/stream_consumer.ex (4)

23-43: LGTM!

Clean implementation of the stream consumer with proper task lifecycle management. The message forwarding pattern is appropriate for test assertions.


189-208: Strict assertion semantics may cause flaky tests.

do_assert_receive_message (used by assert_insert, assert_update, assert_delete) flunks immediately on any non-matching message. This means if a ControlMessage (like :up_to_date) arrives before an expected ChangeMessage, the test fails.

This differs from do_assert_receive_matching (used by assert_resume) which skips non-matching messages.

If this strict ordering is intentional for enforcement, consider adding a note in the docstrings. Otherwise, consider using the lenient approach for change assertions too.


293-303: LGTM!

The with_consumer macro provides clean ergonomics with proper resource cleanup via try/after. Using var!(consumer) to inject the binding is appropriate for this DSL pattern.


256-277: LGTM!

The collect_messages utility with a short default timeout (100ms) is useful for "collect what's available" scenarios without blocking tests for too long.

.changeset/beige-ladybugs-give.md (1)

1-6: LGTM!

The changeset correctly specifies patch bumps for both affected packages. The MD041 linting warning is a false positive — changeset files require YAML frontmatter as the first content, not a markdown heading.

packages/elixir-client/test/electric/client/message_test.exs (3)

33-76: Good test coverage for MoveOutMessage parsing.

The tests cover essential parsing scenarios including string/atom key variants and multiple patterns. However, consider adding a negative test case for malformed move-out messages (e.g., missing patterns field) to ensure graceful error handling.


78-124: Solid test coverage for tags and removed_tags in headers.

Tests cover the key scenarios: tags only, removed_tags only, both present, and defaults to empty lists. This ensures backward compatibility with untagged shapes.


126-167: Good ChangeMessage parsing tests with comprehensive assertions.

The tests verify key, value, headers, old_value, and handle propagation. Coverage for insert/update/delete operations is complete.

packages/elixir-client/test/electric/client_test.exs (7)

38-47: Well-designed helper for testing internal stream state.

The recursive drain_stream/1 function properly drains the stream and returns the final state for inspection. This is essential for verifying tag index state after processing.


1378-1429: Good backward compatibility test for untagged shapes.

This test correctly verifies that the tag index remains empty when messages don't include tags, ensuring backward compatibility with existing shapes that don't use subquery-based membership.


1431-1492: Core move-out test validates the happy path correctly.

This test verifies the fundamental move-out flow: insert with tag → move-out event → synthetic delete with the original row data. The assertion that the delete contains the full value ("name" => "test") is important for downstream consumers.


1777-1860: Test documents edge case for duplicate tag entries.

This test validates that updates with the same tag but without removed_tags don't cause duplicate synthetic deletes. The assertion on Line 1858 expects exactly 1 delete, which aligns with the implementation in stream.ex that uses key_data to track per-key state and avoid duplicates.


1862-1926: Critical edge case: partial move-out with multiple tags.

This test verifies that a row with multiple tags (["tag-a", "tag-b"]) is NOT deleted when only one tag is moved out. The implementation in generate_synthetic_deletes/2 correctly checks if remaining_tags is empty before adding to keys_to_delete.


2169-2253: Resume test validates state persistence across stream restarts.

This test is crucial for ensuring move-out functionality works correctly when a client resumes from a checkpoint. The ResumeMessage now includes tag_to_keys and key_data fields (Lines 260-261 in stream.ex), which are restored on resume (Lines 387-395 in stream.ex).


2075-2167: Test documents important edge case for tag removal.

This test validates that when a row is updated to remove ALL its tags (via removed_tags without new tags), subsequent move-out events are no-ops. The implementation handles this in update_tag_index (Lines 451-461 in stream.ex) by clearing the key from both data structures when updated_tags is empty.

packages/sync-service/test/integration/streaming_test.exs (2)

1-70: Solid integration test structure with good coverage of streaming basics.

The test module properly:

  • Uses async: false for integration tests that share resources
  • Tags tests with @tmp_dir for isolated temp directories
  • Uses the expressive with_consumer DSL for assertions
  • Tests both snapshot and live streaming modes

The test setup chain is well-organized and reusable.


43-61: Potential race condition in live changes test.

The test inserts a row via Postgrex.query! and immediately asserts on the stream. While this likely works due to Electric's replication mechanism, consider adding a brief assertion or using a more robust synchronization mechanism if this test becomes flaky.

packages/elixir-client/lib/electric/client/stream.ex (7)

22-27: Clear documentation for new move-out data structures.

The inline comments explain the purpose of tag_to_keys and key_data well. These structures enable efficient tag lookup and deduplication.


233-249: Move-out handling with clear assumptions documented.

The comment on Lines 234-235 documents the assumption that move-out events only occur after the initial snapshot. This is important context for understanding when synthetic deletes are generated.

The implementation correctly:

  • Generates synthetic deletes for matching patterns
  • Updates both tag_to_keys and key_data atomically
  • Buffers synthetic deletes in order

423-480: Comprehensive tag index update logic with proper edge case handling.

The implementation correctly handles:

  • Initial insert (no current_data)
  • Updates with added/removed tags
  • Delete operations (full cleanup)
  • Updates that result in zero tags (cleanup)

The use of MapSet operations ensures no duplicate tags per key.


511-574: Two-pass algorithm correctly handles multi-tag rows.

The algorithm properly:

  1. First pass: collects all matching keys and tracks which tags are being removed
  2. Second pass: checks remaining tags before deciding to delete

This ensures a row with tags ["tag-a", "tag-b"] won't be deleted when only tag-a is moved out.


385-402: Resume logic uses Map.get with defaults for backward compatibility.

Using Map.get(resume, :tag_to_keys, %{}) ensures backward compatibility with ResumeMessage structs that don't have these new fields (e.g., from persisted state before this change).


267-270: Error unwrapping handles various error payload formats.

The unwrap_error/1 function handles empty lists, single-element lists, multi-element lists, and non-list values. This provides flexibility for different error formats from the server.


514-515: Assumption about move-out pattern matching is accurate.

The current implementation only matches patterns using the value field and completely ignores the pos field, confirming the comment's statement. Across all test cases and the pattern-matching logic in generate_synthetic_deletes, no positional matching is performed. The pos field is transmitted in the protocol and parsed but never used, suggesting it was included for future extensibility if composite tag support becomes necessary.

packages/sync-service/test/support/integration_setup.ex (1)

17-41: Well-structured integration test helper.

The helper properly:

  • Waits for the Electric stack to be active before starting the server
  • Uses port 0 for dynamic allocation, avoiding port conflicts
  • Uses start_supervised for automatic cleanup
  • Returns a comprehensive context map for test use

The 2000ms timeout is appropriately balanced for this helper's complexity and aligns with timeout patterns used across the test suite.

packages/sync-service/test/integration/subquery_move_out_test.exs (7)

1-26: LGTM - Well-structured test module setup.

The module definition, imports, and shape definition are clear and well-organized. The @subquery_where module attribute provides good reusability across tests.


42-49: LGTM - Tag presence verification test.

Correctly verifies that change messages for subquery-matched rows include tags in headers.


55-73: LGTM - Core move-out test.

This test properly validates the end-to-end flow: initial snapshot, parent deactivation, and synthetic delete generation.


80-107: LGTM - Multi-child synthetic delete test.

Good use of await_count to collect multiple deletes without relying on order, then sorting for comparison.


133-167: LGTM - Move-in after parent change test.

This test validates an important edge case where a child row moves between parents, covering both move-out and move-in flows.


365-385: LGTM - Clean helper function.

The helper properly creates the parent/child table structure needed for subquery tests with appropriate constraints.


258-269: collect_messages/2 helper exists in StreamConsumer at packages/sync-service/test/support/stream_consumer.ex:256.

packages/elixir-client/lib/electric/client/message.ex (2)

8-48: LGTM - Headers struct extended correctly.

The tags and removed_tags fields are properly added with sensible defaults and correct type definitions. The from_message/2 function correctly extracts these fields with fallbacks to empty lists.


251-257: LGTM - Parse clauses for move-out events.

The parse clauses correctly match both string-key and atom-key formats and delegate to MoveOutMessage.from_message/2, consistent with the pattern used for ControlMessage.

✏️ Tip: You can disable this entire section by setting review_details to false in your review settings.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🤖 Fix all issues with AI agents
In `@packages/elixir-client/lib/electric/client/stream.ex`:
- Around line 562-575: Synthetic delete messages are using DateTime.utc_now()
instead of the move-out/request timestamp; change the synthetic_deletes
construction so the Message.ChangeMessage.request_timestamp is set to the
move-out/request timestamp (propagate the move-out event's request_timestamp
through keys_to_delete rather than calling DateTime.utc_now()), and update the
caller in handle_msg to include/pass that timestamp into keys_to_delete so
synthetic_deletes can use it (refer to synthetic_deletes, keys_to_delete,
Message.ChangeMessage.request_timestamp, and handle_msg).
- Around line 237-253: handle_msg/2 currently processes Message.MoveOutMessage
without verifying the snapshot is complete; add a defensive check at the top of
the Message.MoveOutMessage clause to ensure stream.up_to_date? (or the
equivalent boolean on the stream) is true before calling
generate_synthetic_deletes, and if it's false do not apply deletes: instead log
or emit a protocol error and return {:cont, stream} (or otherwise ignore/buffer
the message) to avoid deleting rows that were never observed. Ensure you
reference the existing symbols handle_msg, Message.MoveOutMessage,
stream.up_to_date?, and generate_synthetic_deletes when making the change.
♻️ Duplicate comments (3)
packages/elixir-client/test/electric/client/message_test.exs (1)

73-78: Use pattern matching for clearer assertions.

Using Enum.at with separate assertions is less idiomatic. Pattern matching directly on the list makes the test more readable and concise.

♻️ Suggested refactor
-      assert [%MoveOutMessage{patterns: patterns}] = Message.parse(msg, "handle", & &1, `@ts`)
-      assert length(patterns) == 3
-      assert Enum.at(patterns, 0) == %{pos: 0, value: "tag-1"}
-      assert Enum.at(patterns, 1) == %{pos: 1, value: "tag-2"}
-      assert Enum.at(patterns, 2) == %{pos: 2, value: "tag-3"}
+      assert [%MoveOutMessage{patterns: patterns}] = Message.parse(msg, "handle", & &1, `@ts`)
+      assert [
+               %{pos: 0, value: "tag-1"},
+               %{pos: 1, value: "tag-2"},
+               %{pos: 2, value: "tag-3"}
+             ] = patterns
packages/elixir-client/lib/electric/client/message.ex (2)

190-198: Consider using a more specific type for msg in key_data.

The msg: any() type reduces type safety. Since this always stores a ChangeMessage, the type should reflect that.

-            key_data: %{String.t() => %{tags: MapSet.t(String.t()), msg: any()}}
+            key_data: %{String.t() => %{tags: MapSet.t(String.t()), msg: ChangeMessage.t()}}

222-249: Inconsistent pattern normalization between from_message/3 clauses.

The string-key version (lines 222-237) transforms patterns by extracting "pos" and "value" into atom-keyed maps, but the atom-key version (lines 239-249) passes patterns through unchanged. This can cause inconsistent pattern structures depending on input format.

🐛 Proposed fix to normalize patterns consistently
   def from_message(
         %{headers: %{event: "move-out", patterns: patterns}},
         handle,
         request_timestamp
       ) do
+    # Normalize patterns to ensure consistent structure
+    parsed_patterns =
+      Enum.map(patterns, fn
+        %{pos: pos, value: value} -> %{pos: pos, value: value}
+        %{"pos" => pos, "value" => value} -> %{pos: pos, value: value}
+      end)
+
     %__MODULE__{
-      patterns: patterns,
+      patterns: parsed_patterns,
       handle: handle,
       request_timestamp: request_timestamp
     }
   end
📜 Review details

Configuration used: Organization UI

Review profile: ASSERTIVE

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between fe74421 and a5bdd58.

📒 Files selected for processing (3)
  • packages/elixir-client/lib/electric/client/message.ex
  • packages/elixir-client/lib/electric/client/stream.ex
  • packages/elixir-client/test/electric/client/message_test.exs
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (5)
  • GitHub Check: Build and test sync-service, pg14
  • GitHub Check: Build and test sync-service, pg15
  • GitHub Check: Build and test sync-service, pg17
  • GitHub Check: Build and test sync-service, pg18
  • GitHub Check: Run Lux integration tests
🔇 Additional comments (13)
packages/elixir-client/test/electric/client/message_test.exs (4)

1-9: LGTM!

Module setup with aliases and test timestamp constant is well-structured.


10-34: LGTM!

ControlMessage tests cover all three control types (up-to-date, must-refetch, snapshot-end) with both string and atom key variants, ensuring robust parsing.


81-127: LGTM!

Good coverage for tag-related header parsing including edge cases: tags only, removed_tags only, both present, and defaults when missing.


129-170: LGTM!

ChangeMessage tests appropriately validate parsing of insert, update with old_value, and delete operations, with proper header assertions.

packages/elixir-client/lib/electric/client/stream.ex (5)

22-27: LGTM!

Clear documentation of the new state fields for move-out tracking. The data structure design with tag_to_keys for reverse lookups and key_data for per-key state is well thought out.


260-266: LGTM!

ResumeMessage correctly includes tag tracking state, enabling proper resume behavior for shapes with move-out support.


335-338: LGTM!

Reset correctly clears all tag tracking state alongside other stream state.


389-406: LGTM!

Resume properly restores tag state from the ResumeMessage with safe defaults for backward compatibility with older resume data.


427-484: LGTM!

Tag index update logic correctly handles all cases: inserts (add tags), updates (add/remove tags), and deletes (remove key entirely). The use of MapSet for efficient set operations is appropriate.

packages/elixir-client/lib/electric/client/message.ex (4)

7-58: LGTM!

Headers struct properly extended with tags and removed_tags fields, with appropriate type definitions and defaults.


60-112: LGTM!

ControlMessage correctly propagates request_timestamp through both string-key and atom-key parsing paths.


114-166: LGTM!

ChangeMessage parsing properly includes request_timestamp propagation.


254-291: LGTM!

Parse function dispatch correctly routes to appropriate message constructors for all message types: ChangeMessage, ControlMessage, MoveOutMessage, and empty strings.

✏️ Tip: You can disable this entire section by setting review_details to false in your review settings.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
packages/elixir-client/lib/electric/client/stream.ex (1)

96-108: Type spec is missing the new tag_to_keys and key_data fields.

The @type t specification doesn't include the newly added fields, which reduces type safety and IDE support.

🔧 Suggested fix
   `@type` t :: %__MODULE__{
           client: Client.t(),
           schema: Client.schema(),
           value_mapper_fun: Client.ValueMapper.mapper_fun(),
           parser: nil | {module(), term()},
           buffer: :queue.queue(),
           up_to_date?: boolean(),
           offset: Client.offset(),
           replica: Client.replica(),
           shape_handle: nil | Client.shape_handle(),
           state: :init | :stream | :done,
-          opts: opts()
+          opts: opts(),
+          tag_to_keys: %{optional(String.t()) => MapSet.t(String.t())},
+          key_data: %{optional(String.t()) => %{tags: MapSet.t(String.t()), msg: Message.ChangeMessage.t()}}
         }
🤖 Fix all issues with AI agents
In `@packages/elixir-client/lib/electric/client/stream.ex`:
- Around line 489-516: The Enum.reduce calls unnecessarily call MapSet.to_list,
causing allocations; update remove_key_from_tags and add_key_to_tags to pass the
MapSet `tags` directly to Enum.reduce (i.e., Enum.reduce(tags, ...)) and remove
the .to_list() call, and apply the same change to the Enum.reduce usages inside
update_tag_index so all reductions iterate the MapSet directly.
♻️ Duplicate comments (1)
packages/elixir-client/lib/electric/client/stream.ex (1)

237-256: Move-out handling lacks defensive validation for snapshot completion.

The comment on lines 241-242 states an assumption that move-out events only arrive after the snapshot is complete, but the code doesn't validate this. If the server sends a move-out during snapshot sync (protocol violation or bug), this could delete rows that were never observed by the client.

Consider adding a defensive check or at minimum logging when this invariant is violated.

📜 Review details

Configuration used: Organization UI

Review profile: ASSERTIVE

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 5f33069 and 463b703.

📒 Files selected for processing (1)
  • packages/elixir-client/lib/electric/client/stream.ex
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (11)
  • GitHub Check: Test packages/react-hooks w/ sync-service
  • GitHub Check: Test packages/typescript-client w/ sync-service
  • GitHub Check: Test packages/experimental w/ sync-service
  • GitHub Check: Test packages/start w/ sync-service
  • GitHub Check: Test packages/y-electric w/ sync-service
  • GitHub Check: Build and test sync-service, pg14
  • GitHub Check: Build and test sync-service, pg17
  • GitHub Check: Build and test sync-service, pg18
  • GitHub Check: Build and test sync-service, pg15
  • GitHub Check: Run Lux integration tests
  • GitHub Check: Build and test elixir-client
🔇 Additional comments (8)
packages/elixir-client/lib/electric/client/stream.ex (8)

188-192: LGTM!

Correctly propagates request_timestamp through the message parsing chain, enabling synthetic deletes to use the original timestamp.


232-235: LGTM!

The tag index is correctly updated before buffering the message, ensuring consistent state tracking.


262-272: LGTM!

Tag state is correctly persisted in the ResumeMessage, enabling proper state restoration on stream resume.


274-277: LGTM!

The unwrap_error function properly normalizes various error payload shapes with sensible defaults.


330-342: LGTM!

Tag state is correctly cleared on reset, ensuring a clean slate when the shape handle changes.


392-410: LGTM!

The use of Map.get with default empty maps ensures backward compatibility with ResumeMessage structs that don't have tag state fields.


430-487: LGTM!

The tag index update logic correctly handles all operation types:

  • Deletes properly clean up all tracking state
  • Updates efficiently compute tag differences using MapSet.difference and MapSet.union
  • Empty tag sets trigger cleanup to prevent memory leaks

564-578: The implementation of synthetic deletes with old_value: nil appears intentional and is consistent with how the codebase handles delete messages. The delete message test does not assert any expectation for old_value, and no code in the repository accesses old_value in the context of delete operations. Synthetic deletes are client-side tag-based removals, distinct from server-initiated deletes, and setting old_value to nil is the current design.

Likely an incorrect or invalid review comment.

✏️ Tip: You can disable this entire section by setting review_details to false in your review settings.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🤖 Fix all issues with AI agents
In `@packages/elixir-client/test/electric/client_test.exs`:
- Around line 1777-1860: Rename the test whose description string is "update
without removed_tags causes duplicate synthetic deletes" to a name that reflects
the expected (fixed) behavior, e.g. "update without removed_tags should still
generate single synthetic delete" (or "deduplicates synthetic deletes when same
tag appears in multiple updates"), and update any top-line comment text that
claims a BUG to instead state the expected behavior so the test name and
comments align with the assertion and intent.
📜 Review details

Configuration used: Organization UI

Review profile: ASSERTIVE

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 3f90c79 and bddf44c.

📒 Files selected for processing (1)
  • packages/elixir-client/test/electric/client_test.exs
🧰 Additional context used
🧬 Code graph analysis (1)
packages/elixir-client/test/electric/client_test.exs (4)
packages/elixir-client/lib/electric/client/stream.ex (1)
  • next (144-149)
packages/elixir-client/test/electric/client/embedded_test.exs (1)
  • stream (11-13)
packages/elixir-client/lib/electric/client.ex (3)
  • stream (493-495)
  • stream (503-505)
  • stream (543-547)
packages/elixir-client/lib/electric/client/mock.ex (1)
  • up_to_date (157-160)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
  • GitHub Check: Test packages/y-electric w/ sync-service
  • GitHub Check: Test packages/typescript-client w/ sync-service
  • GitHub Check: Run Lux integration tests
🔇 Additional comments (12)
packages/elixir-client/test/electric/client_test.exs (12)

38-47: LGTM! Clean recursive helper for draining streams.

The helper correctly recurses until :halt and returns the final stream struct for state inspection.

Minor style note: the alias on line 41 could be moved to the module-level aliases (around line 7-10) for consistency, though keeping it local is also acceptable since this is a private test helper.


898-898: LGTM! Retry count appropriately increased to handle test scenario.

The test simulates multiple error conditions (initial 500, Bypass down/up cycle, 8 retry status codes) before success. Increasing to 15 retries provides sufficient headroom to prevent test flakiness.


1378-1429: LGTM! Good backward compatibility test.

This test verifies that shapes without tags don't pollute the tag index, which is essential for backward compatibility with existing shapes that don't use subquery-based move-out semantics.


1431-1492: LGTM! Core move-out functionality test.

Good coverage of the primary use case: insert with tag → move-out event → synthetic delete generation with correct value data.


1494-1560: LGTM! Multi-row move-out scenario.

Good test for verifying that a single move-out pattern correctly generates synthetic deletes for all matching rows.


1562-1622: LGTM! Non-matching pattern test.

Essential negative test verifying that move-out patterns only affect rows with matching tags.


1624-1699: LGTM! Tag transition test.

Important coverage for the tag replacement scenario where a row moves from one tag to another via removed_tags. Correctly verifies no synthetic delete occurs since the row remains in the shape via the new tag.


1701-1775: LGTM! Delete cleanup test.

Correctly verifies that explicit deletes clear the row from the tag index, preventing duplicate deletes when a subsequent move-out event arrives.


1862-1926: LGTM! Critical partial move-out test.

This tests an important semantic: rows with multiple tags should only be deleted when ALL their tags are moved out. The assertion correctly expects no synthetic deletes when the row still belongs via tag-b.


1928-2009: LGTM! Value freshness test.

Critical test ensuring synthetic deletes contain the most recent row value, not stale data from initial insert. This is important for downstream consumers that may need accurate final state.


2011-2073: LGTM! Pattern deduplication test.

Good edge case coverage: when a single move-out event contains multiple patterns that all match the same row, only one synthetic delete should be generated.


2075-2167: LGTM! Complete tag removal test.

Well-documented edge case: when an update removes all tags from a row (via removed_tags with no new tags), the row should be cleared from the tag index. Subsequent move-out events for the removed tag should be a no-op.

✏️ Tip: You can disable this entire section by setting review_details to false in your review settings.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🤖 Fix all issues with AI agents
In `@packages/sync-service/test/integration/subquery_move_out_test.exs`:
- Around line 306-312: The current conditional allows the assertion to be
skipped if tags are nil; instead assert that tags are present and equal: fetch
tags1 and tags2 from msg1.headers and msg2.headers (Map.get(msg1.headers, :tags)
and Map.get(msg2.headers, :tags)), add a non-nil check like assert not
is_nil(tags1) (or assert tags1 != nil) and then assert tags1 == tags2 so the
test fails if tags are missing or mismatched.
♻️ Duplicate comments (1)
packages/sync-service/test/integration/subquery_move_out_test.exs (1)

214-217: Improve assertion message for better debuggability.

The compound condition is correct but could benefit from a descriptive failure message.

♻️ Suggested improvement
-        # Either we have explicit removed_tags, or tags have changed
-        assert new_tags != initial_tags or length(removed_tags) > 0
+        # Either we have explicit removed_tags, or tags have changed
+        assert new_tags != initial_tags or length(removed_tags) > 0,
+               "Expected tags to change or removed_tags to be present. " <>
+                 "initial_tags: #{inspect(initial_tags)}, new_tags: #{inspect(new_tags)}, " <>
+                 "removed_tags: #{inspect(removed_tags)}"
📜 Review details

Configuration used: Organization UI

Review profile: ASSERTIVE

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between bddf44c and bd28eae.

📒 Files selected for processing (1)
  • packages/sync-service/test/integration/subquery_move_out_test.exs
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
  • GitHub Check: Test packages/experimental w/ sync-service
  • GitHub Check: Test packages/typescript-client w/ sync-service
  • GitHub Check: Run Lux integration tests
🔇 Additional comments (6)
packages/sync-service/test/integration/subquery_move_out_test.exs (6)

27-49: Well-structured test setup and initial test case.

The setup chain is correctly ordered, and the first test properly validates that tags are present on change messages for subquery-matched rows. The pattern %{headers: %{tags: [_]}} correctly asserts a non-empty tags list.


55-73: Good coverage of the core move-out scenario.

This test correctly validates the primary use case: deactivating a parent triggers a synthetic delete for the child. The test properly waits for up_to_date before making mutations, ensuring deterministic behavior.


75-107: Correct use of await_count for multiple deletes.

Using await_count with a match function is the right approach for collecting multiple synthetic deletes without assuming order. Sorting before comparison ensures deterministic assertions.


225-270: Excellent edge case test for tag index consistency.

This test validates critical behavior: when a child row switches parents, the tag index must be updated to prevent spurious deletes when the old parent is deactivated. The inline documentation clearly explains the scenario being tested.


331-358: Well-designed resume scenario test.

This test correctly simulates a client that syncs initial data, disconnects, has the underlying data change (parent deactivated), and then resumes. Using live: false to get a ResumeMessage and then resuming with it is the right approach to test resume semantics.


361-382: Clean helper setup for parent/child relationship testing.

The schema correctly models the subquery scenario with ON DELETE CASCADE maintaining referential integrity. Returning the tables list follows the established pattern for context propagation.

✏️ Tip: You can disable this entire section by setting review_details to false in your review settings.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🤖 Fix all issues with AI agents
In `@packages/elixir-client/lib/electric/client/message.ex`:
- Around line 246-251: The atom-key clause in normalize_patterns returns the
original map allowing extra keys, whereas the string-key clause constructs a new
map with only pos and value; update normalize_patterns to explicitly extract and
return only %{pos: pos, value: value} in both clauses (i.e., pattern-match the
atom-key clause like %{pos: pos, value: value} -> %{pos: pos, value: value}) so
both branches normalize to the same sanitized shape.
- Around line 254-293: The parse/4 function lacks a catch-all clause and will
raise FunctionClauseError on unrecognized message shapes; update the parse/4
clauses (the parse function handling maps and strings) to include a final
fallback clause that matches any input (e.g., parse(_, _shape_handle,
_value_mapper_fun, _request_timestamp)) and returns [] to mirror the
empty-string case, or alternatively add a `@doc` on the parse/4 function stating
the intentional fail-fast behavior if you prefer to keep it without a fallback;
reference the parse/4 function and the
ChangeMessage/ControlMessage/MoveOutMessage dispatch points when making the
change.
📜 Review details

Configuration used: Organization UI

Review profile: ASSERTIVE

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between bd28eae and a730d67.

📒 Files selected for processing (1)
  • packages/elixir-client/lib/electric/client/message.ex
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: Run Lux integration tests
🔇 Additional comments (4)
packages/elixir-client/lib/electric/client/message.ex (4)

7-58: LGTM!

The Headers module additions are well-structured. The new tags and removed_tags fields have appropriate default values, type definitions are accurate, and from_message/2 gracefully handles missing keys with Map.get/3 defaults.


60-112: LGTM!

The ControlMessage module correctly integrates request_timestamp into both the struct and type definition. Both from_message/3 clauses consistently handle string-key and atom-key message formats.


114-166: LGTM!

The ChangeMessage module correctly propagates request_timestamp through the struct. The from_message/4 signature change is appropriately reflected in the type definition.


168-199: LGTM!

The ResumeMessage module correctly adds move-state tracking fields with appropriate default values. The msg field in key_data now properly uses ChangeMessage.t() for type safety, addressing the previous review concern.

✏️ Tip: You can disable this entire section by setting review_details to false in your review settings.

@electric-sql electric-sql deleted a comment from blacksmith-sh bot Jan 15, 2026
@robacourt robacourt force-pushed the rob/elixir-client-move-outs branch from b80ceb4 to c6fb79f Compare January 15, 2026 12:52
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 5

🤖 Fix all issues with AI agents
In @.github/workflows/sync_service_tests.yml:
- Line 45: The workflow uses the unsupported "command" key on a GitHub Actions
service container (the line `command: postgres -c max_connections=200`), so
remove that key and instead set up Postgres with the desired max_connections via
one of the supported approaches: use a Docker image already built with the
Postgres CMD configured for `-c max_connections=200`, start Postgres in a run
step with `docker run` and the appropriate CLI flags, or run docker-compose in a
step to control entrypoint/command; delete the `command` entry from the service
definition and implement one of those alternatives to ensure
`max_connections=200` is applied.

In `@packages/sync-service/test/electric/shapes/partitioned_tables_test.exs`:
- Line 2: Add a brief comment above the "use ExUnit.Case, async: false" line in
partitioned_tables_test.exs explaining that async is disabled due to shared DB
state (publications, partitions) to prevent test interference; reference the
"use ExUnit.Case, async: false" line so maintainers know this is intentional and
not an oversight.

In `@packages/sync-service/test/support/integration_setup.ex`:
- Around line 17-41: The hardcoded 2000ms in with_electric_client is brittle;
make the wait timeout configurable by reading a :wait_timeout from the opts (or
an env var fallback) and pass that value to
Electric.StatusMonitor.wait_until_active(ctx.stack_id, timeout: timeout). Update
with_electric_client to compute timeout = Keyword.get(opts, :wait_timeout,
String.to_integer(System.get_env("ELECTRIC_WAIT_TIMEOUT") || "2000")) (or
similar) and use that timeout when calling
Electric.StatusMonitor.wait_until_active so CI can override it.

In `@packages/sync-service/test/support/stream_consumer.ex`:
- Around line 56-119: The assertion helpers assert_insert, assert_update, and
assert_delete currently call assert_receive_message (which fails on the first
non-matching message) while assert_resume uses assert_receive_matching (which
skips non-matching messages), causing inconsistent and surprising test behavior;
make the behavior consistent by switching assert_insert, assert_update, and
assert_delete to use assert_receive_matching (or another skipping helper used by
assert_resume) and update their docstrings to note that these assertions will
skip non-matching messages, referencing the functions assert_insert,
assert_update, assert_delete and assert_resume so reviewers can locate and
verify the change.
♻️ Duplicate comments (1)
packages/elixir-client/lib/electric/client/message.ex (1)

201-252: LGTM!

The MoveOutMessage module is well-documented and properly structured. The dual from_message/3 clauses handle both string-key and atom-key input formats consistently, with proper request_timestamp propagation.

Note on normalize_patterns/1: The atom-key clause at line 249 still returns the pattern as-is rather than explicitly extracting only pos and value keys. This was previously flagged as a nitpick to strip potential extra keys.

📜 Review details

Configuration used: Organization UI

Review profile: ASSERTIVE

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between b80ceb4 and c6fb79f.

📒 Files selected for processing (19)
  • .changeset/beige-ladybugs-give.md
  • .github/workflows/sync_service_tests.yml
  • packages/elixir-client/lib/electric/client/ecto_adapter/array_decoder.ex
  • packages/elixir-client/lib/electric/client/message.ex
  • packages/elixir-client/lib/electric/client/stream.ex
  • packages/elixir-client/test/electric/client/message_test.exs
  • packages/elixir-client/test/electric/client_test.exs
  • packages/sync-service/dev/postgres.conf
  • packages/sync-service/dev/postgres2.conf
  • packages/sync-service/mix.exs
  • packages/sync-service/test/electric/postgres/configuration_test.exs
  • packages/sync-service/test/electric/postgres/lock_breaker_connection_test.exs
  • packages/sync-service/test/electric/shapes/partitioned_tables_test.exs
  • packages/sync-service/test/integration/streaming_test.exs
  • packages/sync-service/test/integration/subquery_move_out_test.exs
  • packages/sync-service/test/support/db_setup.ex
  • packages/sync-service/test/support/integration_setup.ex
  • packages/sync-service/test/support/stream_consumer.ex
  • packages/sync-service/test/test_helper.exs
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2026-01-14T14:45:05.838Z
Learnt from: CR
Repo: electric-sql/electric PR: 0
File: AGENTS.md:0-0
Timestamp: 2026-01-14T14:45:05.838Z
Learning: Avoid old Electric patterns (bidirectional SQLite sync, `electrify()` API) – use Electric HTTP streaming with TanStack DB collections instead

Applied to files:

  • packages/sync-service/test/integration/streaming_test.exs
🧬 Code graph analysis (6)
packages/sync-service/test/integration/streaming_test.exs (2)
packages/sync-service/test/support/stream_consumer.ex (2)
  • assert_insert (56-70)
  • assert_up_to_date (124-135)
packages/sync-service/lib/electric/postgres/snapshot_query.ex (1)
  • query! (90-105)
packages/elixir-client/test/electric/client_test.exs (4)
packages/elixir-client/lib/electric/client/stream.ex (1)
  • next (144-149)
packages/elixir-client/test/electric/client/embedded_test.exs (1)
  • stream (11-13)
packages/elixir-client/lib/electric/client.ex (3)
  • stream (493-495)
  • stream (503-505)
  • stream (543-547)
packages/elixir-client/lib/electric/client/mock.ex (1)
  • up_to_date (157-160)
packages/sync-service/test/support/integration_setup.ex (1)
packages/sync-service/test/support/component_setup.ex (1)
  • build_router_opts (487-506)
packages/elixir-client/test/electric/client/message_test.exs (1)
packages/elixir-client/lib/electric/client/message.ex (6)
  • parse (256-258)
  • parse (260-267)
  • parse (269-271)
  • parse (273-280)
  • parse (282-289)
  • parse (291-293)
packages/elixir-client/lib/electric/client/ecto_adapter/array_decoder.ex (1)
packages/elixir-client/lib/electric/client/ecto_adapter.ex (3)
  • cast_to (186-193)
  • cast_to (195-226)
  • cast_to (228-233)
packages/elixir-client/lib/electric/client/stream.ex (1)
packages/elixir-client/lib/electric/client/message.ex (6)
  • parse (256-258)
  • parse (260-267)
  • parse (269-271)
  • parse (273-280)
  • parse (282-289)
  • parse (291-293)
🪛 actionlint (1.7.10)
.github/workflows/sync_service_tests.yml

45-45: unexpected key "command" for "services" section. expected one of "credentials", "env", "image", "options", "ports", "volumes"

(syntax-check)

🪛 markdownlint-cli2 (0.18.1)
.changeset/beige-ladybugs-give.md

6-6: First line in a file should be a top-level heading

(MD041, first-line-heading, first-line-h1)

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
  • GitHub Check: Test packages/typescript-client w/ sync-service
  • GitHub Check: Test packages/experimental w/ sync-service
  • GitHub Check: Run Lux integration tests
🔇 Additional comments (43)
packages/sync-service/test/electric/postgres/lock_breaker_connection_test.exs (1)

16-17: LGTM! Necessary pool size increase for concurrent connection usage.

This tag is required because the test spawns a Task (lines 25-40) that holds a database connection indefinitely via DBConnection.run with Process.sleep(:infinity), while the main test process also needs to query ctx.db_conn to verify lock state. With pool_size of 1, this would cause a deadlock.

packages/sync-service/mix.exs (1)

122-123: LGTM!

The addition of electric_client as a test-only dependency is appropriate for enabling the integration tests. The configuration is correct: only: [:test] restricts it to the test environment, and runtime: false ensures it won't be started as an application dependency. The path reference to ../elixir-client is valid and the package name matches.

packages/elixir-client/lib/electric/client/ecto_adapter/array_decoder.ex (2)

64-75: Pre-existing escaped backslash issue already tracked.

The escaped backslash handling gap in decode_quoted_elem/3 (where \\ should decode to a single \) was identified in a previous review and is tracked in issue #3708. Since this PR only adds the conditional wrapper and doesn't modify the decoding logic, no action is needed here.


1-2: Conditional Ecto dependency guard is consistent and correct.

The Code.ensure_loaded?(Ecto) pattern wrapping the ArrayDecoder module is idiomatic for optional dependencies and is consistent across all related modules:

  • Parent EctoAdapter module uses the same conditional guard
  • Sibling Postgres module also uses the same pattern

This ensures the module is only defined when Ecto is available, which is appropriate since the module depends on Ecto.CastError and Ecto.Type.

packages/sync-service/dev/postgres.conf (1)

4-4: LGTM!

The max_connections = 200 setting aligns with the CI workflow and postgres2.conf changes, providing consistent connection limits across dev and CI environments for the expanded test suite.

packages/sync-service/test/support/db_setup.ex (1)

7-7: LGTM!

Reducing pool_size to 1 for the utility pool is sensible since database creation/teardown operations are sequential by nature and don't benefit from parallelism. This also helps manage overall connection usage with the expanded test suite.

packages/sync-service/dev/postgres2.conf (1)

4-4: LGTM!

Consistent with postgres.conf and CI workflow changes, maintaining uniform connection limits across all dev postgres instances.

.changeset/beige-ladybugs-give.md (1)

1-6: LGTM!

The changeset correctly declares patch versions for both affected packages. The description accurately summarizes the feature. The markdownlint warning about MD041 is a false positive—changeset files intentionally use YAML frontmatter format.

packages/sync-service/test/electric/postgres/configuration_test.exs (1)

107-107: LGTM! Pool size override ensures test reliability.

The pool_size: 2 tag is necessary here since the test requires one connection to hold the ACCESS EXCLUSIVE lock while another connection attempts the publication operation. This prevents potential deadlocks in the test.

packages/sync-service/test/test_helper.exs (1)

10-14: LGTM! Well-documented workaround for circular dependency.

The comment clearly explains the circular dependency issue and the rationale for the direct application start. The pattern match {:ok, _} is appropriate for standard test runs.

packages/sync-service/test/support/stream_consumer.ex (2)

210-223: Clean recursive pattern matching implementation.

The pattern_matches?/2 helper correctly handles nested maps and missing keys.


293-303: Well-designed macro with proper cleanup handling.

The with_consumer macro correctly uses try/after to ensure the consumer is stopped even when the test block raises. Good use of var!(consumer) for variable hygiene.

packages/sync-service/test/integration/streaming_test.exs (1)

1-70: LGTM! Well-structured integration test suite for basic streaming.

The test module provides good coverage of fundamental streaming scenarios:

  • Initial snapshot with pre-existing data
  • Live change propagation
  • Empty table handling

The setup chain (with_unique_db, with_basic_tables, with_sql_execute, with_complete_stack, with_electric_client) establishes a clean test environment. Using async: false is correct for integration tests that share database state.

packages/sync-service/test/integration/subquery_move_out_test.exs (7)

1-49: LGTM! Comprehensive integration test setup for subquery move-out.

The test module is well-organized with:

  • Clear moduledoc explaining test scope
  • Proper parent/child table schema for subquery testing
  • Shape definition with @subquery_where filtering by active parents

The first test correctly validates that change messages include tags for subquery-matched rows, which is foundational for move-out support.


55-73: LGTM! Move-out control message test validates core functionality.

This test correctly verifies the expected behavior when a parent is deactivated: the child row receives a synthetic delete. The test sequence (insert → up_to_date → deactivate parent → delete) matches the expected flow.


80-107: LGTM! Multiple synthetic deletes test validates batch move-out.

Good use of await_count to collect multiple delete messages and verify both children are removed when their shared parent is deactivated.


225-270: Excellent edge case coverage for stale tag index entries.

This test (lines 225-270) validates a critical edge case: when a child changes parents, deactivating the old parent should not generate a spurious delete. The detailed comment explains the expected behavior clearly. This protects against the bug where stale tag-index entries would cause incorrect synthetic deletes.


288-312: LGTM! Tag consistency test properly validates deterministic behavior.

The test now correctly asserts that tags are present (lines 310-311), addressing the previous review feedback about conditional assertions silently passing when tags are nil.


330-357: LGTM! Resume test validates state preservation across disconnects.

This test correctly simulates the client disconnect/reconnect scenario and verifies that move-out events processed while disconnected are properly reflected when resuming.


360-381: LGTM! Helper function creates appropriate test schema.

The with_parent_child_tables helper creates a clean parent/child relationship with proper foreign key constraints and returns the table metadata for test setup.

packages/elixir-client/test/electric/client_test.exs (7)

38-47: LGTM! drain_stream helper enables internal state inspection.

This helper is well-designed for testing:

  • Recursively consumes the stream until halt
  • Returns the final stream struct for state inspection
  • Enables tests to verify tag_to_keys and key_data state

1375-1429: LGTM! Tag index empty test validates backward compatibility.

This test ensures that when messages have no tags (non-subquery shapes), the tag index remains empty and doesn't unnecessarily track rows.


1431-1492: LGTM! Core move-out synthetic delete test.

This test validates the fundamental move-out flow:

  1. Insert with tag tracked in index
  2. Move-out pattern matches the tag
  3. Synthetic delete generated with correct value

1777-1858: Test name accurately reflects expected behavior after past feedback.

The test "update without removed_tags deduplicates synthetic deletes" correctly describes the expected deduplication behavior. This addresses the past review comment about misleading test names.


1860-1924: Test documents expected multi-tag behavior.

This test correctly asserts that a row with multiple tags should NOT be deleted when only one tag is moved out. The comment on lines 1914-1915 and assertion on line 1922-1923 clearly document the expected behavior.


2073-2165: Important edge case: update removing all tags.

This test validates that when an update removes all tags from a row (via removed_tags with no new tags), the tag index is properly cleared. This prevents stale entries from causing incorrect synthetic deletes.

The detailed comments (lines 2073-2081) clearly explain the expected behavior and why this matters.


2167-2251: LGTM! Resume state preservation test.

This test validates that after resuming a stream, the tag index state is properly restored so move-out events can still generate synthetic deletes. This is critical for correct behavior across client disconnects.

packages/elixir-client/test/electric/client/message_test.exs (4)

5-8: LGTM! Test setup updated for new message types.

The alias now includes MoveOutMessage and the module-level @ts timestamp provides consistent test data.


36-83: LGTM! Comprehensive MoveOutMessage parsing tests.

Good coverage of:

  • String keys (from JSON)
  • Atom keys (from internal representation)
  • Multiple patterns with different positions

The single-assert pattern matching style (lines 73-81) is clean and idiomatic, addressing past feedback.


85-131: LGTM! Tag header parsing tests.

Tests cover all tag-related scenarios:

  • Tags only
  • Removed_tags only
  • Both tags and removed_tags
  • Defaults to empty lists when absent

133-173: LGTM! ChangeMessage parsing tests.

Good coverage of insert, update (with old_value), and delete operations. Tests validate key, value, headers, and handle propagation.

packages/elixir-client/lib/electric/client/stream.ex (7)

22-27: LGTM! Tag index fields added to stream struct.

The comments clearly explain the dual data structure:

  • tag_to_keys: efficient lookup of keys by tag value
  • key_data: tracks each key's tags and latest message

This design enables O(1) lookups in both directions.


242-261: Move-out handling is well-implemented.

The handler correctly:

  1. Generates synthetic deletes for matching patterns
  2. Updates tag indices
  3. Buffers synthetic deletes for emission

The comment on lines 246-247 documents the assumption about move-out timing, which was discussed in past review comments.


267-274: Resume message correctly includes tag state.

The ResumeMessage now carries tag_to_keys and key_data for state preservation across client disconnects. This is essential for correct move-out handling after resume.


397-416: Resume correctly restores tag state with safe defaults.

Using Map.get(resume, :tag_to_keys, %{}) provides backward compatibility with resume messages that don't have tag state (e.g., from older clients or non-subquery shapes).


436-493: Tag index update logic is comprehensive and correct.

The implementation correctly handles:

  • Insert: add key to tag_to_keys, store in key_data
  • Update: remove old tags, add new tags, update message
  • Delete: remove key from all its tags, delete from key_data
  • No tags: don't track the key at all

The logic for handling removed_tags properly removes stale entries, preventing the bug where old parent deactivation would incorrectly delete a re-parented child.


524-587: Synthetic delete generation handles edge cases correctly.

Key implementation details:

  1. First pass collects all matching keys and removes tags from tag_to_keys
  2. Second pass checks if keys have remaining tags before generating deletes
  3. Only keys with no remaining tags get synthetic deletes
  4. Synthetic deletes use the latest stored message value

The deduplication is implicit: each key appears only once in key_data, so even if multiple patterns match the same key, only one delete is generated.


279-282: LGTM! Error unwrapping handles various response shapes.

The unwrap_error helper correctly handles:

  • Empty list → "Unknown error"
  • Single-element list → extract message
  • Multi-element list → return as-is
  • Other values → return as-is
packages/elixir-client/lib/electric/client/message.ex (5)

7-58: LGTM!

The Headers module properly integrates the new tags and removed_tags fields with appropriate defaults and type definitions. The from_message/2 function correctly handles optional fields with Map.get/3 fallbacks.


60-112: LGTM!

The request_timestamp propagation is consistent across both from_message/3 clauses, and the type definition correctly reflects the required field.


114-166: LGTM!

The from_message/4 function correctly accepts and sets request_timestamp, maintaining consistency with other message types. The value mapping error handling is appropriate.


168-199: LGTM!

The tag_to_keys and key_data fields are properly typed for move-out state tracking. The msg field type has been correctly updated to ChangeMessage.t() as previously requested.


273-289: Potential pattern mismatch between parse/4 and MoveOutMessage.from_message/3.

The parse/4 clauses match move-out events without requiring the patterns key:

def parse(%{"headers" => %{"event" => "move-out"}} = msg, ...)

However, MoveOutMessage.from_message/3 requires patterns to be present:

def from_message(%{"headers" => %{"event" => "move-out", "patterns" => patterns}}, ...)

If the server ever sends a move-out event without patterns, this will raise a FunctionClauseError.

Consider aligning the pattern match in parse/4 to require patterns:

🛡️ Proposed defensive fix
 def parse(
-      %{"headers" => %{"event" => "move-out"}} = msg,
+      %{"headers" => %{"event" => "move-out", "patterns" => _}} = msg,
       shape_handle,
       _value_mapper_fun,
       request_timestamp
     ) do
   [MoveOutMessage.from_message(msg, shape_handle, request_timestamp)]
 end

 def parse(
-      %{headers: %{event: "move-out"}} = msg,
+      %{headers: %{event: "move-out", patterns: _}} = msg,
       shape_handle,
       _value_mapper_fun,
       request_timestamp
     ) do
   [MoveOutMessage.from_message(msg, shape_handle, request_timestamp)]
 end

Please confirm that the Electric server protocol guarantees patterns is always present in move-out events. If not guaranteed, the defensive fix above would prevent the crash and let such messages fall through (or add explicit error handling).

✏️ Tip: You can disable this entire section by setting review_details to false in your review settings.

@electric-sql electric-sql deleted a comment from blacksmith-sh bot Jan 15, 2026
@robacourt robacourt force-pushed the rob/elixir-client-move-outs branch from c6fb79f to 2d76f1c Compare January 15, 2026 13:19
@electric-sql electric-sql deleted a comment from blacksmith-sh bot Jan 15, 2026
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🤖 Fix all issues with AI agents
In `@packages/elixir-client/lib/electric/client/stream.ex`:
- Around line 246-251: Add explicit documentation about the assumption that
move-out pattern matching uses non-positional (whole-tag) matching for composite
tags by updating the module-level docs or the MoveOutMessage struct docs;
mention that generate_synthetic_deletes relies on this and that positional
matching for composite tags is intentionally not supported. Reference the
generate_synthetic_deletes function and the MoveOutMessage type so readers know
where the assumption is enforced, and briefly state why this simplifies applying
move-out events immediately (no buffering for later inserts).

In `@packages/sync-service/test/integration/subquery_move_out_test.exs`:
- Around line 109-131: The test comment is misleading about the delete
mechanism: in the "deleting parent row triggers move-out" test (which calls
Client.stream and asserts via assert_delete), clarify whether the child is
removed by Postgres ON DELETE CASCADE or by Electric's synthetic move-out;
update the inline comment that currently reads "Should receive a synthetic
delete for the child" to accurately state the expected behavior (e.g., "Should
receive a delete for the child (cascade delete)" or explicitly note that
Electric emits a synthetic delete if that is the expected behavior) so the
assertion via assert_delete matches the documented expectation.
♻️ Duplicate comments (2)
packages/elixir-client/lib/electric/client/message.ex (1)

246-251: Pattern normalization now consistently extracts only required fields.

The atom-key clause uses = pattern which preserves any extra keys. For complete consistency with the string-key clause, consider explicit extraction.

♻️ Suggested fix for complete normalization
   defp normalize_patterns(patterns) do
     Enum.map(patterns, fn
       %{"pos" => pos, "value" => value} -> %{pos: pos, value: value}
-      %{pos: _, value: _} = pattern -> pattern
+      %{pos: pos, value: value} -> %{pos: pos, value: value}
     end)
   end
packages/sync-service/test/support/stream_consumer.ex (1)

197-216: Strict ordering assertion may cause confusing test failures.

The do_assert_receive_message function calls flunk on the first non-matching message (line 210). This means if an up_to_date message arrives before an expected insert, the test fails with "Received unexpected message" rather than waiting for the insert.

This is actually the desired behavior for these tests (strict ordering), but consider documenting this in the module doc or function docs. The past review flagged this as inconsistent with assert_resume, but looking at the current code, all assertions now use the same assert_receive_message helper.

📝 Suggested documentation addition
+  `@doc` """
+  Assert an insert message is received with matching value fields.
+
+  **Note:** This assertion is strict - any non-matching message received first
+  will cause the test to fail. Ensure message ordering is deterministic.
+  """
   def assert_insert(%__MODULE__{} = consumer, value_pattern, timeout \\ nil) do
📜 Review details

Configuration used: Organization UI

Review profile: ASSERTIVE

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between c6fb79f and 2d76f1c.

📒 Files selected for processing (12)
  • .changeset/beige-ladybugs-give.md
  • packages/elixir-client/lib/electric/client/ecto_adapter/array_decoder.ex
  • packages/elixir-client/lib/electric/client/message.ex
  • packages/elixir-client/lib/electric/client/stream.ex
  • packages/elixir-client/test/electric/client/message_test.exs
  • packages/elixir-client/test/electric/client_test.exs
  • packages/sync-service/mix.exs
  • packages/sync-service/test/integration/streaming_test.exs
  • packages/sync-service/test/integration/subquery_move_out_test.exs
  • packages/sync-service/test/support/integration_setup.ex
  • packages/sync-service/test/support/stream_consumer.ex
  • packages/sync-service/test/test_helper.exs
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2026-01-14T14:45:05.838Z
Learnt from: CR
Repo: electric-sql/electric PR: 0
File: AGENTS.md:0-0
Timestamp: 2026-01-14T14:45:05.838Z
Learning: Avoid old Electric patterns (bidirectional SQLite sync, `electrify()` API) – use Electric HTTP streaming with TanStack DB collections instead

Applied to files:

  • packages/sync-service/test/integration/streaming_test.exs
🧬 Code graph analysis (6)
packages/sync-service/test/support/integration_setup.ex (1)
packages/sync-service/test/support/component_setup.ex (1)
  • build_router_opts (487-506)
packages/sync-service/test/integration/streaming_test.exs (2)
packages/sync-service/test/support/stream_consumer.ex (2)
  • assert_insert (56-70)
  • assert_up_to_date (124-135)
packages/sync-service/lib/electric/postgres/snapshot_query.ex (1)
  • query! (90-105)
packages/sync-service/test/test_helper.exs (1)
packages/sync-service/lib/electric/application.ex (1)
  • start (6-20)
packages/elixir-client/lib/electric/client/ecto_adapter/array_decoder.ex (1)
packages/elixir-client/lib/electric/client/ecto_adapter.ex (3)
  • cast_to (186-193)
  • cast_to (195-226)
  • cast_to (228-233)
packages/sync-service/test/integration/subquery_move_out_test.exs (1)
packages/sync-service/test/support/stream_consumer.ex (7)
  • assert_insert (56-70)
  • assert_up_to_date (124-135)
  • assert_delete (105-119)
  • await_count (161-166)
  • assert_update (76-100)
  • collect_messages (242-247)
  • assert_resume (140-151)
packages/elixir-client/test/electric/client/message_test.exs (1)
packages/elixir-client/lib/electric/client/message.ex (6)
  • parse (256-258)
  • parse (260-267)
  • parse (269-271)
  • parse (273-280)
  • parse (282-289)
  • parse (291-293)
🪛 markdownlint-cli2 (0.18.1)
.changeset/beige-ladybugs-give.md

6-6: First line in a file should be a top-level heading

(MD041, first-line-heading, first-line-h1)

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
  • GitHub Check: Test packages/typescript-client w/ sync-service
  • GitHub Check: Run Lux integration tests
🔇 Additional comments (33)
packages/elixir-client/lib/electric/client/ecto_adapter/array_decoder.ex (1)

1-94: LGTM! Conditional guard correctly makes the module optional.

The Code.ensure_loaded?(Ecto) pattern is the standard Elixir idiom for conditionally defining modules that depend on optional packages. This correctly prevents compilation errors when Ecto is not available.

The parent EctoAdapter module is similarly guarded, ensuring consistency throughout the dependency chain—callers of ArrayDecoder.decode!/2 via EctoAdapter.cast_to({:array, type}) will never encounter undefined module errors.

.changeset/beige-ladybugs-give.md (1)

2-3: The patch version bump is appropriate for this PR. The project's established convention uses patch for feature additions (e.g., "Support multiple subqueries on the same level" was marked as patch, and "Add function to dynamically re-enable consumer suspension" was also marked as patch). The minor designation is reserved for major architectural changes, such as database storage replacements. This changeset aligns with the project's versioning patterns.

Likely an incorrect or invalid review comment.

packages/sync-service/mix.exs (1)

122-124: LGTM!

The test-scoped path dependency for electric_client correctly enables sync-service integration tests to exercise the Elixir client against the local HTTP server. Using runtime: false is appropriate since the client is only needed at test time for assertions and streaming, not as a runtime dependency.

packages/sync-service/test/test_helper.exs (1)

10-14: LGTM!

The workaround is well-documented and addresses a real circular dependency issue. The direct call to Electric.Client.Application.start/2 bypasses OTP's dependency resolution, preventing the deadlock that would occur when OTP tries to start both :electric and :electric_client applications.

packages/elixir-client/test/electric/client/message_test.exs (4)

10-34: LGTM!

The ControlMessage tests are correctly updated to pass the request_timestamp parameter, maintaining coverage for all control message types with both string and atom key formats.


36-83: LGTM!

Comprehensive tests for MoveOutMessage parsing covering both key formats and multiple patterns. The consolidated pattern matching approach on lines 73-81 is clean and idiomatic.


85-131: LGTM!

Thorough coverage of tag-related header parsing, including the important edge case of verifying defaults to empty lists for backward compatibility with untagged shapes.


133-174: LGTM!

Well-structured tests for ChangeMessage parsing covering all three operation types (insert, update, delete) with appropriate field validation including old_value handling for updates.

packages/sync-service/test/integration/streaming_test.exs (3)

1-22: LGTM!

Well-structured integration test module with clear documentation. The async: false setting is appropriate for tests that interact with a shared database and HTTP server. The modular setup chain keeps the test configuration clean and reusable.


24-61: LGTM!

Solid integration tests covering both streaming modes. The first test verifies snapshot retrieval with live: false, while the second demonstrates the complete live streaming flow—receiving the initial snapshot, then observing real-time changes from a Postgrex.query! insert. The with_consumer macro provides a clean assertion interface.


63-69: LGTM!

Important edge case coverage ensuring the client correctly handles empty tables by receiving an up-to-date control message without any preceding inserts.

packages/sync-service/test/support/integration_setup.ex (1)

1-42: LGTM!

Well-designed test helper with clear documentation. The implementation follows good practices:

  • Ephemeral port binding (port: 0) avoids port conflicts in parallel test runs
  • num_acceptors: 1 is appropriate for test scenarios
  • Proper use of ExUnit.Callbacks.start_supervised ensures cleanup
  • Returns a comprehensive context map for test assertions
packages/elixir-client/test/electric/client_test.exs (6)

38-47: LGTM! Clean helper for inspecting stream internals.

The recursive drain_stream/1 helper correctly iterates through the stream until :halt and returns the final stream struct. The local alias keeps it self-contained.


898-898: max_retries increase is appropriate.

The increase from 10 to 15 accommodates the 8 retry statuses plus connection retry and initial 500 retry in the test scenario (lines 852-862). This ensures the test doesn't flake due to insufficient retries.


1375-1429: Solid test for backward compatibility with untagged shapes.

The test correctly verifies that when messages lack tags, the tag index remains empty, ensuring no interference with shapes that don't use move-out semantics.


1431-1492: Good coverage of basic move-out → synthetic delete flow.

The test validates the core functionality: insert with tag → move-out matching tag → synthetic delete with correct value.


1777-1858: Test name and comments now align with behavior.

The test "update without removed_tags deduplicates synthetic deletes" correctly documents expected behavior (deduplication), and the assertion verifies only 1 synthetic delete is generated.


2167-2251: Test implementation is correct - tag state is properly persisted during resume.

The ResumeMessage struct properly stores tag_to_keys and key_data when created at up-to-date (lines 270-274), and the resume function correctly restores these fields (lines 399-407). The test logic is sound and should work as expected.

packages/sync-service/test/integration/subquery_move_out_test.exs (3)

1-36: Well-structured integration test module.

Good moduledoc explaining the three key aspects being tested. The setup chain properly wires database, stack, and client components.


330-357: Resume test scenario may have timing issues.

The test deactivates the parent "while disconnected" (between stream1 completing and stream2 starting). However, the move-out event is generated server-side when the UPDATE happens. If stream2 resumes and the server already processed the move-out, the client should receive it. But if the resume offset is past the move-out event, the client may never see it.

Consider adding a comment clarifying whether this tests:

  1. Server-side buffering of move-out events for resumed clients, or
  2. Client-side tag state restoration enabling local synthetic delete generation

360-381: LGTM! Clean table setup helper.

The helper creates a proper parent/child relationship with foreign key and returns table metadata for test assertions.

packages/elixir-client/lib/electric/client/stream.ex (5)

22-27: Clear documentation of the dual-index design.

The comments explain the purpose of both tag_to_keys (tag → keys lookup) and key_data (key → tags + message lookup). This bidirectional index enables efficient move-out processing.


242-261: Move-out handling looks correct.

The implementation correctly:

  1. Generates synthetic deletes for matching patterns
  2. Buffers them for emission
  3. Updates both index structures

The assumption comment about move-out timing after snapshot is appropriate.


279-282: Good defensive handling for empty error arrays.

The unwrap_error([]) clause prevents crashes when the server returns an empty error array, providing a fallback message.


436-493: Tag index update logic handles edge cases well.

The function properly:

  • Handles deletes by clearing the key from both structures
  • Handles updates with removed_tags by computing set differences
  • Clears keys entirely when they have no remaining tags

One minor observation: when updated_tags is empty but current_tags was non-empty, tags are removed from tag_to_keys correctly (lines 468-474).


524-587: Synthetic delete generation correctly handles multi-tag scenarios.

The two-pass approach (collect matched keys → filter by remaining tags) ensures:

  1. Multiple patterns don't create duplicate deletes
  2. Keys with remaining tags aren't deleted

The request_timestamp is properly propagated from the move-out event.

packages/sync-service/test/support/stream_consumer.ex (2)

1-52: Clean consumer lifecycle management.

The start/2 function properly captures the test PID and spawns a task that forwards messages. The stop/1 uses brutal_kill which is appropriate for test cleanup.


265-289: Ergonomic macro for test cleanup.

The with_consumer macro correctly:

  1. Starts the consumer
  2. Binds consumer variable for use in the block
  3. Ensures cleanup in after clause

Using var!(consumer) makes the binding available in the caller's scope.

packages/elixir-client/lib/electric/client/message.ex (5)

8-48: Headers struct properly extended for tag support.

The additions of tags and removed_tags fields with sensible defaults ([]) maintain backward compatibility. The type definitions are clear.


71-95: ControlMessage now properly propagates request_timestamp.

Both from_message clauses (string and atom keys) consistently accept and store the timestamp.


190-198: ResumeMessage type correctly uses ChangeMessage.t() for msg field.

This addresses the past review comment about type safety. The key_data map value type is now properly typed.


201-252: MoveOutMessage module is well-designed.

Good moduledoc explaining the purpose. The normalize_patterns helper handles both string and atom key formats.


254-293: Parse function correctly threads request_timestamp through all branches.

All message type branches now receive and propagate the timestamp consistently. The empty string case returns [] as expected.

✏️ Tip: You can disable this entire section by setting review_details to false in your review settings.

@electric-sql electric-sql deleted a comment from blacksmith-sh bot Jan 15, 2026
robacourt and others added 7 commits January 15, 2026 13:30
Rename unwrap to unwrap_error and add handling for empty error arrays
from the server. Previously an empty array would return [] as the error
message; now it returns "Unknown error".

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
When resuming a stream, the client was making non-live requests
(live: false) because up_to_date? defaulted to false. This caused
the server to return immediately with up-to-date instead of
long-polling for new data.

Since a ResumeMessage is only created when the client is already
up-to-date, resuming should set up_to_date?: true so the first
request uses long-polling to wait for any changes that occurred
while disconnected.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Set max_connections = 200 in local dev configs and CI workflow to
support integration tests that open multiple database connections.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Reduce default pool_size from 2 to 1 in test setup to minimize
connection usage. Tests that require multiple connections now use
explicit @tag connection_opt_overrides to request a larger pool.

Also set partitioned_tables_test to async: false to avoid connection
contention.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Add test helpers for running integration tests with Electric.Client
against a real HTTP server:

- Add electric_client as test dependency in mix.exs
- Start electric_client application in test_helper.exs
- Add Support.IntegrationSetup with with_electric_client/2
- Add Support.StreamConsumer for consuming streams in tests
- Add basic streaming integration tests to demonstrate usage

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Wrap ArrayDecoder module in Code.ensure_loaded?(Ecto) guard to prevent
compile errors when the elixir-client is used as a dependency in
projects that don't have Ecto available.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Add client-side support for handling move-out events from the server
when using shapes with subquery filters.

Changes to message handling:
- Add tags and removed_tags fields to Headers struct
- Add request_timestamp parameter to Message.parse/4
- Add MoveOutMessage type for parsing move-out control messages
- Add tag_to_keys and key_data fields to ResumeMessage for state preservation

Changes to stream processing:
- Track row tags in tag_to_keys and key_data indices
- Handle MoveOutMessage by generating synthetic delete messages
- Reset tag indices on shape reset
- Preserve and restore tag state when resuming streams

The move-out feature enables the server to notify clients when rows
should be removed from their view because they no longer match the
shape's subquery filter. The client generates synthetic delete messages
to cleanly remove these rows from local state.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
@robacourt robacourt force-pushed the rob/elixir-client-move-outs branch from 2d76f1c to 0631be3 Compare January 15, 2026 13:34
@blacksmith-sh
Copy link
Contributor

blacksmith-sh bot commented Jan 15, 2026

Found 44 test failures on Blacksmith runners:

Failures

Test View Logs
Elixir.Electric.Connection.ConnectionManagerTest/
test cleanup procedure handles dropping slot on termination
View Logs
Elixir.Electric.Connection.ConnectionManagerTest/
test process dependencies handles status messages after shape cache restart
View Logs
Elixir.Electric.Connection.ConnectionManagerTest/
test shutdown manager blocked resolving a pool connection terminates cleanly
View Logs
Elixir.Electric.Connection.ConnectionManagerTest/
test status monitor backtracks the status when the canary goes down
View Logs
Elixir.Electric.Connection.ConnectionManagerTest/
test status monitor backtracks the status when the replication client goes down
View Logs
Elixir.Electric.Connection.ConnectionManagerTest/
test status monitor backtracks the status when the shape cache goes down
View Logs
Elixir.Electric.Connection.ConnectionManagerTest/
test status monitor backtracks the status when the shape log collector goes down
View Logs
Elixir.Electric.Connection.ConnectionManagerTest/
test status monitor reports status=active when all connection processes are running
View Logs
Elixir.Electric.Connection.ConnectionManagerTest/
test status monitor reports status=starting once the exclusive connection lock is acqui
red
View Logs
Elixir.Electric.Connection.ConnectionManagerTest/
test status monitor resets the status when connection manager goes down
View Logs
Elixir.Electric.Integration.SubqueryMoveOutTest/
test must-refetch clears move-out state must-refetch resets tag tracking state
View Logs
Elixir.Electric.Integration.SubqueryMoveOutTest/
test resume preserves move-out state move-out after resume generates synthetic delete
View Logs
Elixir.Electric.Integration.SubqueryMoveOutTest/test subquery move-out with parent/
child tables change messages include tags for subquery-matched rows
View Logs
Elixir.Electric.Integration.SubqueryMoveOutTest/test subquery move-out with parent/
child tables deleting parent row triggers move-out
View Logs
Elixir.Electric.Integration.SubqueryMoveOutTest/test subquery move-out with parent/
child tables move-out generates synthetic deletes for all affected child rows
View Logs
Elixir.Electric.Integration.SubqueryMoveOutTest/test subquery move-out with parent/
child tables receives move-out control message when parent is deactivated
View Logs
Elixir.Electric.Integration.SubqueryMoveOutTest/
test tag handling during updates deactivating old parent after child changed parents sh
ould not generate delete
View Logs
Elixir.Electric.Integration.SubqueryMoveOutTest/
test tag handling during updates update that changes parent reference updates tags
View Logs
Elixir.Electric.Plug.LowPrivilegeRouterTest/test /v1/
shapes GET fails to create initial snapshot when Electric does not own the user table
View Logs
Elixir.Electric.Plug.LowPrivilegeRouterTest/test /v1/
shapes with manual publication management GET fails to create initial snapshot when Ele
ctric cannot read from the table
View Logs
Elixir.Electric.Plug.LowPrivilegeRouterTest/test /v1/
shapes with manual publication management GET fails to create initial snapshot when Ele
ctric cannot read from the table
View Logs
Elixir.Electric.Plug.RouterTest/test /v1/
shapes - admission control tracks initial and existing requests separately
View Logs
Elixir.Electric.Plug.RouterTest/test /v1/
shapes - admission control tracks initial and existing requests separately
View Logs
Elixir.Electric.Plug.RouterTest/test /v1/
shapes - subqueries a move-in from the inner shape causes a query and new entries in th
e outer shape
View Logs
Elixir.Electric.Plug.RouterTest/test /v1/
shapes - subqueries allows 3 level subquery in where clauses
View Logs
Elixir.Electric.Plug.RouterTest/test /v1/
shapes - subqueries return 400 if subquery references unknown table
View Logs
Elixir.Electric.Plug.RouterTest/test /v1/
shapes - subset snapshots GET with any parameter mentioning a subset returns a subset s
napshot
View Logs
Elixir.Electric.Plug.RouterTest/test /v1/
shapes - subset snapshots GET with log=changes_only doesn't return any data initial but
lets all updates through
View Logs
Elixir.Electric.Plug.RouterTest/test /v1/
shapes - subset snapshots subsets can filter by enum values using IN
View Logs
Elixir.Electric.Plug.RouterTest/test /v1/
shapes - subset snapshots subsets can filter by enum values using IN
View Logs
Elixir.Electric.Plug.RouterTest/test /v1/
shapes - subset snapshots subsets can filter with explicit type cast on parameter
View Logs
Elixir.Electric.Plug.RouterTest/test /v1/
shapes - subset snapshots subsets return 400 for invalid enum values
View Logs
Elixir.Electric.Plug.RouterTest/test /v1/
shapes DELETE forces the shape handle to be different on reconnect and new snapshot to
be created
View Logs
Elixir.Electric.Plug.RouterTest/test /v1/
shapes GET received only a diff when receiving updates
View Logs
Elixir.Electric.Plug.RouterTest/test /v1/
shapes GET receives old rows in updates when replica=full
View Logs
Elixir.Electric.Plug.RouterTest/test /v1/
shapes GET receives only specified columns out of wide table
View Logs
Elixir.Electric.Plug.RouterTest/test /v1/
shapes GET returns a 409 on a truncate and can follow a new shape afterwards
View Logs
Elixir.Electric.Plug.RouterTest/test /v1/
shapes GET returns correct INSERT and DELETE operations that have been converted from U
PDATEs
View Logs
Elixir.Electric.Plug.RouterTest/test /v1/
shapes GET returns updated schema in header after column nullability changes
View Logs
Elixir.Electric.Plug.RouterTest/test /v1/
shapes GET returns updated schema in header after column nullability changes
View Logs
Elixir.Electric.Plug.RouterTest/test /v1/
shapes now offset returns an up-to-date response regardless of existing data
View Logs
Elixir.Electric.Plug.RouterTest/test /v1/shapes Recreating the table causes a 409 View Logs
Elixir.Electric.Plug.RouterTest/test secure mode allows OPTIONS requests to /v1/
shape without secret
View Logs
Elixir.Electric.Plug.RouterTest/test secure mode requires secret for /v1/shape deletion View Logs

Fix in Cursor

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🤖 Fix all issues with AI agents
In @.github/workflows/sync_service_tests.yml:
- Line 45: Add a brief inline comment above the existing command key to explain
that the service container "command" key (used on the postgres service) is an
undocumented but working way to override the container CMD in GitHub Actions;
reference the 'command' key so maintainers know why it's present and that
actionlint/docs may not show it.
♻️ Duplicate comments (2)
packages/sync-service/test/support/integration_setup.ex (1)

17-41: Well-structured integration test helper.

The implementation correctly uses ephemeral ports, proper supervised processes, and follows ExUnit conventions. One minor consideration: the hardcoded 2000ms timeout on line 18 may be tight in slower CI environments.

♻️ Optional: Make timeout configurable
 def with_electric_client(ctx, opts \\ []) do
-  :ok = Electric.StatusMonitor.wait_until_active(ctx.stack_id, timeout: 2000)
+  status_timeout = Keyword.get(opts, :status_timeout, 2000)
+  :ok = Electric.StatusMonitor.wait_until_active(ctx.stack_id, timeout: status_timeout)
packages/sync-service/test/support/stream_consumer.ex (1)

56-151: Assertion functions have consistent strict-ordering behavior.

All assertion helpers (assert_insert, assert_update, assert_delete, assert_up_to_date, assert_resume) now use assert_receive_message which fails immediately on the first non-matching message. This provides strict ordering enforcement, which is appropriate for precise integration test scenarios.

📜 Review details

Configuration used: Organization UI

Review profile: ASSERTIVE

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 2d76f1c and 0631be3.

📒 Files selected for processing (18)
  • .changeset/beige-ladybugs-give.md
  • .github/workflows/sync_service_tests.yml
  • packages/elixir-client/lib/electric/client/ecto_adapter/array_decoder.ex
  • packages/elixir-client/lib/electric/client/message.ex
  • packages/elixir-client/lib/electric/client/stream.ex
  • packages/elixir-client/test/electric/client/message_test.exs
  • packages/elixir-client/test/electric/client_test.exs
  • packages/sync-service/dev/postgres.conf
  • packages/sync-service/dev/postgres2.conf
  • packages/sync-service/mix.exs
  • packages/sync-service/test/electric/postgres/configuration_test.exs
  • packages/sync-service/test/electric/postgres/lock_breaker_connection_test.exs
  • packages/sync-service/test/integration/streaming_test.exs
  • packages/sync-service/test/integration/subquery_move_out_test.exs
  • packages/sync-service/test/support/db_setup.ex
  • packages/sync-service/test/support/integration_setup.ex
  • packages/sync-service/test/support/stream_consumer.ex
  • packages/sync-service/test/test_helper.exs
🧰 Additional context used
🧠 Learnings (2)
📚 Learning: 2026-01-15T13:37:13.685Z
Learnt from: robacourt
Repo: electric-sql/electric PR: 3699
File: .github/workflows/sync_service_tests.yml:45-45
Timestamp: 2026-01-15T13:37:13.685Z
Learning: In GitHub Actions workflow files, the service container for a job may support a command option to pass startup arguments (e.g., command: postgres -c max_connections=200), even if this usage isn’t clearly documented in the official workflow syntax. This pattern has been observed in the electric-sql/electric repository CI. Treat this as an undocumented but usable convention: verify in the runner version you target, test in CI, and avoid relying on it in public docs. If you use it, consider adding a comment in the workflow explaining the rationale and note that it’s an undocumented capability that may vary across environments.

Applied to files:

  • .github/workflows/sync_service_tests.yml
📚 Learning: 2026-01-14T14:45:05.838Z
Learnt from: CR
Repo: electric-sql/electric PR: 0
File: AGENTS.md:0-0
Timestamp: 2026-01-14T14:45:05.838Z
Learning: Avoid old Electric patterns (bidirectional SQLite sync, `electrify()` API) – use Electric HTTP streaming with TanStack DB collections instead

Applied to files:

  • packages/sync-service/test/integration/streaming_test.exs
🧬 Code graph analysis (5)
packages/sync-service/test/integration/streaming_test.exs (2)
packages/sync-service/test/support/stream_consumer.ex (2)
  • assert_insert (56-70)
  • assert_up_to_date (124-135)
packages/sync-service/lib/electric/postgres/snapshot_query.ex (1)
  • query! (90-105)
packages/sync-service/test/support/integration_setup.ex (1)
packages/sync-service/test/support/component_setup.ex (1)
  • build_router_opts (487-506)
packages/elixir-client/lib/electric/client/ecto_adapter/array_decoder.ex (2)
packages/elixir-client/lib/electric/client/fetch/response.ex (1)
  • decode! (28-34)
packages/elixir-client/lib/electric/client/ecto_adapter.ex (3)
  • cast_to (186-193)
  • cast_to (195-226)
  • cast_to (228-233)
packages/sync-service/test/test_helper.exs (1)
packages/sync-service/lib/electric/application.ex (1)
  • start (6-20)
packages/elixir-client/lib/electric/client/stream.ex (1)
packages/elixir-client/lib/electric/client/message.ex (6)
  • parse (256-258)
  • parse (260-267)
  • parse (269-271)
  • parse (273-280)
  • parse (282-289)
  • parse (291-293)
🪛 actionlint (1.7.10)
.github/workflows/sync_service_tests.yml

45-45: unexpected key "command" for "services" section. expected one of "credentials", "env", "image", "options", "ports", "volumes"

(syntax-check)

🪛 markdownlint-cli2 (0.18.1)
.changeset/beige-ladybugs-give.md

6-6: First line in a file should be a top-level heading

(MD041, first-line-heading, first-line-h1)

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
  • GitHub Check: Test packages/typescript-client w/ sync-service
  • GitHub Check: Test packages/experimental w/ sync-service
  • GitHub Check: Run Lux integration tests
🔇 Additional comments (43)
packages/sync-service/test/support/db_setup.ex (1)

7-7: LGTM! Reducing default pool size aligns with the broader test infrastructure changes.

The reduction from pool_size: 2 to pool_size: 1 is consistent with the PR's approach of increasing max_connections system-wide (to 200) while reducing per-pool concurrency. Tests requiring multiple concurrent connections (like the lock timeout test in configuration_test.exs) explicitly override this default.

packages/sync-service/dev/postgres2.conf (1)

4-4: LGTM! Consistent with the other Postgres configuration changes.

The max_connections = 200 setting aligns with postgres.conf and the CI workflow, ensuring consistent behavior across development and test environments.

packages/sync-service/test/electric/postgres/configuration_test.exs (1)

107-107: LGTM! Necessary override for test requiring concurrent connections.

This test spawns a Task to hold a table lock while simultaneously attempting add_table_to_publication from the test process. With the default pool_size: 1, both operations would contend for the same connection, causing a deadlock. The explicit pool_size: 2 override correctly documents and enables this test's concurrency requirements.

.changeset/beige-ladybugs-give.md (1)

1-6: LGTM! Changeset correctly documents the patch releases.

The changeset follows the standard format for @changesets/cli. The MD041 linting warning about first-line headings is a false positive—changeset files are expected to begin with YAML frontmatter, not markdown headings.

packages/sync-service/mix.exs (1)

122-123: LGTM!

The test dependency is correctly configured with only: [:test] and runtime: false, using a path reference appropriate for monorepo sibling packages.

packages/sync-service/dev/postgres.conf (1)

4-4: LGTM!

Increasing max_connections to 200 aligns with CI workflow settings and supports higher test concurrency requirements introduced by integration tests.

packages/sync-service/test/electric/postgres/lock_breaker_connection_test.exs (1)

16-17: LGTM!

The pool_size: 2 override is appropriate here since this test requires concurrent connections—one to hold the advisory lock (via the Task) and another for the lock breaker operations.

packages/elixir-client/lib/electric/client/ecto_adapter/array_decoder.ex (1)

1-2: LGTM! Parent module has matching guard.

The Code.ensure_loaded?(Ecto) guard is idiomatic for optional dependencies and is consistently applied to both this module and its parent Electric.Client.EctoAdapter, preventing compile errors when Ecto is unavailable.

packages/sync-service/test/test_helper.exs (1)

10-14: LGTM - Circular dependency workaround is well-documented.

The direct application start bypassing OTP dependency resolution is appropriate for the test environment. The comment clearly explains the rationale, and as discussed in prior review, the {:ok, _} pattern match is acceptable since test_helper.exs runs once per fresh VM in standard test runs.

packages/sync-service/test/support/stream_consumer.ex (4)

23-43: LGTM - Clean stream consumer initialization.

The implementation correctly captures the test PID and spawns a Task that forwards stream messages. Using Task.async is appropriate here since the task will be explicitly shut down via stop/1.


48-51: LGTM.


161-193: LGTM - Correct timeout handling.

await_count properly tracks elapsed time using monotonic timestamps and skips non-matching messages (unlike the strict assert_* functions). This is appropriate for collecting a specific count of matching messages regardless of interleaved control messages.


279-289: LGTM - Clean test DSL macro.

The with_consumer macro provides ergonomic test setup/teardown with proper cleanup in the after block. Using var!(consumer) to inject the binding is the standard Elixir approach for test DSL macros.

packages/elixir-client/lib/electric/client/message.ex (6)

7-48: LGTM - Headers struct extended for tag tracking.

The additions of tags and removed_tags fields with sensible defaults ([]) correctly support the move-out semantics. The from_message/2 function properly extracts these from the message payload with fallback to empty lists.


71-95: LGTM - request_timestamp propagation implemented.

Both string-key and atom-key clauses of from_message now accept and assign request_timestamp, addressing the prior review feedback.


129-144: LGTM.


188-198: LGTM - ResumeMessage extended for move-out state preservation.

The tag_to_keys and key_data fields enable resuming streams with preserved tag index state. The type for msg in key_data is now properly specified as ChangeMessage.t() (addressing prior feedback).


201-252: LGTM - MoveOutMessage module is well-structured.

The module correctly handles both string-key and atom-key input formats with proper normalization. The normalize_patterns helper ensures consistent pattern structure for downstream processing.


256-293: LGTM - parse/4 correctly routes all message types with timestamp.

All branches consistently propagate request_timestamp:

  • ChangeMessage via from_message/4
  • ControlMessage via from_message/3
  • MoveOutMessage via from_message/3

The move-out event branches correctly handle both string-key ("headers" => %{"event" => "move-out"}) and atom-key formats.

packages/elixir-client/lib/electric/client/stream.ex (9)

22-27: LGTM - Move-out state fields added with clear documentation.

The inline comments clearly explain the dual data structure approach:

  • tag_to_keys: reverse index for efficient pattern matching
  • key_data: per-key state including tags and latest message

242-261: LGTM - Move-out handling correctly generates synthetic deletes.

The implementation:

  1. Generates synthetic deletes for matching patterns
  2. Adds deletes to the buffer
  3. Updates tag state atomically

The documented assumption about move-out timing (only after snapshot) is appropriate given the protocol guarantees.


267-277: LGTM - ResumeMessage correctly captures move-out state.

Including tag_to_keys and key_data in the ResumeMessage ensures that resumed streams can continue to process move-out events correctly.


279-282: LGTM - Robust error unwrapping.

Handles all edge cases: empty arrays (returns sensible default), single-element arrays (unwraps), multi-element arrays (preserves), and non-arrays (passthrough).


335-347: LGTM - Reset correctly clears move-out state.

On must-refetch, clearing tag_to_keys and key_data is correct since the client will receive a fresh snapshot.


397-416: LGTM - Resume correctly restores move-out state with backward compatibility.

Using Map.get(resume, :tag_to_keys, %{}) ensures backward compatibility with ResumeMessages that don't have these fields (from older client versions or untagged shapes).


436-493: LGTM - Tag index management is correctly implemented.

The logic handles all cases:

  1. Delete operations: Removes key from all associated tags and clears key_data
  2. Insert/Update with no tags: Clears tracking for the key
  3. Insert/Update with tags: Computes tag diffs and updates both indexes

The use of MapSet.difference and MapSet.union for calculating tag changes is correct. The approach of tracking the diff (tags_to_remove, tags_to_add) minimizes unnecessary operations.


495-522: LGTM - Tag manipulation helpers are efficient and correct.

The helpers properly:

  • Iterate directly over MapSets (addressing prior feedback about unnecessary to_list)
  • Clean up empty tag entries from tag_to_keys to prevent unbounded growth

524-587: LGTM - Synthetic delete generation is well-designed.

The two-pass algorithm is correct:

  1. First pass: Atomically removes matching tags from tag_to_keys and tracks affected keys
  2. Second pass: For each affected key, checks remaining tags and only generates delete if no tags remain

Key correctness properties:

  • Uses Map.pop for atomic removal from tag index
  • Preserves rows that still have other tags (partial move-out)
  • Synthetic deletes correctly propagate request_timestamp from the move-out event
  • Uses the latest msg value from key_data for the delete payload
packages/elixir-client/test/electric/client_test.exs (9)

38-47: LGTM - drain_stream helper for inspecting final stream state.

The helper correctly uses tail recursion to consume the stream until {:halt, stream}, returning the final stream struct for state inspection. This is useful for verifying internal state like tag_to_keys and key_data.


1375-1429: LGTM - Test verifies tag index stays empty for untagged shapes.

This test confirms backward compatibility: when messages have no tags, the tag index structures remain empty, ensuring no overhead for shapes that don't use move-out semantics.


1431-1492: LGTM - Core move-out test case.

Correctly verifies the complete flow: insert with tag → move-out pattern match → synthetic delete generated with correct value.


1624-1699: LGTM - Tests removed_tags handling correctly.

This test verifies that when a row's tag is changed (old tag removed, new tag added), a subsequent move-out for the old tag doesn't generate a spurious delete.


1777-1858: LGTM - Deduplication test.

Verifies that updates with the same tag don't cause duplicate synthetic deletes - important for correct tag index management.


1860-1924: Good coverage of multi-tag partial move-out scenario.

This test verifies the important invariant: a row with multiple tags should not be deleted when only one of its tags is moved out. The row should remain visible as long as it has any remaining tags.


2009-2071: Good test for deduplication when multiple patterns match same row.

Verifies that when a single move-out event contains multiple patterns that all match the same row, only one synthetic delete is generated.


2073-2165: Excellent edge case coverage for tag removal.

This test documents the expected behavior when a row's tags are completely removed via an update (with removed_tags but no new tags). The tag index should be cleared so subsequent move-outs are no-ops.


2167-2251: LGTM - Resume state preservation test.

Verifies the critical requirement that move-out state is preserved across stream resume, enabling synthetic deletes to work correctly after resumption.

packages/sync-service/test/integration/streaming_test.exs (1)

1-70: Well-structured integration test module.

The test file properly exercises the Electric.Client streaming functionality with clear test cases covering:

  • Initial snapshot with pre-existing data
  • Live streaming with subsequent inserts
  • Empty table handling

Good use of the with_consumer macro and assertion helpers. The async: false setting is appropriate for integration tests with database state dependencies.

packages/elixir-client/test/electric/client/message_test.exs (1)

1-174: Comprehensive test coverage for message parsing.

The test file thoroughly covers:

  • All ControlMessage types with both string and atom key formats
  • MoveOutMessage parsing with single and multiple patterns
  • Headers with tags, removed_tags, and default empty list behavior
  • ChangeMessage for all operations (insert, update, delete) including old_value handling

Good refactoring of the multiple patterns test to use a single pattern-matching assertion (lines 73-81).

packages/sync-service/test/integration/subquery_move_out_test.exs (4)

109-131: Clarify comment: this may be a CASCADE delete, not a synthetic move-out delete.

The test comment at line 128 says "Should receive a synthetic delete for the child", but the child table has ON DELETE CASCADE (line 372). When the parent row is deleted, PostgreSQL cascades the delete to the child row directly. This is different from the deactivation tests (lines 55-73, 80-107) where UPDATE active = false triggers move-out and synthetic deletes.

Consider clarifying whether:

  1. The test expects a real CASCADE delete from Postgres, or
  2. Electric intercepts and emits a synthetic delete despite the cascade

This distinction matters for understanding the test's intent.


225-270: Excellent edge case test for tag index updates.

This test verifies critical behavior: when a child changes parents, the tag index must be updated so that deactivating the old parent doesn't incorrectly generate a synthetic delete. The detailed comments explain the expected behavior clearly, and the assertion message provides good debugging context.


330-357: Good coverage of resume with move-out state.

This test properly simulates a disconnect/reconnect scenario where changes occur while the client is "offline". Using resume_msg from the first stream to resume ensures the client correctly processes move-out events that happened during disconnection.


360-381: Helper function is well-structured.

The with_parent_child_tables/1 helper cleanly creates the schema needed for subquery tests. Returning %{tables: [...]} follows the setup context pattern used elsewhere.

✏️ Tip: You can disable this entire section by setting review_details to false in your review settings.

--health-retries 5
ports:
- 54321:5432
command: postgres -c max_connections=200
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick | 🔵 Trivial

Consider adding a comment explaining this undocumented capability.

Based on learnings, the command key for GitHub Actions service containers is an undocumented but functional feature. Since actionlint and official docs don't recognize it, future maintainers may be confused. A brief inline comment would help:

        # Note: 'command' is undocumented but works to override container CMD
        command: postgres -c max_connections=200
🧰 Tools
🪛 actionlint (1.7.10)

45-45: unexpected key "command" for "services" section. expected one of "credentials", "env", "image", "options", "ports", "volumes"

(syntax-check)

🤖 Prompt for AI Agents
In @.github/workflows/sync_service_tests.yml at line 45, Add a brief inline
comment above the existing command key to explain that the service container
"command" key (used on the postgres service) is an undocumented but working way
to override the container CMD in GitHub Actions; reference the 'command' key so
maintainers know why it's present and that actionlint/docs may not show it.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Add Move-In/Out Support for Subqueries in Elixir Client

2 participants