From e98c2a1262fe7e96aec433bb375f7b81f5be3e8c Mon Sep 17 00:00:00 2001 From: evgeny Date: Mon, 15 Dec 2025 16:28:40 +0000 Subject: [PATCH] fix: make `queueMessages` client option True by default - Add TO3g test verifying queueMessages defaults to true - Add RTL6c2 check to fail immediately when queueMessages is false and connection is CONNECTING/DISCONNECTED - Add test for publish failure on CONNECTING state with queueMessages=false --- ably/realtime/connectionmanager.py | 17 ++++++++++++--- ably/realtime/realtime_channel.py | 1 + ably/types/options.py | 2 +- .../realtime/realtimechannel_publish_test.py | 21 +++++++++++++++++++ test/ably/realtime/realtimeconnection_test.py | 9 ++++++++ 5 files changed, 46 insertions(+), 4 deletions(-) diff --git a/ably/realtime/connectionmanager.py b/ably/realtime/connectionmanager.py index 79f89f28..d555bb9b 100644 --- a/ably/realtime/connectionmanager.py +++ b/ably/realtime/connectionmanager.py @@ -182,8 +182,19 @@ async def send_protocol_message(self, protocol_message: dict) -> None: Returns: None """ - if self.state not in (ConnectionState.DISCONNECTED, ConnectionState.CONNECTING, ConnectionState.CONNECTED): - raise AblyException(f"ConnectionManager.send_protocol_message(): called in {self.state}", 500, 50000) + state_should_queue = (self.state in + (ConnectionState.INITIALIZED, ConnectionState.DISCONNECTED, ConnectionState.CONNECTING)) + + if self.state != ConnectionState.CONNECTED and not state_should_queue: + raise AblyException(f"Cannot send message while connection is {self.state}", 400, 90000) + + # RTL6c2: If queueMessages is false, fail immediately when not CONNECTED + if state_should_queue and not self.options.queue_messages: + raise AblyException( + f"Cannot send message while connection is {self.state}, and queue_messages is false", + 400, + 90000, + ) pending_message = PendingMessage(protocol_message) @@ -194,7 +205,7 @@ async def send_protocol_message(self, protocol_message: dict) -> None: self.pending_message_queue.push(pending_message) self.msg_serial += 1 - if self.state in (ConnectionState.DISCONNECTED, ConnectionState.CONNECTING): + if state_should_queue: self.queued_messages.appendleft(pending_message) if pending_message.ack_required: await pending_message.future diff --git a/ably/realtime/realtime_channel.py b/ably/realtime/realtime_channel.py index 7c6ce6de..f75b8129 100644 --- a/ably/realtime/realtime_channel.py +++ b/ably/realtime/realtime_channel.py @@ -498,6 +498,7 @@ def _throw_if_unpublishable_state(self) -> None: # RTL6c4: Check connection state connection_state = self.__realtime.connection.state if connection_state not in [ + ConnectionState.INITIALIZED, ConnectionState.CONNECTED, ConnectionState.CONNECTING, ConnectionState.DISCONNECTED, diff --git a/ably/types/options.py b/ably/types/options.py index 6990a4b7..f15b3656 100644 --- a/ably/types/options.py +++ b/ably/types/options.py @@ -26,7 +26,7 @@ def decode(self, delta: bytes, base: bytes) -> bytes: class Options(AuthOptions): def __init__(self, client_id=None, log_level=0, tls=True, rest_host=None, realtime_host=None, port=0, - tls_port=0, use_binary_protocol=True, queue_messages=False, recover=False, environment=None, + tls_port=0, use_binary_protocol=True, queue_messages=True, recover=False, environment=None, http_open_timeout=None, http_request_timeout=None, realtime_request_timeout=None, http_max_retry_count=None, http_max_retry_duration=None, fallback_hosts=None, fallback_retry_timeout=None, disconnected_retry_timeout=None, idempotent_rest_publishing=None, diff --git a/test/ably/realtime/realtimechannel_publish_test.py b/test/ably/realtime/realtimechannel_publish_test.py index 7c32c1e2..5ace3eb2 100644 --- a/test/ably/realtime/realtimechannel_publish_test.py +++ b/test/ably/realtime/realtimechannel_publish_test.py @@ -285,6 +285,27 @@ async def check_disconnected(): await ably.close() + async def test_publish_fails_on_initialized_when_queue_messages_false(self): + """RTN7d: Verify publish fails immediately when connection is CONNECTING and queueMessages=false""" + # Create client with queueMessages=False + ably = await TestApp.get_ably_realtime( + use_binary_protocol=self.use_binary_protocol, + queue_messages=False, + auto_connect=False + ) + + channel = ably.channels.get('test_initialized_channel') + + # Try to publish while in the INITIALIZED state with queueMessages=false + with pytest.raises(AblyException) as exc_info: + await channel.publish('test_event', 'test_data') + + # Verify it failed with appropriate error + assert exc_info.value.code == 90000 + assert exc_info.value.status_code == 400 + + await ably.close() + # RTN19a2 - Reset msgSerial on new connectionId async def test_msgserial_resets_on_new_connection_id(self): """RTN19a2: Verify msgSerial resets to 0 when connectionId changes""" diff --git a/test/ably/realtime/realtimeconnection_test.py b/test/ably/realtime/realtimeconnection_test.py index 68ffb6dd..76e52e43 100644 --- a/test/ably/realtime/realtimeconnection_test.py +++ b/test/ably/realtime/realtimeconnection_test.py @@ -460,3 +460,12 @@ def intercepted_websocket_frame(data): assert all(isinstance(frame, str) for frame in received_raw_websocket_frames) await ably.close() + + # TO3g + async def test_queue_messages_defaults_to_true(self): + """TO3g: Verify that queueMessages client option defaults to true""" + ably = await TestApp.get_ably_realtime(auto_connect=False) + + # TO3g: queueMessages defaults to true + assert ably.options.queue_messages is True + assert ably.connection.connection_manager.options.queue_messages is True