
feat: Horizontally scalable SSE Events#303

Open
86667 wants to merge 3 commits into main from horiz_scalable_sse

Conversation

Contributor

@86667 86667 commented Feb 6, 2026

Use Postgres's NOTIFY/LISTEN to propagate events across Homeserver instances.

Changes

  • Add notify_event() to send events via pg_notify
  • Add PgEventListener background service that subscribes to Postgres notifications and forwards events to the local broadcast channel
  • Add serde support for EventEntity and related types

How it works

  1. File write commits to Postgres
  2. notify_event() sends pg_notify('events')
  3. All instances' PgEventListener receive the notification
  4. Each instance broadcasts to its local SSE subscribers
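The four steps above can be sketched as a dependency-free simulation. All names here are hypothetical stand-ins: the real step 2→3 hop is Postgres NOTIFY delivered to each instance's `PgEventListener`, and the per-instance channel is a tokio broadcast channel rather than `mpsc`:

```rust
use std::sync::mpsc;

// Hypothetical stand-in for the serialized EventEntity payload.
#[derive(Clone, Debug, PartialEq)]
pub struct Event {
    pub id: i64,
    pub path: String,
}

/// Simulates the NOTIFY fan-out: every instance's listener receives a copy
/// of the event and forwards it to that instance's local SSE subscribers.
/// Returns how many instances were notified.
pub fn fan_out(event: &Event, instances: &[mpsc::Sender<Event>]) -> usize {
    let mut delivered = 0;
    for listener in instances {
        // In the real system this hop is pg_notify('events', payload).
        if listener.send(event.clone()).is_ok() {
            delivered += 1;
        }
    }
    delivered
}
```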

@86667 86667 force-pushed the horiz_scalable_sse branch from 5e2807b to 252121b Compare February 19, 2026 09:49
@86667 86667 force-pushed the horiz_scalable_sse branch from 252121b to 8d20216 Compare February 23, 2026 10:39
@86667 86667 force-pushed the horiz_scalable_sse branch from 8d20216 to 7ad8df7 Compare February 23, 2026 10:43
@86667 86667 requested review from SHAcollision and dzdidi February 23, 2026 15:38
@86667 86667 linked an issue Feb 24, 2026 that may be closed by this pull request
Contributor

@SHAcollision SHAcollision left a comment


Very nice work. I left some comments; most are just nits, others might be more relevant, but I may also have misunderstood something. Good job!!

})?;
self.events_service.broadcast_event(event);
self.events_service
    .notify_event(&event, self.db.pool())


It seems we switched from broadcast_event to notify_event only, but notify_event is best-effort and currently just logs on failure. If the call fails (DB hiccup, payload issue), local SSE clients on this instance will also miss the event permanently. Can we either (a) keep a local broadcast fast-path plus cross-instance dedupe, or (b) add an explicit catch-up mechanism to guarantee no gaps for connected subscribers?
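One way to realize option (a) is a small dedupe guard, sketched below with hypothetical names: broadcast locally right away as a fast path, and have the Postgres listener skip event ids this instance has already delivered:

```rust
use std::collections::HashSet;

/// Hypothetical dedupe guard keyed by event id. A real implementation would
/// prune old ids (e.g. a bounded ring of recent ids) to cap memory use.
#[derive(Default)]
pub struct DedupeGuard {
    seen: HashSet<i64>,
}

impl DedupeGuard {
    /// Returns true the first time an event id is observed; the Postgres
    /// listener would only forward events for which this returns true,
    /// since the local fast path already delivered the rest.
    pub fn first_delivery(&mut self, event_id: i64) -> bool {
        self.seen.insert(event_id)
    }
}
```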

Comment on lines +117 to +127
if let Err(e) = sqlx::query(PG_NOTIFY_QUERY)
    .bind(&payload)
    .execute(pool)
    .await
{
    tracing::error!(
        event_id = event.id,
        path = %event.path,
        "Failed to send NOTIFY: {}", e
    );
}


Error handling here logs and returns `()`, so failed NOTIFYs are silently dropped from the live stream path. Not sure what's best; maybe we could return a `Result` to callers and define a recovery policy (retry/backoff, fallback local broadcast, or durable catch-up)? This could introduce a reliability regression under transient failures.
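A minimal sketch of such a recovery policy, with hypothetical names and blocking std sleeps for brevity (a real implementation would use `tokio::time::sleep` and surface the final `Err` so the caller can fall back to a local broadcast or durable catch-up):

```rust
use std::time::Duration;

/// Retries a fallible operation up to `max_attempts` times with exponential
/// backoff, returning the last error if every attempt fails.
pub fn retry_with_backoff<E>(
    max_attempts: u32,
    base_delay: Duration,
    mut op: impl FnMut() -> Result<(), E>,
) -> Result<(), E> {
    let mut attempt = 0;
    loop {
        match op() {
            Ok(()) => return Ok(()),
            Err(e) => {
                attempt += 1;
                if attempt >= max_attempts {
                    // Caller decides the fallback (local broadcast, catch-up).
                    return Err(e);
                }
                // Exponential backoff: base, 2*base, 4*base, ...
                std::thread::sleep(base_delay * 2u32.pow(attempt - 1));
            }
        }
    }
}
```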

Comment on lines +40 to +51
/// Main loop that handles reconnection on errors.
async fn listen_loop(pool: PgPool, events_service: EventsService) {
    loop {
        match Self::run_listener(&pool, &events_service).await {
            Ok(()) => break, // Clean shutdown (should not happen in normal operation)
            Err(e) => {
                tracing::error!("PgListener error: {}. Reconnecting in 1s...", e);
                tokio::time::sleep(Duration::from_secs(1)).await;
            }
        }
    }
}


Reconnect loop is good, but LISTEN/NOTIFY is not durable: events emitted while disconnected are lost. Can we add gap recovery on reconnect (e.g., resume from last delivered event cursor/ID via DB query) so subscribers don’t permanently miss events during reconnect windows?
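A sketch of the cursor idea, with an in-memory `Vec` standing in for the events table (hypothetical names; the real catch-up would be a SQL query ordered by id, run before re-entering the LISTEN loop):

```rust
/// Hypothetical stand-in for a row in the events table.
#[derive(Clone, Debug, PartialEq)]
pub struct Event {
    pub id: i64,
    pub path: String,
}

/// Stand-in for `SELECT ... FROM events WHERE id > $1 ORDER BY id`:
/// replays everything newer than the last delivered event, so a listener
/// that reconnects can close the gap before resuming live notifications.
pub fn events_after(store: &[Event], last_seen_id: i64) -> Vec<Event> {
    store.iter().filter(|e| e.id > last_seen_id).cloned().collect()
}
```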

/// creates separate ephemeral databases per homeserver. Instead, we instantiate
/// only the EventsService + PgEventListener components sharing a single db pool.
#[tokio::test]
#[pubky_test_utils::test]


Could we add a test that intentionally drops/restarts one listener while events are being produced, then verifies the restarted instance catches up without gaps? The current tests do not seem to cover reconnect-window loss behavior.

Comment on lines +16 to +19
pub(crate) const PG_NOTIFY_CHANNEL: &str = "events";

/// SQL query to send a NOTIFY event.
const PG_NOTIFY_QUERY: &str = "SELECT pg_notify('events', $1)";


Minor maintainability nit: channel name is duplicated (PG_NOTIFY_CHANNEL + hardcoded 'events' in SQL). Could we build the SQL from the constant (or centralize both in one place) to avoid config drift?
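One possible shape for that fix, derived from the snippet above (building the query at startup is a suggestion, not the PR's implementation; `pg_notify` takes the channel as an ordinary text argument, so interpolating the constant here is safe as long as the constant is a trusted identifier):

```rust
const PG_NOTIFY_CHANNEL: &str = "events";

/// Builds the NOTIFY query from the single channel constant, so the channel
/// name used by LISTEN and the one embedded in the SQL cannot drift apart.
/// `$1` remains the bound payload parameter.
fn pg_notify_query(channel: &str) -> String {
    format!("SELECT pg_notify('{}', $1)", channel)
}
```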

Comment on lines +70 to +74
tracing::error!(
"Failed to deserialize event notification: {}. Payload: {}",
e,
notification.payload()
);


Super nit: logging the full raw payload on deserialize failure may leak user path/public key data into logs. We could truncate the payload excerpt and log structured metadata instead.
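A small helper along those lines (hypothetical name): cap the logged excerpt at a byte budget without splitting a multi-byte UTF-8 character, so the log still carries enough to debug without dumping the whole payload:

```rust
/// Returns at most `max_bytes` of `payload` for logging, backing up to the
/// nearest char boundary so the slice is always valid UTF-8.
pub fn truncate_for_log(payload: &str, max_bytes: usize) -> &str {
    if payload.len() <= max_bytes {
        return payload;
    }
    let mut end = max_bytes;
    while !payload.is_char_boundary(end) {
        end -= 1;
    }
    &payload[..end]
}
```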

Comment on lines +109 to +115
if payload.len() > PG_NOTIFY_WARN_THRESHOLD {
    tracing::warn!(
        event_id = event.id,
        payload_size = payload.len(),
        "Event payload size exceeds warning threshold. pg_notify has 8KB limit."
    );
}


NIT: nice warning threshold. Could we also log the hard-limit context in structured fields (payload_size, payload_limit=8192) and maybe emit a distinct event name for easier alerting/filtering?
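One way to do that is to separate the decision from the logging call, so the warn site can emit distinct structured fields and a filterable event name. The 8000-byte cap is Postgres's default NOTIFY payload limit; the threshold constant here is an assumed value, not the PR's:

```rust
/// Postgres rejects NOTIFY payloads of 8000 bytes or more in a default build.
const PG_NOTIFY_PAYLOAD_LIMIT: usize = 8000;
/// Hypothetical warning threshold, below the hard limit.
const PG_NOTIFY_WARN_THRESHOLD: usize = 6000;

/// Returns `Some((payload_size, payload_limit))` when the payload warrants a
/// warning, ready to be attached as structured fields to a named log event.
fn oversized_payload_fields(payload_len: usize) -> Option<(usize, usize)> {
    (payload_len > PG_NOTIFY_WARN_THRESHOLD)
        .then(|| (payload_len, PG_NOTIFY_PAYLOAD_LIMIT))
}
```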



Development

Successfully merging this pull request may close these issues.

SSE broadcast channel is not horizontally scalable
