Conversation

@xingyaoww (Collaborator) commented Jan 22, 2026

Summary

This PR fixes issue #1785 where RemoteEventsList could miss events (like SystemPromptEvent) that are emitted between conversation creation and WebSocket subscription completion.

Problem

When RemoteConversation is created:

  1. RemoteEventsList._do_full_sync() fetches existing events via REST (none exist yet if agent not initialized)
  2. WebSocketCallbackClient.start() starts a daemon thread to connect WebSocket (non-blocking)
  3. Constructor returns immediately

If send_message() is called before the WebSocket subscription completes, events emitted during that call (like SystemPromptEvent from lazy agent initialization) are:

  • ✅ Correctly persisted on the server in state.events
  • ❌ Published to _pub_sub with zero subscribers (WebSocket not yet subscribed)
  • ❌ Never received by RemoteEventsList

This caused test_remote_conversation_over_real_server to be flaky - passing locally but failing in CI due to timing differences.

Solution

Implemented the "WebSocket Subscription Handshake" approach proposed in a comment on #1785:

  1. Added _ready threading.Event to WebSocketCallbackClient - Tracks subscription completion
  2. Added wait_until_ready(timeout) method - Blocks until ready or timeout
  3. _client_loop sets _ready on first ConversationStateUpdateEvent - The server already sends this immediately after subscription completes
  4. RemoteConversation.__init__ waits for ready - Ensures WebSocket is subscribed before returning

Key insight

The server already sends a ConversationStateUpdateEvent immediately after subscribe_to_events() completes (see event_service.py lines 330-353). We use this as the "ready" signal without requiring any protocol changes.
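
A minimal, self-contained sketch of the handshake pattern described above (the class and method names below are illustrative stand-ins, not the actual WebSocketCallbackClient, which also manages the socket, retries, and callbacks):

import threading

class MiniWebSocketClient:
    """Illustrative stand-in for the ready-signaling part of WebSocketCallbackClient."""

    def __init__(self) -> None:
        self._ready = threading.Event()  # set once the subscription is confirmed
        self._stop = threading.Event()   # set when the client is shutting down

    def wait_until_ready(self, timeout: float | None = None) -> bool:
        # Blocks until the first state-update event has been seen, or times out.
        return self._ready.wait(timeout=timeout)

    def _on_server_message(self, event_kind: str) -> None:
        # In the real client this happens in _client_loop: the server pushes a
        # ConversationStateUpdateEvent right after subscribing, which doubles
        # as the "subscription is live" signal.
        if event_kind == "ConversationStateUpdateEvent" and not self._ready.is_set():
            self._ready.set()

# Construction-side usage, mirroring what RemoteConversation.__init__ now does:
client = MiniWebSocketClient()
client._on_server_message("ConversationStateUpdateEvent")  # simulate the server's push
assert client.wait_until_ready(timeout=10.0)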

Changes

  • openhands-sdk/openhands/sdk/conversation/impl/remote_conversation.py:

    • Added _ready threading.Event to WebSocketCallbackClient
    • Added wait_until_ready(timeout) method
    • Modified _client_loop to set _ready on first ConversationStateUpdateEvent
    • Modified RemoteConversation.__init__ to wait for ready with 10s timeout
  • tests/sdk/conversation/remote/test_websocket_subscription_ready.py:

    • New test file with 8 tests covering the ready signaling mechanism

Testing

  • All new tests pass
  • All existing remote conversation tests pass (67 tests)
  • Live server test test_remote_conversation_over_real_server passes reliably

Fixes #1785



Agent Server images for this PR

GHCR package: https://github.com/OpenHands/agent-sdk/pkgs/container/agent-server

Variants & Base Images

Variant  Architectures  Base Image                                  Docs / Tags
java     amd64, arm64   eclipse-temurin:17-jdk                      Link
python   amd64, arm64   nikolaik/python-nodejs:python3.12-nodejs22  Link
golang   amd64, arm64   golang:1.21-bookworm                        Link

Pull (multi-arch manifest)

# Each variant is a multi-arch manifest supporting both amd64 and arm64
docker pull ghcr.io/openhands/agent-server:c406701-python

Run

docker run -it --rm \
  -p 8000:8000 \
  --name agent-server-c406701-python \
  ghcr.io/openhands/agent-server:c406701-python

All tags pushed for this build

ghcr.io/openhands/agent-server:c406701-golang-amd64
ghcr.io/openhands/agent-server:c406701-golang_tag_1.21-bookworm-amd64
ghcr.io/openhands/agent-server:c406701-golang-arm64
ghcr.io/openhands/agent-server:c406701-golang_tag_1.21-bookworm-arm64
ghcr.io/openhands/agent-server:c406701-java-amd64
ghcr.io/openhands/agent-server:c406701-eclipse-temurin_tag_17-jdk-amd64
ghcr.io/openhands/agent-server:c406701-java-arm64
ghcr.io/openhands/agent-server:c406701-eclipse-temurin_tag_17-jdk-arm64
ghcr.io/openhands/agent-server:c406701-python-amd64
ghcr.io/openhands/agent-server:c406701-nikolaik_s_python-nodejs_tag_python3.12-nodejs22-amd64
ghcr.io/openhands/agent-server:c406701-python-arm64
ghcr.io/openhands/agent-server:c406701-nikolaik_s_python-nodejs_tag_python3.12-nodejs22-arm64
ghcr.io/openhands/agent-server:c406701-golang
ghcr.io/openhands/agent-server:c406701-java
ghcr.io/openhands/agent-server:c406701-python

About Multi-Architecture Support

  • Each variant tag (e.g., c406701-python) is a multi-arch manifest supporting both amd64 and arm64
  • Docker automatically pulls the correct architecture for your platform
  • Individual architecture tags (e.g., c406701-python-amd64) are also available if needed

github-actions bot (Contributor) commented Jan 22, 2026

Coverage

Coverage Report

File: openhands-sdk/openhands/sdk/conversation/impl/remote_conversation.py
  Stmts: 534  Miss: 186  Cover: 65%
  Missing: 70–76, 83–86, 117, 124, 147, 149, 155, 157–160, 170, 179, 192–193, 198–201, 236, 250, 277, 291, 296–299, 330, 350–351, 384, 387–389, 403, 423, 431, 443, 451–454, 457, 462–465, 467, 472–473, 478–482, 487–491, 496–499, 502, 513–514, 518, 522, 525, 596, 602, 604, 607, 633–634, 638–639, 642, 650, 669–670, 675, 677–678, 689, 707, 720–721, 725, 731–732, 736, 741–742, 747–749, 752–756, 758–759, 763, 765–773, 775, 779, 794, 812, 847, 849, 852, 880, 890–891, 919–920, 925, 933–937, 944–945, 949, 954–958, 962–970, 973–974, 983–984, 993, 1001, 1006–1008, 1010, 1013, 1015–1016, 1036, 1038, 1044–1045, 1060, 1067, 1073–1074, 1089, 1102, 1108–1109, 1116–1117
TOTAL: Stmts: 17301  Miss: 7829  Cover: 54%

@xingyaoww (Collaborator, Author) commented:
@jpshackelford can you check if this works?

@jpshackelford (Contributor) commented:
Absolutely. Thanks for looking at it.

This fixes issue #1785 where RemoteEventsList could miss events emitted
between conversation creation and WebSocket subscription completion.

The fix adds a ready signaling mechanism to WebSocketCallbackClient:
- Added _ready threading.Event to track subscription completion
- Added wait_until_ready(timeout) method to block until ready
- _client_loop sets _ready on first ConversationStateUpdateEvent
- RemoteConversation.__init__ waits for ready before returning

The server already sends a ConversationStateUpdateEvent immediately after
subscription completes, so we use this as the 'ready' signal.

This ensures events emitted during send_message() (like SystemPromptEvent
from lazy agent initialization) are reliably delivered via WebSocket.

Co-authored-by: openhands <openhands@all-hands.dev>
jpshackelford marked this pull request as ready for review January 22, 2026 19:23
all-hands-bot (Collaborator) left a comment

Good fix for the WebSocket subscription race condition. Found one important issue with shutdown behavior and a few suggestions for improvement.

    Returns:
        True if the WebSocket is ready, False if timeout expired.
    """
    return self._ready.wait(timeout=timeout)
🟠 Important: wait_until_ready() should also check the _stop event to avoid blocking for the full timeout when the client is being stopped.

If stop() is called before the ready event is set, the WebSocket thread will exit but this method will continue blocking for the full timeout period (potentially 10 seconds). This creates poor UX during error scenarios or shutdown.

Consider checking both events:

Suggested change: replace

    return self._ready.wait(timeout=timeout)

with

    def wait_until_ready(self, timeout: float | None = None) -> bool:
        """Wait for WebSocket subscription to complete.

        The server sends a ConversationStateUpdateEvent immediately after
        subscription completes. This method blocks until that event is received,
        the client is stopped, or the timeout expires.

        Args:
            timeout: Maximum time to wait in seconds. None means wait forever.

        Returns:
            True if the WebSocket is ready, False if stopped or timeout expired.
        """
        import time
        start_time = time.time()
        while True:
            if self._ready.is_set():
                return True
            if self._stop.is_set():
                return False
            if timeout is not None and (time.time() - start_time) >= timeout:
                return False
            time.sleep(0.05)  # Check every 50ms

# Wait for WebSocket subscription to complete before allowing operations.
# This ensures events emitted during send_message() are not missed.
# The server sends a ConversationStateUpdateEvent after subscription.
if not self._ws_client.wait_until_ready(timeout=10.0):
🟡 Suggestion: Consider making this timeout configurable (e.g., via an environment variable or parameter) for environments with high latency or slow connections. 10 seconds is reasonable for most cases, but may not be sufficient in all scenarios.
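
One way to make the timeout configurable, as suggested here; the environment variable name and helper below are hypothetical, not an existing SDK setting:

import os

DEFAULT_WS_READY_TIMEOUT = 10.0  # current hardcoded value in the PR

def ws_ready_timeout() -> float:
    """Read a hypothetical OPENHANDS_WS_READY_TIMEOUT override, falling back to 10s."""
    raw = os.environ.get("OPENHANDS_WS_READY_TIMEOUT", "")
    try:
        return float(raw) if raw else DEFAULT_WS_READY_TIMEOUT
    except ValueError:
        return DEFAULT_WS_READY_TIMEOUT

# RemoteConversation.__init__ could then call:
#   self._ws_client.wait_until_ready(timeout=ws_ready_timeout())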

Comment on lines +643 to +651
    logger.warning(
        f"WebSocket subscription did not complete within timeout for "
        f"conversation {self._id}. Events may be missed."
    )
🟡 Suggestion: This warning could be more actionable. Consider adding guidance on what users should do or check when this occurs.

Suggested change: replace

    logger.warning(
        f"WebSocket subscription did not complete within timeout for "
        f"conversation {self._id}. Events may be missed."
    )

with

    logger.warning(
        f"WebSocket subscription did not complete within timeout for "
        f"conversation {self._id}. Events may be missed. "
        f"Check network connectivity and verify the server is responding."
    )

Comment on lines +174 to +179
    def test_ready_not_set_for_non_state_update_events(self, mock_message_event):
        """Test that non-ConversationStateUpdateEvent events don't trigger ready.

        Note: The ready signal is set in _client_loop when processing messages,
        not in the callback. This test verifies the callback doesn't set ready.
        """
🟢 Nit: The docstring comment is a bit confusing. Since _client_loop is the only place that sets _ready, stating "this test verifies the callback doesn't set ready" implies callbacks might sometimes set it. Consider clarifying:

Suggested change: replace

    def test_ready_not_set_for_non_state_update_events(self, mock_message_event):
        """Test that non-ConversationStateUpdateEvent events don't trigger ready.

        Note: The ready signal is set in _client_loop when processing messages,
        not in the callback. This test verifies the callback doesn't set ready.
        """

with

    def test_ready_not_set_for_non_state_update_events(self, mock_message_event):
        """Test that directly invoking callbacks does not set the ready flag.

        The ready signal is only set by _client_loop when it processes a
        ConversationStateUpdateEvent from the WebSocket, not by callbacks.
        """

 is fixed

The WebSocket subscription race condition is now fixed by ensuring
RemoteConversation waits for WebSocket subscription to complete before
allowing operations.

Re-enables:
- test_remote_conversation_over_real_server

Note: test_security_risk_field_with_live_server remains skipped due to a
different issue - ActionEvent may be emitted after client starts closing
the WebSocket, causing delivery failure. This is a separate bug from #1785
(subscription race during connect). The test should be fixed to use REST
API for event verification instead of relying on WebSocket delivery.

Co-authored-by: openhands <openhands@all-hands.dev>
jpshackelford force-pushed the openhands/fix-websocket-subscription-ready branch from 6f64a5d to 87a2347 on January 22, 2026 19:30
jpshackelford (Contributor) left a comment

PR #1791 Review: Testing Results and New Finding

Testing Performed

I re-enabled test_remote_conversation_over_real_server in my local environment; it was previously skipped due to #1785:

Results:

  • 8 unit tests pass in tests/sdk/conversation/remote/test_websocket_subscription_ready.py
  • test_remote_conversation_over_real_server now passes - confirming the fix works for the subscription race condition

The fix correctly ensures that RemoteConversation waits for WebSocket subscription to complete before allowing operations, preventing events like SystemPromptEvent from being missed.


New Issue Discovered: Events Lost During Client Disconnection

While investigating, I found that test_security_risk_field_with_live_server still fails even with this fix. Investigation revealed a separate race condition at the other end of the WebSocket lifecycle:

Root Cause

Events emitted during client disconnection can be lost. The sequence is:

  1. Test runs conversation with a mocked finish tool call
  2. Server emits ActionEvent + ObservationEvent
  3. conv.run() returns when status becomes "finished"
  4. Client starts closing WebSocket
  5. Server tries to send ActionEvent but WebSocket is already closing
  6. Error: RuntimeError: Unexpected ASGI message 'websocket.send', after sending 'websocket.close'

Evidence

Debug output shows the ActionEvent is missing:

Events in conv.state.events:
  0: ConversationStateUpdateEvent
  1: SystemPromptEvent
  2: MessageEvent
  3: ConversationStateUpdateEvent
  4: ObservationEvent - tool_name=finish  ← We get this
  5: ConversationStateUpdateEvent

Did NOT find ActionEvent with finish tool  ← ActionEvent lost during disconnect!

Server logs confirm the delivery failure:

error_sending_event: ActionEvent(id='68d7fcc7...', source='agent', timestamp='...')
RuntimeError: Unexpected ASGI message 'websocket.send', after sending 'websocket.close'

Proposed Fix Options

This PR fixes one end of the WebSocket lifecycle (wait until ready before operations). A symmetric fix is needed for the other end.

Note: The evidence shows events arriving out of order - ObservationEvent arrived but ActionEvent (which should come first) was lost. This points to async event publishing as the root cause.

Option A: Server-Side - Synchronous Event Publishing (Recommended)

Ensure the server sends events synchronously in order before emitting the next event:

  1. Server emits ActionEvent → waits for WebSocket send to complete
  2. Server emits ObservationEvent → waits for send to complete
  3. Server emits ConversationStateUpdateEvent (finished)
  4. Client receives all events in proper order
  5. run() returns with complete event history

Pros:

  • Fixes the root cause (async publishing causing out-of-order/lost events)
  • Events arrive correctly in real-time via WebSocket
  • No extra REST call needed
  • Maintains proper event ordering semantics

Cons:

  • Requires server code changes
  • May add slight latency to event publishing
  • Doesn't handle client-initiated disconnects (client closes browser mid-conversation)
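
To make the ordering idea behind Option A concrete, here is a small asyncio sketch under assumed names (publish_in_order and FakeSocket are illustrative; the agent-server's actual publisher and WebSocket objects differ):

import asyncio

async def publish_in_order(events: list[str], subscribers: list) -> None:
    """Await each send before emitting the next event, so a later event can
    never be delivered while an earlier one is still pending (or dropped)."""
    for event in events:
        # Fire-and-forget publishing (e.g. one task per send) is what allows
        # out-of-order or lost delivery; awaiting serializes the stream.
        await asyncio.gather(*(ws.send(event) for ws in subscribers))

class FakeSocket:
    async def send(self, event: str) -> None:
        print("sent", event)

asyncio.run(publish_in_order(["ActionEvent", "ObservationEvent"], [FakeSocket()]))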

Option B: Client-Side - Automatic REST Sync After Completion

Add automatic REST API sync when completing/closing:

  1. Add a sync() method to RemoteEventsList that re-fetches events from REST API
  2. Call sync() automatically after run() completes to capture any missed events

Pros:

  • Symmetric with wait_until_ready() pattern
  • Works with any server version (backward compatible)
  • Simple to implement
  • Defensive - REST API is authoritative regardless of WebSocket issues

Cons:

  • Adds latency after run() completes
  • Events may still arrive out of order via WebSocket (sync fixes it after the fact)
  • Doesn't address the root cause of async publishing

Question

Would you like to expand the scope of this PR to also fix the "sync before disconnect" issue? Or would you prefer to address it in a follow-on issue/PR?

I've updated the test file to:

  • Re-enable test_remote_conversation_over_real_server
  • Keep test_security_risk_field_with_live_server skipped with an accurate reason describing this new issue

Let me know how you'd like to proceed!

@xingyaoww (Collaborator, Author) commented:
@OpenHands can you reproduce the bug described here: #1791 (review)?

If so, can you create a test that reproduces it and push it to the PR? Do not fix it yet.

openhands-ai bot commented Jan 22, 2026

I'm on it! xingyaoww can track my progress at all-hands.dev

This test reproduces the bug described in PR #1791 review where events
emitted during client disconnection can be lost. The sequence is:

1. Test runs conversation with a mocked finish tool call
2. Server emits ActionEvent + ObservationEvent
3. conv.run() returns when status becomes 'finished'
4. Client starts closing WebSocket
5. Server tries to send ActionEvent but WebSocket is already closing

The bug is a race condition that may not always reproduce. The test
verifies that events delivered via WebSocket match events persisted
on the server (fetched via REST API). If they don't match, the bug
is demonstrated.

Also adds flaky_race_condition pytest marker for timing-dependent tests.

See PR #1791 review for details:
#1791 (review)

Co-authored-by: openhands <openhands@all-hands.dev>
openhands-ai bot commented Jan 22, 2026

I've successfully reproduced the bug described in PR #1791 review and created a test for it. Here's a summary:

Bug Description

The bug is a race condition where events can be lost during client disconnection:

  1. When conv.run() returns (status becomes "finished"), the client starts closing the WebSocket
  2. But the server may still be trying to send events (like ActionEvent)
  3. This causes RuntimeError: Unexpected ASGI message 'websocket.send', after sending 'websocket.close'
  4. The ActionEvent is lost because it was emitted during client disconnection

This is a separate issue from #1785 (subscription race at startup).

Changes Made

1. New Test: test_events_lost_during_client_disconnection

  • File: tests/cross/test_remote_conversation_live_server.py
  • The test:
    • Runs a conversation with a mocked finish tool call
    • Compares events received via WebSocket with events fetched via REST API
    • If WebSocket has fewer events than REST API, the bug is demonstrated
    • Marked with @pytest.mark.flaky_race_condition since the bug is timing-dependent

2. Registered Custom Pytest Marker

  • File: pyproject.toml
  • Added flaky_race_condition marker to avoid pytest warnings

Checklist

  • ✅ Bug reproduced and understood
  • ✅ Test created that demonstrates the bug
  • ✅ Test pushed to PR branch (commit 3410adb0)
  • ✅ Bug NOT fixed (as requested)

The test is designed to fail when the race condition occurs, with a detailed error message explaining the bug. Since it's a timing-dependent race condition, it may pass in some environments but fail in others (e.g., CI vs local).


This implements Option A from PR #1791 review. Previously, wait_until_ready()
would block for the full timeout even when stop() was called. This created
poor UX during error scenarios or shutdown.

Now wait_until_ready() checks both _ready and _stop events:
- Returns True if _ready is set (WebSocket subscription completed)
- Returns False if _stop is set (client is being stopped)
- Returns False if timeout expires

Also adds two new tests to verify the _stop event behavior:
- test_wait_until_ready_returns_false_when_stopped
- test_wait_until_ready_returns_false_when_stopped_during_wait

Co-authored-by: openhands <openhands@all-hands.dev>
@enyst (Collaborator) commented Jan 23, 2026

@OpenHands Understand this PR well. Read all comments. Investigate the code, make the project, and run the tests discussed here.

Do a /codereview-roasted on this PR.

Post your feedback as a github comment on the PR. Do not make modifications, but make sure you don’t answer me, post it as comment. You can use remote gh as a gh token.

openhands-ai bot commented Jan 23, 2026

I'm on it! enyst can track my progress at all-hands.dev

@enyst (Collaborator) commented Jan 23, 2026

/codereview-roasted

Taste rating: 🟡 Acceptable (fixes a real race) → but it papers over a protocol/ordering guarantee with client-side busy-waiting, and the tests are… enthusiastic.

What I ran

  • python -m pytest -q tests/sdk/conversation/remote/test_websocket_subscription_ready.py (10 passed)
  • python -m pytest -q tests/sdk/conversation/remote/test_remote_conversation.py (24 passed)
  • python -m pytest -q tests/cross/test_remote_conversation_live_server.py (5 passed, 1 skipped) after installing monorepo packages (openhands-sdk, openhands-workspace, openhands-tools).

Summary of the change

You’re fixing #1785: events emitted between RemoteConversation construction and WS subscription completion could be published to _pub_sub before any subscribers exist, so RemoteEventsList never sees them.

The PR adds a “subscription handshake”:

  • WebSocketCallbackClient now has _ready: threading.Event
  • it sets _ready upon first ConversationStateUpdateEvent
  • RemoteConversation.__init__ blocks up to 10s on wait_until_ready() before returning

That’s a real bug and this probably makes the CI flake go away.

[CRITICAL ISSUES]

[openhands-sdk/openhands/sdk/conversation/impl/remote_conversation.py] Blocking in __init__ is an API smell

You changed object construction from “cheap” to “may block up to 10s”. That’s a behavior change that will surprise callers, especially:

  • anyone creating many conversations
  • anyone doing this on a UI/main thread
  • anyone constructing in async contexts

If you must block, this should be:

  • clearly documented in the public API docs for RemoteConversation (not just an internal comment)
  • configurable (timeout / disable waiting)
  • ideally moved to an explicit connect()/start() step so construction doesn’t do I/O

Right now you’re relying on “people won’t notice” because it’s only 10 seconds on failure. That’s not a guarantee; it’s a landmine.

Handshake correctness depends on a non-contractual server behavior

The client assumes “server sends ConversationStateUpdateEvent immediately after subscribing”. Today it does. Tomorrow someone refactors the server and you’re back to silent event loss.

At minimum, define this as an explicit contract in the server/client protocol docs (or make the server send an explicit subscribed/ready message type).

Busy-wait loop

wait_until_ready() uses a polling loop with sleep(0.05). It’s not horrible, but it’s also unnecessary complexity: you already have an Event.

You added polling only to handle _stop (good catch from the bot), but this should be done with a cleaner primitive (e.g., wait on _ready with a short timeout in a loop) or by setting _ready when stopping. The current approach is “works, but why are we burning CPU and adding timing knobs?”

[IMPROVEMENT OPPORTUNITIES]

Make timeout configurable

Hardcoding 10.0 in RemoteConversation.__init__ is arbitrary. In slow CI / high latency environments, 10s can be too short; in fast local, it’s fine. Either:

  • add a parameter (subscription_timeout=)
  • or read from env (OPENHANDS_WS_READY_TIMEOUT)
  • or make it part of SDK config

Also: the failure mode is currently “log a warning and continue”. That means the original bug can still happen, just with a log line. If this is correctness-critical, consider failing fast (raise) unless the caller opts out.
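
A tiny sketch of the fail-fast variant suggested here; the exception type and the strict flag are hypothetical, and ws_client is assumed to expose the wait_until_ready() added in this PR:

class WebSocketNotReadyError(RuntimeError):
    """Hypothetical error for a failed subscription handshake."""

def ensure_ready(ws_client, timeout: float = 10.0, strict: bool = True) -> None:
    if not ws_client.wait_until_ready(timeout=timeout):
        if strict:
            raise WebSocketNotReadyError(
                f"WebSocket subscription not ready after {timeout}s; "
                "events emitted now may be missed"
            )
        # non-strict callers keep today's behavior: warn and continue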

_ready should reset / lifecycle clarity

What happens across reconnects? If the WS client reconnects, _ready stays set forever. If reconnect semantics exist, _ready should represent “current subscription is ready”, not “at some point in the past we were ready”.

If reconnect is impossible by design, document that.

Test suite: a lot of scaffolding for a small behavior

tests/sdk/conversation/remote/test_websocket_subscription_ready.py is very verbose for what’s essentially:

  • _ready exists
  • it becomes set after seeing a specific event
  • wait_until_ready times out / unblocks

Several tests are basically “assert method exists” / “assert attribute exists”. Those are low-value; they lock in implementation details and make refactors painful.

Better tests would focus on externally observable behavior:

  • constructing RemoteConversation doesn’t miss early events (integration test)
  • ready is set only after the correct server message

pyproject.toml markers vs plugins

You added asyncio_mode = "auto" but pytest warns it’s unknown in this environment, and @pytest.mark.asyncio is unknown unless pytest-asyncio is installed. This is not caused by your PR, but the repo’s test/deps story is messy.

If CI uses a pinned env where these plugins exist, fine. But locally this repo currently “collects 2k tests and then dies” unless you install a bunch of extras. That’s not great.

[STYLE / TASTE NOTES]

  • The warning message in RemoteConversation.__init__ is not actionable. If you’re going to continue, at least tell users what to do (network? server health?) or what invariants are now broken.
  • Using ConversationStateUpdateEvent as the ready signal is clever, but it’s also implicit. I’d rather see an explicit protocol message than “first event of type X means Y”.

Verdict

Probably worth merging because it addresses a real, user-visible correctness issue and de-flakes CI.

❗ But the current form is “client-side duct tape”: it changes construction semantics and relies on an implicit server behavior. If you want this to be robust long-term, make the handshake a real protocol contract and expose configuration/opt-out for the blocking behavior.

Key insight: you fixed one race (startup subscription) by blocking, but you’re still relying on “event delivery over WS is reliable enough” elsewhere — and you already have another flake (test_events_lost_during_client_disconnection) proving shutdown has similar ordering problems.

@openhands-ai's comment was marked as duplicate.

@enyst (Collaborator) commented Jan 23, 2026

HUMAN QUESTION: We expect a LOT of reconnects. Double check this. Don’t we reset _ready when losing connection or maybe when reinitializing a connection?

ANSWER: I double-checked the current implementation in openhands-sdk/openhands/sdk/conversation/impl/remote_conversation.py.

  • _ready is created once in WebSocketCallbackClient.__init__ (self._ready = threading.Event()) and is never cleared anywhere.
  • It is set exactly once: in _client_loop, the first time a ConversationStateUpdateEvent is received:
    if isinstance(event, ConversationStateUpdateEvent) and not self._ready.is_set():
        self._ready.set()

Reconnect behavior:

  • There is a retry loop (while not self._stop.is_set(): ... websockets.connect(...) ... except Exception: sleep/backoff).
  • But _ready is not reset on reconnect. If the socket drops and reconnects, _ready remains set from the first successful subscription.
  • Also note: on a clean close, except websockets.exceptions.ConnectionClosed: break exits _client_loop entirely, so in that case there is no reconnect attempt at all.

So today _ready effectively means “this client has ever been ready at least once”, not “the current WS connection is subscribed right now”. If you truly expect lots of reconnects and need ‘current subscription readiness’ semantics, the current code does not provide that; it neither clears _ready on disconnect nor re-arms it per connection attempt.
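
If per-connection readiness is wanted, the loop could re-arm the event on every attempt. A simplified sketch, with connect_once standing in for the real connect-and-subscribe logic (names and loop shape are assumptions, not the actual _client_loop):

import threading
import time

def client_loop(connect_once, ready: threading.Event, stop: threading.Event) -> None:
    """Reconnect loop where "ready" means: the *current* connection is subscribed."""
    while not stop.is_set():
        ready.clear()            # re-arm readiness for this connection attempt
        try:
            connect_once(ready)  # blocks; sets `ready` once the subscription is acked
        except ConnectionError:
            time.sleep(0.5)      # backoff before the next attempt

# Demo with a fake connector that succeeds once and then ends the loop:
stop, ready = threading.Event(), threading.Event()

def fake_connect(ready_evt: threading.Event) -> None:
    ready_evt.set()
    stop.set()

client_loop(fake_connect, ready, stop)
assert ready.is_set()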

@enyst (Collaborator) commented Jan 23, 2026

HUMAN QUESTION: You said: “ideally moved to an explicit connect()/start() step so construction doesn’t do I/O”

Investigate deeply. How large of a change is that and what consequences?

ANSWER: I investigated how RemoteConversation is constructed today and what would be involved in moving the network work out of __init__.

What RemoteConversation.__init__ currently does (I/O + side effects)

In openhands-sdk/openhands/sdk/conversation/impl/remote_conversation.py, RemoteConversation.__init__ performs multiple network and background-thread side effects before returning:

  1. REST calls (synchronous I/O) to either attach or create:
     • If conversation_id is provided: does GET /api/conversations/{conversation_id} to decide whether to attach or create.
     • If creating: does POST /api/conversations.
  2. Builds RemoteState(...), which immediately does an initial REST sync:
     • RemoteEventsList.__init__ calls _do_full_sync() (REST fetch) up-front.
  3. Starts the WS thread immediately:
     • Constructs WebSocketCallbackClient and calls .start().
     • Now also blocks up to 10s on .wait_until_ready().

So construction today is already “connect and start doing network things”. The PR just made that more visibly blocking.

How big would it be to add an explicit connect()/start() step?

It’s not a tiny tweak; it’s a behavioral/API refactor with ripple effects.

You’d be splitting responsibilities currently coupled in __init__:

  • “configure conversation object” vs
  • “ensure remote server resources exist + sync + subscribe to events”.

A minimal version would likely introduce a state machine:

  • RemoteConversation(..., autostart=True) (default) preserves current behavior
  • RemoteConversation(..., autostart=False) creates a detached instance
  • conv.connect() performs:
    • attach/create REST calls
    • instantiate RemoteState and do initial sync
    • start WS client and wait for readiness

That’s not just adding one method: you must ensure every operation (send_message, run, update_secrets, close, etc.) either:

  • calls connect() implicitly (lazy connect), or
  • raises a clear error if called before connect.

Consequences / tradeoffs

Pros

  • No blocking construction: callers can decide where/when to block.
  • Cleaner lifecycle: easier to reason about reconnection/re-subscription, and easier to expose “wait for WS ready” as an explicit step.
  • Better composability with async apps (construction vs I/O).

Cons / risks

  • Potential breaking change unless you keep autostart=True default.
    • Today Conversation(...) returns a fully usable object. Many tests and user code rely on that.
  • Two-phase init complexity: you now have to handle partially-initialized instances.
    • Example: close() currently assumes _id, _state, and _ws_client exist; with two-phase init those may be missing.
  • Error surface grows: every method needs to handle “not connected yet”.
  • Thread-safety & reentrancy: connect() must be idempotent and safe if multiple threads call it.

Practical size estimate

  • If you keep backwards compatibility (autostart=True), this is likely a moderate change (new method + guardrails) but touches many methods.
  • If you change semantics so users must call connect() explicitly, that’s a large, breaking change and would require updating docs, examples, and lots of tests.

Bottom line

Moving I/O out of __init__ is doable, but not “one PR quick”. It’s a lifecycle/API redesign. If your product expects lots of reconnects and you care about explicit subscription readiness, having an explicit connect() (possibly with autostart=True for compatibility) would make the lifecycle clearer and reduce hidden blocking, but it requires careful handling of partially-connected states across the whole RemoteConversation API.

@enyst (Collaborator) commented Jan 23, 2026

HUMAN QUESTION: WDYT about the direction in which we should go, ideally, for good design of RemoteConversation ?

ANSWER: I re-read the two problematic scenarios and dug into the recent plugin-loading PR (#1651, merged ~1 day ago) to understand why these races got exposed.

Quick refresher: what goes wrong today

  1. WS start / subscription race (RemoteEventsList may miss events emitted before WebSocket subscription completes #1785 / this PR)
     • RemoteConversation.__init__ does an initial REST sync (0 events), then starts WS in a background thread.
     • If send_message() happens before WS subscription is active, the server persists events, but publishes to _pub_sub before the WS subscriber exists, so the client-side RemoteEventsList cache misses those events forever.
  2. WS end / disconnect race (the “events lost during client disconnection” issue)
     • When the client decides the run is finished, it starts shutting down the WS.
     • The server can still be emitting final events; if the server tries to websocket.send after close has started, you can get RuntimeError: Unexpected ASGI message 'websocket.send', after sending 'websocket.close' (or equivalent missing event delivery).

Is the plugin PR (#1651) likely involved?

Yes — not because plugins “break websockets”, but because #1651 deliberately introduced more deferred work that emits events at times that are awkward for the client lifecycle.

Key change in #1651: lazy plugin loading / lazy agent initialization on the server side.

  • In ConversationService.start_conversation (agent-server) it explicitly says plugin loading is handled lazily by LocalConversation and will happen on first run()/send_message().
  • That means the first user interaction can now trigger additional initialization steps (load plugins, merge skills/hooks/MCP config, initialize agent), which emit events such as SystemPromptEvent.

Why that matters:

  • Previously, more of that “setup noise” likely happened earlier (or deterministically before the client started interacting), so the WS subscription often had time to become active.
  • With lazy plugin/agent init, the first send_message() is more likely to immediately generate a burst of early events. That makes the “WS not subscribed yet” window show up in CI much more reliably.

So: #1651 probably didn’t create the race, but it amplified it by changing when events are emitted.

Direction for good RemoteConversation design (what I’d do)

The core design issue is that the client currently treats WS as a “best effort live feed” while also using it as the primary incremental update mechanism for a local cache (RemoteEventsList). That’s brittle under both startup and shutdown races, and it will get worse with lots of reconnects.

A robust design should:

1) Define explicit lifecycle + handshake semantics

  • Make the subscription handshake a protocol contract, not an implementation detail (“first ConversationStateUpdateEvent implies ready”).
  • Ideally have the server send an explicit Subscribed/Ready control message (or include a subscription_id/cursor in a formal ack).

2) Make the event stream recoverable (cursor-based)

If reconnects are expected, you need a cursor/offset model:

  • Server includes monotonically increasing sequence_id (or (conversation_id, event_index)), and clients track last-seen.
  • On WS connect, client sends resume_from=last_seen, and server replays missing events (or client immediately REST-syncs from last-seen).

This eliminates the entire class of “missed events during startup/shutdown/reconnect” because the client can always reconcile.
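
Sketch of the client-side bookkeeping such a cursor would enable. Note that sequence_id and resume_from do not exist in the protocol today (the later comments confirm this); everything below is hypothetical:

from dataclasses import dataclass, field

@dataclass
class EventCursor:
    """Tracks the highest sequence seen so a reconnect can resume or reconcile."""
    last_seen: int = -1
    seen_ids: set = field(default_factory=set)

    def record(self, event_id: str, sequence_id: int) -> None:
        if event_id not in self.seen_ids:
            self.seen_ids.add(event_id)
            self.last_seen = max(self.last_seen, sequence_id)

    def resume_params(self) -> dict:
        # Would be sent on WS reconnect, or used as a REST filter after one.
        return {"resume_from": self.last_seen + 1}

cursor = EventCursor()
cursor.record("evt-a", 0)
cursor.record("evt-b", 1)
print(cursor.resume_params())  # {'resume_from': 2}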

3) Treat WS as an optimization, not the source of truth

  • The authoritative log is persisted (state.events / events API).
  • WS should be “push notifications” that trigger local updates, but correctness should come from the persisted log.
  • Concretely: when WS reconnects (or when an operation completes), do a targeted REST fetch for events since last cursor.

4) Move away from blocking constructors

  • Either: provide explicit connect() / wait_until_ready() APIs, OR
  • keep autostart, but make the blocking behavior configurable and surfaced in the public API.

5) Unify shutdown semantics

  • Client should not assume “FINISHED” implies “no more events will ever arrive.”
  • There should be a terminal event marker (e.g., “final state snapshot committed” / “session end event id”) so the client can wait for it (or verify by REST) before closing the WS.

Bottom line

Given #1651’s lazy initialization direction (which is reasonable), RemoteConversation needs to be designed for events appearing at arbitrary times and for frequent disconnect/reconnect.

The right long-term direction is: cursor-based, recoverable event streaming with an explicit subscription handshake, where WS is a transport and REST is the correctness backstop. The current “subscribe and hope you didn’t miss anything” approach is fundamentally incompatible with the reconnect-heavy world you described.

@enyst (Collaborator) commented Jan 23, 2026

HUMAN QUESTION: Are you sure the server doesnt replay or resend events? Look for replay or for resend.

ANSWER: I looked for replay/resend behavior in both the agent-server and the SDK client.

Server DOES support resending/replay, but it’s opt-in and the SDK is not using it.

Server-side (agent-server)

In openhands-agent-server/openhands/agent_server/sockets.py, the WebSocket endpoint /sockets/events/{conversation_id} accepts a query parameter:

  • resend_all: bool = False

If resend_all=true, the server explicitly replays all existing events over the socket:

if resend_all:
    async for event in page_iterator(event_service.search_events):
        await _send_event(event, websocket)

Independently of resend_all, EventService.subscribe_to_events() also sends an initial ConversationStateUpdateEvent snapshot immediately on subscribe.

Client-side (SDK)

In openhands-sdk/openhands/sdk/conversation/impl/remote_conversation.py, WebSocketCallbackClient builds the URL as:

  • /sockets/events/{conversation_id}
  • optionally ?session_api_key=...

It does not append resend_all=true (and there’s no cursor/offset param).
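
For illustration, this is how a client could opt into the replay behavior when building the socket URL; the helper below is a sketch, not the SDK's actual URL construction (which only adds session_api_key today):

from urllib.parse import urlencode

def events_ws_url(
    base: str,
    conversation_id: str,
    session_api_key: str | None = None,
    resend_all: bool = False,
) -> str:
    params = {}
    if session_api_key:
        params["session_api_key"] = session_api_key
    if resend_all:
        params["resend_all"] = "true"  # opt-in full replay, per sockets.py above
    query = f"?{urlencode(params)}" if params else ""
    return f"{base}/sockets/events/{conversation_id}{query}"

print(events_ws_url("ws://localhost:8000", "abc123", resend_all=True))
# ws://localhost:8000/sockets/events/abc123?resend_all=true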

Conclusion

  • The server will not replay the backlog by default.
  • It can replay everything only if resend_all=true is requested.
  • There is no evidence of a cursor-based “resume from last event id/sequence” mechanism in the WS API; the replay mechanism is currently “all or nothing”.

So my earlier statement should be refined to: there is no automatic replay/resume mechanism used by the SDK today; the only replay feature is the opt-in resend_all query param on the WS endpoint.

@enyst (Collaborator) commented Jan 23, 2026

HUMAN QUESTION: Check events/search endpoint too.

ANSWER: I checked the REST events/search endpoint implementation.

Endpoint: GET /api/conversations/{conversation_id}/events/search

Implemented in openhands-agent-server/openhands/agent_server/event_router.py.
It supports:

  • page_id (documented as “next_page_id from the previously returned page”)
  • limit (<= 100)
  • filters: kind, source, body
  • sort: sort_order (timestamp / timestamp_desc)
  • time filters: timestamp__gte, timestamp__lt

How pagination works (important)

In openhands-agent-server/openhands/agent_server/event_service.py, EventService.search_events():

  • collects all events from state.events
  • applies filters
  • sorts by timestamp
  • then paginates by finding the index where event.id == page_id and returning items starting at that index
  • returns next_page_id as the next event’s id

So page_id is a pagination token for the sorted result set, not an explicit “replay/resume from event X” protocol for WebSocket.
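
For reference, a client-side walk of this pagination might look like the sketch below (using httpx for illustration; the items/next_page_id field names follow the description above and should be checked against the actual response schema):

import httpx

def fetch_all_events(base_url: str, conversation_id: str) -> list:
    """Page through /events/search until next_page_id is exhausted."""
    events = []
    page_id = None
    while True:
        params = {"limit": 100}
        if page_id:
            params["page_id"] = page_id
        resp = httpx.get(
            f"{base_url}/api/conversations/{conversation_id}/events/search",
            params=params,
        )
        resp.raise_for_status()
        page = resp.json()
        events.extend(page.get("items", []))
        page_id = page.get("next_page_id")
        if not page_id:
            return events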

Conclusion

  • REST events/search can be used to fetch the event log (and can help reconcile missed events).
  • But there is no cursor/sequence-based resume mechanism exposed here, and the WS API also lacks a “resume from last event” parameter.
  • The only WS replay mechanism currently is resend_all=true (replay everything), which the SDK does not use.

@enyst (Collaborator) commented Jan 23, 2026

HUMAN QUESTION: Check consequences on RemoteEventsList with periodic/triggered REST sync, and tell me if/how do you think it fits in the overall design direction.

ANSWER: I looked at how RemoteEventsList is implemented today and what periodic/triggered REST syncing would mean.

What RemoteEventsList does today

In openhands-sdk/openhands/sdk/conversation/impl/remote_conversation.py:

  • RemoteEventsList.__init__ immediately calls _do_full_sync() (REST) and caches all events.
  • After that, it only grows via WS callbacks (add_event()), with dedupe by event.id.
  • _do_full_sync() is currently full history pagination via /api/conversations/{id}/events/search (pages of 100).

So the design assumption is:

One full REST snapshot at construction time + WS delivers every future event exactly once.

That assumption is exactly what breaks under startup/shutdown races and reconnects.

What periodic/triggered REST sync would change

Triggered sync (recommended)

Add a method like refresh() (or internal _sync_missing()), and call it on specific lifecycle events:

  • after WS (re)connect / after subscription ready
  • after any WS disconnect/reconnect
  • after run() returns
  • after send_message() returns (or after a short debounce)

This fits well with the design direction of “WS as a fast path, REST as correctness backstop”.

Periodic sync (use sparingly)

A background timer (e.g. every N seconds) that hits /events/search can paper over missed events even if you never detect disconnect. But:

  • it adds steady load on the server
  • it adds extra complexity and timing heuristics
  • it can still miss short windows unless the interval is small (which increases load)

In practice: periodic sync is a fallback; triggered sync is the main tool.

Consequences / tradeoffs

Pros

  • Fixes both known races in principle:
    • startup miss: once subscribed, a refresh pulls the server’s authoritative event log and fills any gaps
    • shutdown miss: after run finishes (or on close), refresh can reconcile and ensure the local cache contains server-persisted events
  • Makes reconnects survivable without needing resend_all=true.
  • Keeps WS lightweight (no need to replay full history).

Cons / gotchas

  • Without a true cursor/sequence, refresh is either:

    • full history (O(total events) each refresh) — expensive for long conversations
    • or timestamp-based (timestamp__gte) — heuristic and can still miss events if clocks/orderings are weird
    • or page_id-based — but current REST pagination is “start from event with id==page_id in a timestamp-sorted list”, not a stable “after id” cursor
  • Concurrency/order: WS can deliver events while refresh is happening. You need locking (already present) and dedupe (already present).

  • Semantics: events are inserted ordered by timestamp. If refresh returns events with the same timestamp or out-of-order, it’s fine because add_event() inserts by timestamp and dedupes by id.

How it fits the overall design direction

Triggered REST sync is a good intermediate step toward the “right” long-term design:

  1. Near-term pragmatic fix: add a RemoteEventsList.refresh() + call it on connect/reconnect and after operations.
  2. Medium-term: introduce a server-side monotonically increasing event sequence / cursor and expose since=<cursor> on both WS and REST.
  3. Long-term: WS becomes a resumable stream (resume_from=cursor), and REST refresh becomes an occasional sanity check, not a primary mechanism.

So yes: periodic/triggered REST sync fits the direction, but I’d frame it explicitly as a correctness backstop until proper cursor-based streaming exists.

@enyst (Collaborator) commented Jan 23, 2026

HUMAN QUESTION: Was it deterministically before the client started or not? Was ws active before for sure or not? (related to the first flakey test) Was it closed normally WITH all events in, or not? (to the second)

ANSWER: No — it wasn’t deterministic before, and WS was not “for sure” active before. The difference is that the lazy plugin/agent init makes the race much easier to hit, not that it creates a brand-new race that never existed.

1) Startup race (#1785 / “WS not subscribed yet”)

  • Was WS active before for sure? No.
  • Even before the plugin PR, RemoteConversation.__init__ started the WS client on a background thread and returned. There was always a window where the caller could immediately send_message() before the WS subscription finished.
  • So why did it “work before”? It likely “worked” most of the time because fewer (or later) events were emitted immediately during the first send_message(). If the earliest events happened a bit later, WS had time to subscribe. That’s probabilistic, not guaranteed.

The plugin PR (#1651) shifts more initialization into the first user interaction, so send_message() tends to trigger an immediate burst of events (e.g., SystemPrompt), which increases the probability that those events happen during the not-yet-subscribed window.

2) Shutdown/disconnect race (“WS closed but server still sending”)

  • Was WS closed normally with all events delivered before? Not for sure.
  • The fundamental issue here is also non-determinism: the client may close once it observes a “finished” state, but the server can still be emitting trailing events concurrently, and ASGI/websocket close/send ordering can race.
  • This kind of bug may have existed before too; it becomes more visible when the server does extra work around boundaries (plugins/hooks/extra events) or the environment is slower (CI), creating more chances for late events to overlap with disconnect.

Bottom line

  • Neither behavior was ever guaranteed by design.
  • The plugin PR likely didn’t make it deterministic before vs after; it changed timing so the races became reproducible/flaky in CI.
  • The “correct” fix direction is to stop relying on timing: add an explicit subscription handshake + resume/reconciliation (cursor or triggered REST sync), and define a terminal condition that guarantees “no more events will be emitted past event X” if you want clean shutdown semantics.

@xingyaoww (Collaborator, Author) commented:
@OpenHands /codereview-roasted on this PR and tell me, did we resolve this:

The “correct” fix direction is to stop relying on timing: add an explicit subscription handshake + resume/reconciliation (cursor or triggered REST sync), and define a terminal condition that guarantees “no more events will be emitted past event X” if you want clean shutdown semantics.

If not, let's do that with the minimal possible fix

openhands-ai bot commented Jan 23, 2026

I'm on it! xingyaoww can track my progress at all-hands.dev

This addresses the 'correct' fix direction from the code review:
- Adds explicit subscription handshake (already in PR)
- Adds resume/reconciliation via REST sync after WebSocket ready

The reconcile() method fetches all events from the server and merges
them with the local cache, deduplicating by event ID. This ensures
no events are missed due to race conditions between the initial REST
sync and WebSocket subscription completion.

The reconciliation is called automatically after wait_until_ready()
returns in RemoteConversation.__init__.

Co-authored-by: openhands <openhands@all-hands.dev>
openhands-ai bot commented Jan 23, 2026

Summary

Code Review Analysis

Did the PR resolve the "correct" fix direction?

The original PR partially addressed the issue:

  • Subscription handshake: Implemented using ConversationStateUpdateEvent as the ready signal
  • Resume/reconciliation: NOT implemented - events emitted between initial REST sync and WebSocket subscription could be lost
  • Terminal condition for clean shutdown: NOT addressed (separate issue)

Fix Implemented

I added the reconciliation component to complete the fix:

  1. Added reconcile() method to RemoteEventsList - Fetches all events from the server via REST and merges them with the local cache, deduplicating by event ID

  2. Called reconcile() after wait_until_ready() - In RemoteConversation.__init__, after the WebSocket subscription is confirmed ready, we do a REST sync to catch any events that might have been emitted during the subscription window

  3. Added 4 tests for reconciliation:

    • test_reconcile_method_exists
    • test_reconcile_merges_events_without_duplicates
    • test_reconcile_handles_empty_server_response
    • test_reconcile_is_idempotent

How it works now

The subscription flow is:

  1. Initial REST sync (fetch existing events)
  2. Start WebSocket thread
  3. Wait for ConversationStateUpdateEvent (subscription handshake)
  4. NEW: Reconciliation REST sync (catch any events missed during steps 1-3)
  5. Constructor returns

This ensures no events are lost due to the race condition between REST sync and WebSocket subscription.
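
A simplified sketch of the reconcile-and-dedupe idea (the real RemoteEventsList also keeps events ordered by timestamp and fetches via the REST search endpoint; the class below is a stand-in):

import threading

class MiniEventCache:
    def __init__(self) -> None:
        self._lock = threading.RLock()
        self._events = []
        self._ids = set()

    def add_event(self, event: dict) -> bool:
        with self._lock:
            if event["id"] in self._ids:
                return False            # dedupe by event id
            self._ids.add(event["id"])
            self._events.append(event)  # real implementation inserts by timestamp
            return True

    def reconcile(self, fetch_all_events) -> int:
        """Merge server-persisted events into the local cache; returns count added."""
        added = 0
        for event in fetch_all_events():
            if self.add_event(event):
                added += 1
        return added

cache = MiniEventCache()
cache.add_event({"id": "e1"})
print(cache.reconcile(lambda: [{"id": "e1"}, {"id": "e2"}]))  # 1 (only e2 is new)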

What's NOT addressed

The terminal condition for clean shutdown (events lost during client disconnection) is a separate issue that would require more significant changes, potentially including:

  • Server-side "terminal" event signaling
  • Client-side final REST sync before close
  • Or graceful WebSocket shutdown protocol

This is tracked separately and not part of this minimal fix.

Changes pushed

Commit eedc44a9 pushed to origin/openhands/fix-websocket-subscription-ready

PR: #1791


openhands-ai bot commented Jan 23, 2026

Looks like there are a few issues preventing this PR from being merged!

  • GitHub Actions are failing:
    • Run tests

If you'd like me to help, just leave a comment, like

@OpenHands please fix the failing actions on PR #1791 at branch `openhands/fix-websocket-subscription-ready`

Feel free to include any additional details that might help me get this PR into a better state.


all-hands-bot (Collaborator) left a comment

Good fix for the WebSocket subscription race condition! The approach is solid, but I found a few areas that could be improved. See inline comments for details.

Comment on lines +142 to +150
    start_time = time.time()
    while True:
        if self._ready.is_set():
            return True
        if self._stop.is_set():
            return False
        if timeout is not None and (time.time() - start_time) >= timeout:
            return False
        time.sleep(0.05)  # Check every 50ms
🟡 Suggestion: The polling implementation with time.sleep(0.05) is inefficient and can add up to 50ms latency. Consider using threading.Event.wait(timeout) instead, which blocks efficiently and wakes up immediately when the event is set:

Suggested change: replace

    start_time = time.time()
    while True:
        if self._ready.is_set():
            return True
        if self._stop.is_set():
            return False
        if timeout is not None and (time.time() - start_time) >= timeout:
            return False
        time.sleep(0.05)  # Check every 50ms

with

    deadline = None if timeout is None else time.time() + timeout
    while True:
        # Calculate remaining timeout
        if deadline is not None:
            remaining = deadline - time.time()
            if remaining <= 0:
                return False
            wait_timeout = min(0.05, remaining)
        else:
            wait_timeout = 0.05
        # Wait efficiently using Event.wait() instead of sleep
        if self._ready.wait(timeout=wait_timeout):
            return True

This approach is more efficient because Event.wait() uses the underlying OS primitives to block the thread, rather than busy-polling every 50ms.

Comment on lines +296 to +299
    with self._lock:
        if event.id not in self._cached_event_ids:
            self.add_event(event)
            added_count += 1
🟡 Suggestion: The code holds self._lock while calling add_event(), which also acquires the same lock. While this works with RLock (reentrant lock), it creates nested locking which is harder to reason about and maintain.

Consider refactoring to avoid the nested lock acquisition:

Suggested change: replace

    with self._lock:
        if event.id not in self._cached_event_ids:
            self.add_event(event)
            added_count += 1

with

    for event in events:
        # Check outside the add_event lock
        if event.id not in self._cached_event_ids:
            # add_event will acquire lock and check again (race-safe)
            if self.add_event(event):  # Modify add_event to return bool
                added_count += 1

Or better yet, extract the insertion logic into a private method _add_event_unsafe() that assumes the lock is already held, and have add_event() acquire the lock and call it. Then reconcile() can acquire the lock once and call _add_event_unsafe() for each event.

conv.close()


@pytest.mark.flaky_race_condition
🟠 Important: This test is marked as @pytest.mark.flaky_race_condition but is not skipped in CI configuration. This will cause intermittent CI failures when the race condition occurs.

The marker is defined in pyproject.toml but there is no configuration to skip tests with this marker. You should either:

  1. Skip the test until the underlying issue is fixed:

     Suggested change

         @pytest.mark.flaky_race_condition
         @pytest.mark.skip(
             reason="Flaky: events can be lost during WebSocket disconnect. "
             "See PR #1791 review. Test demonstrates known issue but should not block CI."
         )

  2. Configure pytest to skip flaky tests in CI by adding to pyproject.toml:

         [tool.pytest.ini_options]
         addopts = "-v --tb=short -m 'not flaky_race_condition'"

  3. Fix the underlying issue so the test is reliable (preferred long-term solution)

Option 1 is recommended for now to prevent CI failures while the disconnect race condition is addressed separately.
