Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changeset/thick-bottles-act.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
"@electric-sql/client": patch
"@core/sync-service": patch
---

Add experimental SSE support.
5 changes: 5 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,10 @@
},
"devDependencies": {
"glob": "^10.3.10"
},
"pnpm": {
"patchedDependencies": {
"@microsoft/fetch-event-source": "patches/@microsoft__fetch-event-source.patch"
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ defmodule Electric.Plug.ServeShapePlug do
all_params =
Map.merge(conn.query_params, conn.path_params)
|> Map.update("live", "false", &(&1 != "false"))
|> Map.update("experimental_live_sse", "false", &(&1 != "false"))

case Api.validate(api, all_params) do
{:ok, request} ->
Expand Down
2 changes: 1 addition & 1 deletion packages/sync-service/lib/electric/shapes.ex
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ defmodule Electric.Shapes do
else
# If we have a shape handle, but no shape, it means the shape was deleted. Send a 409
# and expect the client to retry - if the state of the world allows, it'll get a new handle.
{:error, Electric.Shapes.Api.Error.must_refetch()}
{:error, Electric.Shapes.Api.Error.must_refetch(opts)}
end
end

Expand Down
236 changes: 219 additions & 17 deletions packages/sync-service/lib/electric/shapes/api.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
alias __MODULE__
alias __MODULE__.Request
alias __MODULE__.Response
alias __MODULE__.SseState

import Electric.Replication.LogOffset, only: [is_log_offset_lt: 2]

Expand All @@ -27,7 +28,9 @@
required: true
],
allow_shape_deletion: [type: :boolean],
keepalive_interval: [type: :integer],
long_poll_timeout: [type: :integer],
sse_timeout: [type: :integer],
max_age: [type: :integer],
stack_ready_timeout: [type: :integer],
stale_age: [type: :integer],
Expand All @@ -50,12 +53,15 @@
:stack_id,
:storage,
allow_shape_deletion: false,
keepalive_interval: 21_000,
long_poll_timeout: 20_000,
sse_timeout: 60_000,
max_age: 60,
stack_ready_timeout: 5_000,
stale_age: 300,
send_cache_headers?: true,
encoder: Electric.Shapes.Api.Encoder.JSON,
sse_encoder: Electric.Shapes.Api.Encoder.SSE,
configured: false
]

Expand All @@ -65,7 +71,6 @@
# Aliasing for pattern matching
@before_all_offset LogOffset.before_all()
@offset_out_of_bounds %{offset: ["out of bounds for this shape"]}
@must_refetch [%{headers: %{control: "must-refetch"}}]

# Need to implement Access behaviour because we use that to extract config
# when using shapes api
Expand Down Expand Up @@ -320,10 +325,14 @@

# TODO: discuss returning a 307 redirect rather than a 409, the client
# will have to detect this and throw out old data

%{params: %{experimental_live_sse: in_sse?}} = request
error = Api.Error.must_refetch(experimental_live_sse: in_sse?)

{:error,
Response.error(request, @must_refetch,
Response.error(request, error.message,
handle: active_shape_handle,
status: 409
status: error.status
)}
end

Expand Down Expand Up @@ -489,17 +498,21 @@
handle: shape_handle,
chunk_end_offset: chunk_end_offset,
global_last_seen_lsn: global_last_seen_lsn,
params: %{offset: offset, live: live?},
params: %{offset: offset, live: live?, experimental_live_sse: in_sse?},
api: api,
response: response
} = request

case Shapes.get_merged_log_stream(api, shape_handle, since: offset, up_to: chunk_end_offset) do
case Shapes.get_merged_log_stream(api, shape_handle,
since: offset,
up_to: chunk_end_offset,
experimental_live_sse: in_sse?
) do
{:ok, log} ->
if live? && Enum.take(log, 1) == [] do
request
|> update_attrs(%{ot_is_immediate_response: false})
|> hold_until_change()
|> handle_live_request()
else
up_to_date_lsn =
if live? do
Expand All @@ -512,9 +525,9 @@
max(global_last_seen_lsn, chunk_end_offset.tx_offset)
end

body = Stream.concat([log, maybe_up_to_date(request, up_to_date_lsn)])
log_stream = Stream.concat(log, maybe_up_to_date(request, up_to_date_lsn))

%{response | chunked: true, body: encode_log(request, body)}
%{response | chunked: true, body: encode_log(request, log_stream)}
end

{:error, %Api.Error{} = error} ->
Expand All @@ -523,10 +536,11 @@
{:error, :unknown} ->
# the shape has been deleted between the request validation and the attempt
# to return the log stream
Response.error(request, @must_refetch, status: 409)
error = Api.Error.must_refetch(experimental_live_sse: in_sse?)
Response.error(request, error.message, status: error.status)

Check warning on line 540 in packages/sync-service/lib/electric/shapes/api.ex

View check run for this annotation

Codecov / codecov/patch

packages/sync-service/lib/electric/shapes/api.ex#L539-L540

Added lines #L539 - L540 were not covered by tests

{:error, %SnapshotError{type: :schema_changed}} ->
error = Api.Error.must_refetch()
error = Api.Error.must_refetch(experimental_live_sse: in_sse?)
Logger.warning("Schema changed while creating snapshot for #{shape_handle}")
Response.error(request, error.message, status: error.status)

Expand Down Expand Up @@ -563,12 +577,20 @@
end
end

defp handle_live_request(%Request{params: %{experimental_live_sse: true}} = request) do
stream_sse_events(request)
end

defp handle_live_request(%Request{} = request) do
hold_until_change(request)
end

defp hold_until_change(%Request{} = request) do
%{
new_changes_ref: ref,
last_offset: last_offset,
handle: shape_handle,
params: %{shape_definition: shape_def},
params: %{shape_definition: shape_def, experimental_live_sse: in_sse?},
api: %{long_poll_timeout: long_poll_timeout} = api
} = request

Expand Down Expand Up @@ -603,13 +625,16 @@
|> do_serve_shape_log()

{^ref, :shape_rotation, new_handle} ->
Response.error(request, @must_refetch,
error = Api.Error.must_refetch(experimental_live_sse: in_sse?)

Check warning on line 628 in packages/sync-service/lib/electric/shapes/api.ex

View check run for this annotation

Codecov / codecov/patch

packages/sync-service/lib/electric/shapes/api.ex#L628

Added line #L628 was not covered by tests

Response.error(request, error.message,

Check warning on line 630 in packages/sync-service/lib/electric/shapes/api.ex

View check run for this annotation

Codecov / codecov/patch

packages/sync-service/lib/electric/shapes/api.ex#L630

Added line #L630 was not covered by tests
handle: new_handle,
status: 409
status: error.status

Check warning on line 632 in packages/sync-service/lib/electric/shapes/api.ex

View check run for this annotation

Codecov / codecov/patch

packages/sync-service/lib/electric/shapes/api.ex#L632

Added line #L632 was not covered by tests
)

{^ref, :shape_rotation} ->
Response.error(request, @must_refetch, status: 409)
error = Api.Error.must_refetch(experimental_live_sse: in_sse?)
Response.error(request, error.message, status: error.status)
after
# If we timeout, return an up-to-date message
long_poll_timeout ->
Expand All @@ -628,6 +653,160 @@
end
end

defp stream_sse_events(%Request{} = request) do
%{
new_changes_ref: ref,
handle: shape_handle,
api: %{keepalive_interval: keepalive_interval, sse_timeout: sse_timeout},
params: %{offset: since_offset}
} = request

Logger.debug(
"Client #{inspect(self())} is streaming SSE for changes to #{shape_handle} since #{inspect(since_offset)}"

Check warning on line 665 in packages/sync-service/lib/electric/shapes/api.ex

View check run for this annotation

Codecov / codecov/patch

packages/sync-service/lib/electric/shapes/api.ex#L665

Added line #L665 was not covered by tests
)

# Set up timer for SSE comment as keep-alive
keepalive_ref = Process.send_after(self(), {:sse_keepalive, ref}, keepalive_interval)

# Set up timer for SSE timeout
timeout_ref = Process.send_after(self(), {:sse_timeout, ref}, sse_timeout)

# Stream changes as SSE events for the duration of the timer.
sse_event_stream =
Stream.resource(
fn ->
%SseState{
mode: :receive,
request: request,
stream: nil,
since_offset: since_offset,
last_message_time: System.monotonic_time(:millisecond),
keepalive_ref: keepalive_ref
}
end,
&next_sse_event/1,
fn %SseState{keepalive_ref: latest_keepalive_ref} ->
Process.cancel_timer(latest_keepalive_ref)
Process.cancel_timer(timeout_ref)
end
)

response = %{request.response | chunked: true, body: sse_event_stream}

%{response | trace_attrs: Map.put(response.trace_attrs || %{}, :ot_is_sse_response, true)}
end

defp next_sse_event(%SseState{mode: :receive} = state) do
%{
keepalive_ref: keepalive_ref,
last_message_time: last_message_time,
request:
%{
api: %{
keepalive_interval: keepalive_interval
},
handle: shape_handle,
new_changes_ref: ref
} = request,
since_offset: since_offset
} = state

receive do
{^ref, :new_changes, latest_log_offset} ->
updated_request =

Check warning on line 716 in packages/sync-service/lib/electric/shapes/api.ex

View check run for this annotation

Codecov / codecov/patch

packages/sync-service/lib/electric/shapes/api.ex#L716

Added line #L716 was not covered by tests
%{request | last_offset: latest_log_offset}
|> determine_global_last_seen_lsn()
|> determine_log_chunk_offset()
|> determine_up_to_date()

# This is usually but not always the `latest_log_offset`
# as per `determine_log_chunk_offset/1`.
end_offset = updated_request.chunk_end_offset

Check warning on line 724 in packages/sync-service/lib/electric/shapes/api.ex

View check run for this annotation

Codecov / codecov/patch

packages/sync-service/lib/electric/shapes/api.ex#L724

Added line #L724 was not covered by tests

in_sse? = true

Check warning on line 726 in packages/sync-service/lib/electric/shapes/api.ex

View check run for this annotation

Codecov / codecov/patch

packages/sync-service/lib/electric/shapes/api.ex#L726

Added line #L726 was not covered by tests

case Shapes.get_merged_log_stream(
updated_request.api,

Check warning on line 729 in packages/sync-service/lib/electric/shapes/api.ex

View check run for this annotation

Codecov / codecov/patch

packages/sync-service/lib/electric/shapes/api.ex#L728-L729

Added lines #L728 - L729 were not covered by tests
shape_handle,
since: since_offset,
up_to: end_offset,
experimental_live_sse: in_sse?
) do
{:ok, log} ->
Process.cancel_timer(keepalive_ref)

Check warning on line 736 in packages/sync-service/lib/electric/shapes/api.ex

View check run for this annotation

Codecov / codecov/patch

packages/sync-service/lib/electric/shapes/api.ex#L736

Added line #L736 was not covered by tests

control_messages = maybe_up_to_date(updated_request, end_offset.tx_offset)
message_stream = Stream.concat(log, control_messages)
encoded_stream = encode_log(updated_request, message_stream)

Check warning on line 740 in packages/sync-service/lib/electric/shapes/api.ex

View check run for this annotation

Codecov / codecov/patch

packages/sync-service/lib/electric/shapes/api.ex#L738-L740

Added lines #L738 - L740 were not covered by tests

current_time = System.monotonic_time(:millisecond)

Check warning on line 742 in packages/sync-service/lib/electric/shapes/api.ex

View check run for this annotation

Codecov / codecov/patch

packages/sync-service/lib/electric/shapes/api.ex#L742

Added line #L742 was not covered by tests

new_keepalive_ref =

Check warning on line 744 in packages/sync-service/lib/electric/shapes/api.ex

View check run for this annotation

Codecov / codecov/patch

packages/sync-service/lib/electric/shapes/api.ex#L744

Added line #L744 was not covered by tests
Process.send_after(self(), {:sse_keepalive, ref}, keepalive_interval)

{[],
%{
state
| mode: :emit,
stream: encoded_stream,
since_offset: end_offset,
last_message_time: current_time,
keepalive_ref: new_keepalive_ref
}}

{:error, _error} ->

Check warning on line 757 in packages/sync-service/lib/electric/shapes/api.ex

View check run for this annotation

Codecov / codecov/patch

packages/sync-service/lib/electric/shapes/api.ex#L757

Added line #L757 was not covered by tests
{[], state}
end

{^ref, :shape_rotation} ->
must_refetch = %{headers: %{control: "must-refetch"}}
message = encode_message(request, must_refetch)

Check warning on line 763 in packages/sync-service/lib/electric/shapes/api.ex

View check run for this annotation

Codecov / codecov/patch

packages/sync-service/lib/electric/shapes/api.ex#L762-L763

Added lines #L762 - L763 were not covered by tests

{message, %{state | mode: :done}}

{:sse_keepalive, ^ref} ->
current_time = System.monotonic_time(:millisecond)
time_since_last_message = current_time - last_message_time

Check warning on line 769 in packages/sync-service/lib/electric/shapes/api.ex

View check run for this annotation

Codecov / codecov/patch

packages/sync-service/lib/electric/shapes/api.ex#L768-L769

Added lines #L768 - L769 were not covered by tests

if time_since_last_message >= keepalive_interval do
new_keepalive_ref =

Check warning on line 772 in packages/sync-service/lib/electric/shapes/api.ex

View check run for this annotation

Codecov / codecov/patch

packages/sync-service/lib/electric/shapes/api.ex#L771-L772

Added lines #L771 - L772 were not covered by tests
Process.send_after(self(), {:sse_keepalive, ref}, keepalive_interval)

{[": keep-alive\n\n"],
%{state | last_message_time: current_time, keepalive_ref: new_keepalive_ref}}
else
# Not time to send a keep-alive yet, schedule for the remaining time
remaining_time = keepalive_interval - time_since_last_message
new_keepalive_ref = Process.send_after(self(), {:sse_keepalive, ref}, remaining_time)

Check warning on line 780 in packages/sync-service/lib/electric/shapes/api.ex

View check run for this annotation

Codecov / codecov/patch

packages/sync-service/lib/electric/shapes/api.ex#L779-L780

Added lines #L779 - L780 were not covered by tests

{[], %{state | keepalive_ref: new_keepalive_ref}}
end

{:sse_timeout, ^ref} ->
{[], %{state | mode: :done}}
end
end

defp next_sse_event(%SseState{mode: :emit} = state) do
%{stream: stream} = state

Check warning on line 791 in packages/sync-service/lib/electric/shapes/api.ex

View check run for this annotation

Codecov / codecov/patch

packages/sync-service/lib/electric/shapes/api.ex#L791

Added line #L791 was not covered by tests

# Can change the number taken to adjust the grouping. Currently three
# because there's typically 3 elements per SSE -- the actual message
# and the "data: " and "\n\n" delimiters around it.
#
# The JSON encoder groups stream elements by 500. So perhaps this
# could be a larger number for more efficiency?
case StreamSplit.take_and_drop(stream, 3) do
{[], _tail} ->

Check warning on line 800 in packages/sync-service/lib/electric/shapes/api.ex

View check run for this annotation

Codecov / codecov/patch

packages/sync-service/lib/electric/shapes/api.ex#L799-L800

Added lines #L799 - L800 were not covered by tests
{[], %{state | mode: :receive, stream: nil}}

{head, tail} ->

Check warning on line 803 in packages/sync-service/lib/electric/shapes/api.ex

View check run for this annotation

Codecov / codecov/patch

packages/sync-service/lib/electric/shapes/api.ex#L803

Added line #L803 was not covered by tests
{head, %{state | stream: tail}}
end
end

defp next_sse_event(%SseState{mode: :done} = state), do: {:halt, state}

defp no_change_response(%Request{} = request) do
%{response: response, global_last_seen_lsn: global_last_seen_lsn} =
update_attrs(request, %{ot_is_empty_response: true})
Expand Down Expand Up @@ -671,16 +850,35 @@
def stack_id(%Api{stack_id: stack_id}), do: stack_id
def stack_id(%{api: %{stack_id: stack_id}}), do: stack_id

defp encode_log(%Request{api: api, params: %{live: true, experimental_live_sse: true}}, stream) do
encode_sse(api, :log, stream)
end

defp encode_log(%Request{api: api}, stream) do
encode(api, :log, stream)
end

@spec encode_message(Api.t() | Request.t(), term()) :: Enum.t()
def encode_message(%Request{api: api}, message) do
# Error messages are encoded normally, even when using SSE
# because they are returned on the original fetch request
# with a status code that is not 2xx.
@spec encode_error_message(Api.t() | Request.t(), term()) :: Enum.t()
def encode_error_message(%Api{} = api, message) do
encode(api, :message, message)
end

def encode_error_message(%Request{api: api}, message) do
encode(api, :message, message)
end

def encode_message(%Api{} = api, message) do
@spec encode_message(Request.t(), term()) :: Enum.t()
def encode_message(
%Request{api: api, params: %{live: true, experimental_live_sse: true}},
message
) do
encode_sse(api, :message, message)

Check warning on line 878 in packages/sync-service/lib/electric/shapes/api.ex

View check run for this annotation

Codecov / codecov/patch

packages/sync-service/lib/electric/shapes/api.ex#L878

Added line #L878 was not covered by tests
end

def encode_message(%Request{api: api}, message) do
encode(api, :message, message)
end

Expand All @@ -689,6 +887,10 @@
apply(encoder, type, [message])
end

defp encode_sse(%Api{sse_encoder: sse_encoder}, type, message) when type in [:message, :log] do
apply(sse_encoder, type, [message])
end

def schema(%Response{
api: %Api{inspector: inspector},
shape_definition: %Shapes.Shape{} = shape
Expand Down
Loading
Loading