**SHAcollision** left a comment:
Very nice work. I left some comments; most are just nits, others might be more relevant, but I may also not have understood fully. Good job!!
```rust
})?;
self.events_service.broadcast_event(event);
self.events_service
    .notify_event(&event, self.db.pool())
```
It seems we switched from broadcast_event to notify_event only, but notify_event is best-effort and currently just logs on failure. If the call fails (DB hiccup, payload issue), local SSE clients on this instance will also miss the event permanently. Can we either (a) keep a local broadcast fast-path plus cross-instance dedupe, or (b) add an explicit catch-up mechanism to guarantee no gaps for connected subscribers?
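A minimal sketch of option (a), using hypothetical stand-in types (`Fanout`, and injected `local_broadcast`/`pg_notify` closures in place of the real `EventsService` calls): broadcast locally first so this instance's SSE subscribers never depend on the NOTIFY round-trip, and dedupe by event id when the notification echoes back.

```rust
use std::collections::HashSet;

/// Hypothetical sketch: deliver locally first, then fan out cross-instance.
/// Dedupe by id so an instance does not re-deliver its own events when the
/// NOTIFY comes back around via Postgres.
struct Fanout {
    seen: HashSet<u64>, // event ids already delivered to local subscribers
}

impl Fanout {
    fn publish(
        &mut self,
        event_id: u64,
        local_broadcast: &mut dyn FnMut(u64),
        pg_notify: &mut dyn FnMut(u64) -> Result<(), String>,
    ) -> Result<(), String> {
        // Local fast-path: subscribers on this instance get the event even
        // if the cross-instance NOTIFY fails.
        local_broadcast(event_id);
        self.seen.insert(event_id);
        pg_notify(event_id) // best-effort; the error is surfaced, not swallowed
    }

    /// Handles a notification that arrived from Postgres.
    fn on_notification(&mut self, event_id: u64, local_broadcast: &mut dyn FnMut(u64)) {
        // Skip events this instance already broadcast itself.
        if self.seen.insert(event_id) {
            local_broadcast(event_id);
        }
    }
}
```

In the real service the dedupe set would need eviction (e.g. a bounded ring of recent ids), but the shape of the fast-path is the same.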
```rust
if let Err(e) = sqlx::query(PG_NOTIFY_QUERY)
    .bind(&payload)
    .execute(pool)
    .await
{
    tracing::error!(
        event_id = event.id,
        path = %event.path,
        "Failed to send NOTIFY: {}", e
    );
}
```
Error handling here logs and returns `()`, so failed NOTIFYs are silently dropped from the live stream path. Not sure what's best; maybe we could return a `Result` to callers and define a recovery policy (retry/backoff, fallback local broadcast, or durable catch-up)? This could introduce a reliability regression under transient failures.
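One possible shape for the retry/backoff part, sketched as a synchronous helper so the policy is testable in isolation (`send` stands in for the real `pg_notify` call, `sleep` is injected rather than calling `tokio::time::sleep`; both are assumptions, not the PR's API):

```rust
use std::time::Duration;

/// Hypothetical retry policy: bounded attempts with exponential backoff.
/// Returns the last error so callers can decide on a fallback instead of
/// the failure being logged and dropped.
fn notify_with_retry(
    mut send: impl FnMut() -> Result<(), String>,
    mut sleep: impl FnMut(Duration),
    max_attempts: u32,
) -> Result<(), String> {
    let mut backoff = Duration::from_millis(100);
    let mut last_err = String::new();
    for attempt in 1..=max_attempts {
        match send() {
            Ok(()) => return Ok(()),
            Err(e) => {
                last_err = e;
                if attempt < max_attempts {
                    sleep(backoff);
                    backoff *= 2; // exponential backoff between attempts
                }
            }
        }
    }
    // Surface the failure so callers can fall back (e.g. local broadcast).
    Err(last_err)
}
```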
```rust
/// Main loop that handles reconnection on errors.
async fn listen_loop(pool: PgPool, events_service: EventsService) {
    loop {
        match Self::run_listener(&pool, &events_service).await {
            Ok(()) => break, // Clean shutdown (should not happen in normal operation)
            Err(e) => {
                tracing::error!("PgListener error: {}. Reconnecting in 1s...", e);
                tokio::time::sleep(Duration::from_secs(1)).await;
            }
        }
    }
}
```
Reconnect loop is good, but LISTEN/NOTIFY is not durable: events emitted while disconnected are lost. Can we add gap recovery on reconnect (e.g., resume from last delivered event cursor/ID via DB query) so subscribers don’t permanently miss events during reconnect windows?
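The cursor-based gap recovery could look roughly like this (a sketch under assumptions: `fetch_events_after` stands in for a query against the events table ordered by id, and `deliver` for the local broadcast; these names are hypothetical):

```rust
/// Hypothetical gap recovery: after reconnecting, replay every event with an
/// id greater than the last one delivered, using the events table as the
/// durable source of truth. Returns the advanced cursor.
fn catch_up(
    last_delivered: u64,
    fetch_events_after: impl Fn(u64) -> Vec<u64>, // stands in for a DB query
    deliver: &mut impl FnMut(u64),
) -> u64 {
    let mut cursor = last_delivered;
    for id in fetch_events_after(last_delivered) {
        deliver(id);
        cursor = id; // advance as each missed event is re-delivered
    }
    cursor
}
```

The key design point is that the cursor is updated only on delivery, so a crash mid-replay just replays a suffix again rather than skipping events.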
```rust
/// creates separate ephemeral databases per homeserver. Instead, we instantiate
/// only the EventsService + PgEventListener components sharing a single db pool.
#[tokio::test]
#[pubky_test_utils::test]
```
Could we add a test that intentionally drops/restarts one listener while events are being produced, then verifies the restarted instance catches up without gaps? The current tests do not seem to cover reconnect-window loss behavior.
```rust
pub(crate) const PG_NOTIFY_CHANNEL: &str = "events";

/// SQL query to send a NOTIFY event.
const PG_NOTIFY_QUERY: &str = "SELECT pg_notify('events', $1)";
```
Minor maintainability nit: channel name is duplicated (PG_NOTIFY_CHANNEL + hardcoded 'events' in SQL). Could we build the SQL from the constant (or centralize both in one place) to avoid config drift?
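One way to centralize it, sketched without extra crates (the real code could instead cache the string in a `std::sync::LazyLock` to avoid re-allocating per call):

```rust
/// Channel name defined once; the NOTIFY SQL is derived from it so the two
/// can never drift apart.
pub const PG_NOTIFY_CHANNEL: &str = "events";

fn pg_notify_query() -> String {
    format!("SELECT pg_notify('{PG_NOTIFY_CHANNEL}', $1)")
}
```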
```rust
tracing::error!(
    "Failed to deserialize event notification: {}. Payload: {}",
    e,
    notification.payload()
);
```
Super NIT: logging the full raw payload on deserialize failure may leak user path/public key data into logs. We could try truncating payload content and logging structured metadata instead.
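The truncation part could be a small helper like this (name and signature are my suggestion, not existing code); cutting on a char boundary keeps the slice valid UTF-8:

```rust
/// Hypothetical log-sanitizing helper: keep at most `max` bytes of the
/// payload, backing up to a char boundary so the result stays valid UTF-8.
fn truncate_for_log(payload: &str, max: usize) -> &str {
    if payload.len() <= max {
        return payload;
    }
    let mut end = max;
    while !payload.is_char_boundary(end) {
        end -= 1; // never splits a multi-byte character
    }
    &payload[..end]
}
```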
```rust
if payload.len() > PG_NOTIFY_WARN_THRESHOLD {
    tracing::warn!(
        event_id = event.id,
        payload_size = payload.len(),
        "Event payload size exceeds warning threshold. pg_notify has 8KB limit."
    );
}
```
NIT: nice warning threshold. Could we also log the hard-limit context in structured fields (payload_size, payload_limit=8192) and maybe emit a distinct event name for easier alerting/filtering?
Use Postgres's `NOTIFY`/`LISTEN` to propagate events across Homeserver instances.

**Changes**

- `notify_event()` to send events via `pg_notify`
- `PgEventListener` background service that subscribes to Postgres notifications and forwards events to the local broadcast channel
- `EventEntity` and related types

**How it works**

1. `notify_event()` sends `pg_notify('events')`
2. `PgEventListener` receives the notification