fix: ensure WebSocket is subscribed before operations #1791
base: main
Conversation
@jpshackelford can you check if this works?

Absolutely. Thanks for looking at it.
This fixes issue #1785 where RemoteEventsList could miss events emitted between conversation creation and WebSocket subscription completion. The fix adds a ready signaling mechanism to WebSocketCallbackClient:

- Added `_ready` threading.Event to track subscription completion
- Added `wait_until_ready(timeout)` method to block until ready
- `_client_loop` sets `_ready` on the first ConversationStateUpdateEvent
- `RemoteConversation.__init__` waits for ready before returning

The server already sends a ConversationStateUpdateEvent immediately after subscription completes, so we use this as the "ready" signal. This ensures events emitted during send_message() (like SystemPromptEvent from lazy agent initialization) are reliably delivered via WebSocket.

Co-authored-by: openhands <openhands@all-hands.dev>
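A minimal sketch of the handshake idea described in that message, under simplified assumptions — `ReadySignalingClient` and `_on_ws_message` are illustrative names rather than the real SDK API, and all connection/retry logic is omitted:

```python
import threading


class ReadySignalingClient:
    """Sketch: signal readiness once the server confirms the subscription."""

    def __init__(self) -> None:
        self._ready = threading.Event()  # set on first ConversationStateUpdateEvent

    def wait_until_ready(self, timeout: float | None = None) -> bool:
        """Block until the subscription handshake completes or the timeout expires."""
        return self._ready.wait(timeout=timeout)

    def _on_ws_message(self, event: object) -> None:
        # The server emits a ConversationStateUpdateEvent right after
        # subscribe_to_events() completes, so the first one means "subscribed".
        if type(event).__name__ == "ConversationStateUpdateEvent":
            self._ready.set()
```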
all-hands-bot left a comment:
Good fix for the WebSocket subscription race condition. Found one important issue with shutdown behavior and a few suggestions for improvement.
```python
        Returns:
            True if the WebSocket is ready, False if timeout expired.
        """
        return self._ready.wait(timeout=timeout)
```
🟠 Important: wait_until_ready() should also check the _stop event to avoid blocking for the full timeout when the client is being stopped.
If stop() is called before the ready event is set, the WebSocket thread will exit but this method will continue blocking for the full timeout period (potentially 10 seconds). This creates poor UX during error scenarios or shutdown.
Consider checking both events:
Suggested replacement for `return self._ready.wait(timeout=timeout)`:

```python
def wait_until_ready(self, timeout: float | None = None) -> bool:
    """Wait for WebSocket subscription to complete.

    The server sends a ConversationStateUpdateEvent immediately after
    subscription completes. This method blocks until that event is received,
    the client is stopped, or the timeout expires.

    Args:
        timeout: Maximum time to wait in seconds. None means wait forever.

    Returns:
        True if the WebSocket is ready, False if stopped or timeout expired.
    """
    import time

    start_time = time.time()
    while True:
        if self._ready.is_set():
            return True
        if self._stop.is_set():
            return False
        if timeout is not None and (time.time() - start_time) >= timeout:
            return False
        time.sleep(0.05)  # Check every 50ms
```
```python
# Wait for WebSocket subscription to complete before allowing operations.
# This ensures events emitted during send_message() are not missed.
# The server sends a ConversationStateUpdateEvent after subscription.
if not self._ws_client.wait_until_ready(timeout=10.0):
```
🟡 Suggestion: Consider making this timeout configurable (e.g., via an environment variable or parameter) for environments with high latency or slow connections. 10 seconds is reasonable for most cases, but may not be sufficient in all scenarios.
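One possible shape for that, assuming a hypothetical `OPENHANDS_WS_READY_TIMEOUT` environment variable (not an existing setting in the SDK):

```python
import os


def ws_ready_timeout(default: float = 10.0) -> float:
    """Read the WebSocket-ready timeout from the environment, falling back to the default."""
    raw = os.environ.get("OPENHANDS_WS_READY_TIMEOUT")
    try:
        return float(raw) if raw else default
    except ValueError:
        return default
```

The call site would then become `wait_until_ready(timeout=ws_ready_timeout())`.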
```python
logger.warning(
    f"WebSocket subscription did not complete within timeout for "
    f"conversation {self._id}. Events may be missed."
)
```
🟡 Suggestion: This warning could be more actionable. Consider adding guidance on what users should do or check when this occurs.
Suggested change:

```python
logger.warning(
    f"WebSocket subscription did not complete within timeout for "
    f"conversation {self._id}. Events may be missed. "
    f"Check network connectivity and verify the server is responding."
)
```
```python
    def test_ready_not_set_for_non_state_update_events(self, mock_message_event):
        """Test that non-ConversationStateUpdateEvent events don't trigger ready.

        Note: The ready signal is set in _client_loop when processing messages,
        not in the callback. This test verifies the callback doesn't set ready.
        """
```
🟢 Nit: The docstring comment is a bit confusing. Since _client_loop is the only place that sets _ready, stating "this test verifies the callback doesn't set ready" implies callbacks might sometimes set it. Consider clarifying:
Suggested change:

```python
    def test_ready_not_set_for_non_state_update_events(self, mock_message_event):
        """Test that directly invoking callbacks does not set the ready flag.

        The ready signal is only set by _client_loop when it processes a
        ConversationStateUpdateEvent from the WebSocket, not by callbacks.
        """
```
The WebSocket subscription race condition is now fixed by ensuring RemoteConversation waits for the WebSocket subscription to complete before allowing operations.

Re-enables:
- test_remote_conversation_over_real_server

Note: test_security_risk_field_with_live_server remains skipped due to a different issue - ActionEvent may be emitted after the client starts closing the WebSocket, causing delivery failure. This is a separate bug from #1785 (subscription race during connect). The test should be fixed to use the REST API for event verification instead of relying on WebSocket delivery.

Co-authored-by: openhands <openhands@all-hands.dev>
Force-pushed from 6f64a5d to 87a2347.
jpshackelford left a comment:
PR #1791 Review: Testing Results and New Finding
Testing Performed
I re-enabled test_remote_conversation_over_real_server in my local environment; it was previously skipped due to #1785.
Results:
- ✅ 8 unit tests pass in tests/sdk/conversation/remote/test_websocket_subscription_ready.py
- ✅ test_remote_conversation_over_real_server now passes, confirming the fix works for the subscription race condition
The fix correctly ensures that RemoteConversation waits for WebSocket subscription to complete before allowing operations, preventing events like SystemPromptEvent from being missed.
New Issue Discovered: Events Lost During Client Disconnection
While investigating, I found that test_security_risk_field_with_live_server still fails even with this fix. Investigation revealed a separate race condition at the other end of the WebSocket lifecycle:
Root Cause
Events emitted during client disconnection can be lost. The sequence is:
1. Test runs a conversation with a mocked `finish` tool call
2. Server emits `ActionEvent` + `ObservationEvent`
3. `conv.run()` returns when status becomes "finished"
4. Client starts closing the WebSocket
5. Server tries to send `ActionEvent` but the WebSocket is already closing
6. Error: `RuntimeError: Unexpected ASGI message 'websocket.send', after sending 'websocket.close'`
Evidence
Debug output shows the ActionEvent is missing:
```text
Events in conv.state.events:
  0: ConversationStateUpdateEvent
  1: SystemPromptEvent
  2: MessageEvent
  3: ConversationStateUpdateEvent
  4: ObservationEvent - tool_name=finish   ← We get this
  5: ConversationStateUpdateEvent
Did NOT find ActionEvent with finish tool  ← ActionEvent lost during disconnect!
```
Server logs confirm the delivery failure:
```text
error_sending_event: ActionEvent(id='68d7fcc7...', source='agent', timestamp='...')
RuntimeError: Unexpected ASGI message 'websocket.send', after sending 'websocket.close'
```
Proposed Fix Options
This PR fixes one end of the WebSocket lifecycle (wait until ready before operations). A symmetric fix is needed for the other end.
Note: The evidence shows events arriving out of order - ObservationEvent arrived but ActionEvent (which should come first) was lost. This points to async event publishing as the root cause.
Option A: Server-Side - Synchronous Event Publishing (Recommended)
Ensure the server sends events synchronously in order before emitting the next event:
- Server emits ActionEvent → waits for WebSocket send to complete
- Server emits ObservationEvent → waits for send to complete
- Server emits ConversationStateUpdateEvent (finished)
- Client receives all events in proper order
- run() returns with complete event history
Pros:
- Fixes the root cause (async publishing causing out-of-order/lost events)
- Events arrive correctly in real-time via WebSocket
- No extra REST call needed
- Maintains proper event ordering semantics
Cons:
- Requires server code changes
- May add slight latency to event publishing
- Doesn't handle client-initiated disconnects (client closes browser mid-conversation)
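A rough sketch of Option A's idea on the server side, assuming an asyncio handler with a Starlette-style `send_json` connection object — illustrative only, not the actual agent-server code:

```python
async def publish_events_in_order(websocket, events) -> None:
    """Send each event and await the send before emitting the next one."""
    for event in events:
        # Awaiting each send preserves ordering and surfaces delivery failures
        # immediately, instead of fire-and-forget publishing.
        await websocket.send_json(event)
```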
Option B: Client-Side - Automatic REST Sync After Completion
Add automatic REST API sync when completing/closing:
- Add a `sync()` method to `RemoteEventsList` that re-fetches events from the REST API
- Call `sync()` automatically after `run()` completes to capture any missed events

(A sketch of this direction follows the pros/cons below.)
Pros:
- Symmetric with the `wait_until_ready()` pattern
- Works with any server version (backward compatible)
- Simple to implement
- Defensive - REST API is authoritative regardless of WebSocket issues
Cons:
- Adds latency after run() completes
- Events may still arrive out of order via WebSocket (sync fixes it after the fact)
- Doesn't address the root cause of async publishing
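A sketch of the Option B direction, using hypothetical names (`fetch_events_via_rest`, a plain `local_events` list) rather than the real `RemoteEventsList` API:

```python
def sync(local_events: list, fetch_events_via_rest) -> int:
    """Merge server-persisted events into the local cache, deduplicating by event id."""
    known_ids = {event.id for event in local_events}
    added = 0
    for event in fetch_events_via_rest():
        if event.id not in known_ids:
            local_events.append(event)
            known_ids.add(event.id)
            added += 1
    return added
```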
Question
Would you like to expand the scope of this PR to also fix the "sync before disconnect" issue? Or would you prefer to address it in a follow-on issue/PR?
I've updated the test file to:
- Re-enable `test_remote_conversation_over_real_server` ✅
- Keep `test_security_risk_field_with_live_server` skipped, with an accurate reason describing this new issue
Let me know how you'd like to proceed!
@OpenHands can you reproduce the bug described here: #1791 (review)? If so, can you create a test that reproduces it, and push it to the PR. Do not fix it yet.

I'm on it! xingyaoww can track my progress at all-hands.dev
This test reproduces the bug described in the PR #1791 review where events emitted during client disconnection can be lost. The sequence is:

1. Test runs a conversation with a mocked finish tool call
2. Server emits ActionEvent + ObservationEvent
3. conv.run() returns when status becomes 'finished'
4. Client starts closing the WebSocket
5. Server tries to send ActionEvent but the WebSocket is already closing

The bug is a race condition that may not always reproduce. The test verifies that events delivered via WebSocket match events persisted on the server (fetched via REST API). If they don't match, the bug is demonstrated.

Also adds a flaky_race_condition pytest marker for timing-dependent tests.

See the PR #1791 review for details: #1791 (review)

Co-authored-by: openhands <openhands@all-hands.dev>
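The check that commit describes can be pictured roughly like this — a hypothetical helper, not the actual test code in the PR:

```python
def assert_no_events_lost(ws_event_ids: set[str], rest_event_ids: set[str]) -> None:
    """Events delivered over the WebSocket should match what the server persisted."""
    missing = rest_event_ids - ws_event_ids
    assert not missing, f"Persisted on server but never delivered via WebSocket: {missing}"
```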
I've successfully reproduced the bug described in the PR #1791 review and created a test for it. Here's a summary:

Bug Description
The bug is a race condition where events can be lost during client disconnection: …

This is a separate issue from #1785 (subscription race at startup).

Changes Made
1. New Test: …
This implements Option A from the PR #1791 review. Previously, wait_until_ready() would block for the full timeout even when stop() was called. This created poor UX during error scenarios or shutdown.

Now wait_until_ready() checks both _ready and _stop events:
- Returns True if _ready is set (WebSocket subscription completed)
- Returns False if _stop is set (client is being stopped)
- Returns False if the timeout expires

Also adds two new tests to verify the _stop event behavior:
- test_wait_until_ready_returns_false_when_stopped
- test_wait_until_ready_returns_false_when_stopped_during_wait

Co-authored-by: openhands <openhands@all-hands.dev>
@OpenHands Understand this PR well. Read all comments. Investigate the code, build the project, and run the tests discussed here. Do a /codereview-roasted on this PR. Post your feedback as a GitHub comment on the PR. Do not make modifications, but make sure you don't answer me; post it as a comment. You can use remote gh as a gh token.
I'm on it! enyst can track my progress at all-hands.dev
/codereview-roasted

Taste rating: 🟡 Acceptable (fixes a real race) → but it papers over a protocol/ordering guarantee with client-side busy-waiting, and the tests are… enthusiastic.

What I ran
…
Summary of the change
You're fixing #1785: events emitted between conversation creation and WebSocket subscription completion could be missed. The PR adds a "subscription handshake":
That's a real bug and this probably makes the CI flake go away.

[CRITICAL ISSUES] [openhands-sdk/openhands/sdk/conversation/impl/remote_conversation.py] Blocking in …
HUMAN QUESTION: We expect a LOT of reconnects. Double check this. Don't we reset _ready when losing connection, or maybe when reinitializing a connection?

ANSWER: I double-checked the current implementation in …
Reconnect behavior:
So today …
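If resetting were wanted, one hypothetical shape (not current behavior, and not the real SDK class) would be to clear the flag on disconnect so callers wait for a fresh handshake after reconnecting:

```python
import threading


class ReconnectAwareReady:
    """Sketch only: tie the ready flag to the lifetime of a single connection."""

    def __init__(self) -> None:
        self._ready = threading.Event()

    def on_subscribed(self) -> None:
        self._ready.set()

    def on_disconnect(self) -> None:
        # Force wait_until_ready() callers to block again until the
        # re-established connection completes its own handshake.
        self._ready.clear()
```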
HUMAN QUESTION: You said: "ideally moved to an explicit connect()/start() step so construction doesn't do I/O." Investigate deeply. How large of a change is that and what are the consequences?

ANSWER: I investigated how …

What …
HUMAN QUESTION: WDYT about the direction in which we should go, ideally, for good design of RemoteConversation?

ANSWER: I re-read the two problematic scenarios and dug into the recent plugin-loading PR (#1651, merged ~1 day ago) to understand why these races got exposed.

Quick refresher: what goes wrong today
…
Is the plugin PR (#1651) likely involved?
Yes — not because plugins "break websockets", but because #1651 deliberately introduced more deferred work that emits events at times that are awkward for the client lifecycle. Key change in #1651: lazy plugin loading / lazy agent initialization on the server side.
Why that matters:
So: #1651 probably didn't create the race, but it amplified it by changing when events are emitted.

Direction for good design
…
HUMAN QUESTION: Are you sure the server doesn't replay or resend events? Look for replay or for resend.

ANSWER: I looked for replay/resend behavior in both the agent-server and the SDK client. The server DOES support resending/replay, but it's opt-in and the SDK is not using it.

Server-side (agent-server)
In …

If `resend_all` is set:

```python
if resend_all:
    async for event in page_iterator(event_service.search_events):
        await _send_event(event, websocket)
```

Independently of …

It does not append …

Conclusion
So my earlier statement should be refined to: there is no automatic replay/resume mechanism used by the SDK today; the only replay feature is the opt-in `resend_all`.
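If the SDK ever wanted to opt in, the flag would presumably be passed when opening the socket; a hedged illustration only — the exact parameter wiring in agent-server is an assumption here:

```python
from urllib.parse import urlencode


def with_resend_all(ws_url: str) -> str:
    """Append an assumed resend_all query parameter to an already-built subscribe URL."""
    separator = "&" if "?" in ws_url else "?"
    return f"{ws_url}{separator}{urlencode({'resend_all': 'true'})}"
```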
HUMAN QUESTION: Check the events/search endpoint too.

ANSWER: I checked the REST events/search endpoint.

Endpoint: …
HUMAN QUESTION: Check the consequences on RemoteEventsList with periodic/triggered REST sync, and tell me if/how you think it fits in the overall design direction.

ANSWER: I looked at how …

What …
HUMAN QUESTION: Was it deterministically before the client started or not? Was the WS active before for sure or not? (related to the first flaky test) Was it closed normally WITH all events in, or not? (related to the second)

ANSWER: No — it wasn't deterministic before, and the WS was not "for sure" active before. The difference is that the lazy plugin/agent init makes the race much easier to hit, not that it creates a brand-new race that never existed.

1) Startup race (#1785 / "WS not subscribed yet")
The plugin PR (#1651) shifts more initialization into the first user interaction, so …

2) Shutdown/disconnect race ("WS closed but server still sending")
Bottom line
@OpenHands /codereview-roasted on this PR and tell me, did we resolve this:
If not, let's do that with the minimal possible fix.

I'm on it! xingyaoww can track my progress at all-hands.dev
This addresses the 'correct' fix direction from the code review:
- Adds an explicit subscription handshake (already in PR)
- Adds resume/reconciliation via REST sync after the WebSocket is ready

The reconcile() method fetches all events from the server and merges them with the local cache, deduplicating by event ID. This ensures no events are missed due to race conditions between the initial REST sync and WebSocket subscription completion. The reconciliation is called automatically after wait_until_ready() returns in RemoteConversation.__init__.

Co-authored-by: openhands <openhands@all-hands.dev>
Summary

Code Review Analysis
Did the PR resolve the "correct" fix direction? The original PR partially addressed the issue:
Fix Implemented
I added the reconciliation component to complete the fix:
How it works now
The subscription flow is:
1. Full REST sync of existing events
2. WebSocket client starts and subscribes
3. wait_until_ready() blocks until the subscription is confirmed
4. reconcile() re-fetches events via REST and merges any that were missed
5. RemoteConversation.__init__ returns
This ensures no events are lost due to the race condition between REST sync and WebSocket subscription.

What's NOT addressed
The terminal condition for clean shutdown (events lost during client disconnection) is a separate issue that would require more significant changes, potentially including:
This is tracked separately and not part of this minimal fix.

Changes pushed
Commit … PR: #1791
Looks like there are a few issues preventing this PR from being merged!
If you'd like me to help, just leave a comment like … Feel free to include any additional details that might help me get this PR into a better state.
all-hands-bot left a comment:
Good fix for the WebSocket subscription race condition! The approach is solid, but I found a few areas that could be improved. See inline comments for details.
```python
start_time = time.time()
while True:
    if self._ready.is_set():
        return True
    if self._stop.is_set():
        return False
    if timeout is not None and (time.time() - start_time) >= timeout:
        return False
    time.sleep(0.05)  # Check every 50ms
```
🟡 Suggestion: The polling implementation with time.sleep(0.05) is inefficient and can add up to 50ms latency. Consider using threading.Event.wait(timeout) instead, which blocks efficiently and wakes up immediately when the event is set:
Suggested change:

```python
deadline = None if timeout is None else time.time() + timeout
while True:
    # Calculate remaining timeout
    if deadline is not None:
        remaining = deadline - time.time()
        if remaining <= 0:
            return False
        wait_timeout = min(0.05, remaining)
    else:
        wait_timeout = 0.05
    # Wait efficiently using Event.wait() instead of sleep
    if self._ready.wait(timeout=wait_timeout):
        return True
    if self._stop.is_set():  # Preserve the early exit when the client is stopping
        return False
```
This approach is more efficient because Event.wait() uses the underlying OS primitives to block the thread, rather than busy-polling every 50ms.
```python
with self._lock:
    if event.id not in self._cached_event_ids:
        self.add_event(event)
        added_count += 1
```
🟡 Suggestion: The code holds self._lock while calling add_event(), which also acquires the same lock. While this works with RLock (reentrant lock), it creates nested locking which is harder to reason about and maintain.
Consider refactoring to avoid the nested lock acquisition:
Suggested change:

```python
for event in events:
    # Check outside the add_event lock
    if event.id not in self._cached_event_ids:
        # add_event will acquire lock and check again (race-safe)
        if self.add_event(event):  # Modify add_event to return bool
            added_count += 1
```
Or better yet, extract the insertion logic into a private method _add_event_unsafe() that assumes the lock is already held, and have add_event() acquire the lock and call it. Then reconcile() can acquire the lock once and call _add_event_unsafe() for each event.
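That refactor could look roughly like this — a simplified sketch; the real list keeps more state and uses the SDK's own event types:

```python
import threading


class EventCacheSketch:
    """Lock-once pattern: add_event and reconcile share one unsafe helper."""

    def __init__(self) -> None:
        self._lock = threading.RLock()
        self._events: list = []
        self._cached_event_ids: set[str] = set()

    def _add_event_unsafe(self, event) -> bool:
        # Caller must already hold self._lock.
        if event.id in self._cached_event_ids:
            return False
        self._events.append(event)
        self._cached_event_ids.add(event.id)
        return True

    def add_event(self, event) -> bool:
        with self._lock:
            return self._add_event_unsafe(event)

    def reconcile(self, events) -> int:
        with self._lock:
            return sum(self._add_event_unsafe(event) for event in events)
```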
```python
    conv.close()


@pytest.mark.flaky_race_condition
```
🟠 Important: This test is marked as @pytest.mark.flaky_race_condition but is not skipped in CI configuration. This will cause intermittent CI failures when the race condition occurs.
The marker is defined in pyproject.toml but there is no configuration to skip tests with this marker. You should either:
- Skip the test until the underlying issue is fixed:
```python
@pytest.mark.flaky_race_condition
@pytest.mark.skip(
    reason="Flaky: events can be lost during WebSocket disconnect. "
    "See PR #1791 review. Test demonstrates known issue but should not block CI."
)
```
- Configure pytest to skip flaky tests in CI by adding to pyproject.toml:

```toml
[tool.pytest.ini_options]
addopts = "-v --tb=short -m 'not flaky_race_condition'"
```

- Fix the underlying issue so the test is reliable (preferred long-term solution)
Option 1 is recommended for now to prevent CI failures while the disconnect race condition is addressed separately.
Summary
This PR fixes issue #1785 where RemoteEventsList could miss events (like SystemPromptEvent) that are emitted between conversation creation and WebSocket subscription completion.

Problem
When RemoteConversation is created:
1. RemoteEventsList._do_full_sync() fetches existing events via REST (none exist yet if the agent is not initialized)
2. WebSocketCallbackClient.start() starts a daemon thread to connect the WebSocket (non-blocking)

If send_message() is called before the WebSocket subscription completes, events emitted during that call (like SystemPromptEvent from lazy agent initialization) are:
- published to state.events_pub_sub with zero subscribers (WebSocket not yet subscribed)
- never delivered to RemoteEventsList

This caused test_remote_conversation_over_real_server to be flaky - passing locally but failing in CI due to timing differences.

Solution
Implemented the "WebSocket Subscription Handshake" approach as proposed in the #1785 comment:
- Added a _ready threading.Event to WebSocketCallbackClient - tracks subscription completion
- Added a wait_until_ready(timeout) method - blocks until ready or timeout
- _client_loop sets _ready on the first ConversationStateUpdateEvent - the server already sends this immediately after subscription completes
- RemoteConversation.__init__ waits for ready - ensures the WebSocket is subscribed before returning

Key insight
The server already sends a ConversationStateUpdateEvent immediately after subscribe_to_events() completes (see event_service.py lines 330-353). We use this as the "ready" signal without requiring any protocol changes.

Changes
openhands-sdk/openhands/sdk/conversation/impl/remote_conversation.py:
- Added _ready threading.Event to WebSocketCallbackClient
- Added wait_until_ready(timeout) method
- Updated _client_loop to set _ready on the first ConversationStateUpdateEvent
- Updated RemoteConversation.__init__ to wait for ready with a 10s timeout

tests/sdk/conversation/remote/test_websocket_subscription_ready.py:
- New unit tests covering the ready signaling (8 tests)

Testing
- test_remote_conversation_over_real_server passes reliably

Fixes #1785
Agent Server images for this PR
• GHCR package: https://github.com/OpenHands/agent-sdk/pkgs/container/agent-server
Variants & Base Images
- eclipse-temurin:17-jdk
- nikolaik/python-nodejs:python3.12-nodejs22
- golang:1.21-bookworm

Pull (multi-arch manifest)
```
# Each variant is a multi-arch manifest supporting both amd64 and arm64
docker pull ghcr.io/openhands/agent-server:c406701-python
```

Run
…
All tags pushed for this build
About Multi-Architecture Support
- The variant tag (c406701-python) is a multi-arch manifest supporting both amd64 and arm64
- Architecture-specific tags (c406701-python-amd64) are also available if needed