Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 14 additions & 3 deletions ably/realtime/connectionmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,8 +182,19 @@ async def send_protocol_message(self, protocol_message: dict) -> None:
Returns:
None
"""
if self.state not in (ConnectionState.DISCONNECTED, ConnectionState.CONNECTING, ConnectionState.CONNECTED):
raise AblyException(f"ConnectionManager.send_protocol_message(): called in {self.state}", 500, 50000)
state_should_queue = (self.state in
(ConnectionState.INITIALIZED, ConnectionState.DISCONNECTED, ConnectionState.CONNECTING))

if self.state != ConnectionState.CONNECTED and not state_should_queue:
raise AblyException(f"Cannot send message while connection is {self.state}", 400, 90000)

# RTL6c2: If queueMessages is false, fail immediately when not CONNECTED
if state_should_queue and not self.options.queue_messages:
raise AblyException(
f"Cannot send message while connection is {self.state}, and queue_messages is false",
400,
90000,
)

pending_message = PendingMessage(protocol_message)

Expand All @@ -194,7 +205,7 @@ async def send_protocol_message(self, protocol_message: dict) -> None:
self.pending_message_queue.push(pending_message)
self.msg_serial += 1

if self.state in (ConnectionState.DISCONNECTED, ConnectionState.CONNECTING):
if state_should_queue:
self.queued_messages.appendleft(pending_message)
if pending_message.ack_required:
await pending_message.future
Expand Down
1 change: 1 addition & 0 deletions ably/realtime/realtime_channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -498,6 +498,7 @@ def _throw_if_unpublishable_state(self) -> None:
# RTL6c4: Check connection state
connection_state = self.__realtime.connection.state
if connection_state not in [
ConnectionState.INITIALIZED,
ConnectionState.CONNECTED,
ConnectionState.CONNECTING,
ConnectionState.DISCONNECTED,
Expand Down
2 changes: 1 addition & 1 deletion ably/types/options.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ def decode(self, delta: bytes, base: bytes) -> bytes:

class Options(AuthOptions):
def __init__(self, client_id=None, log_level=0, tls=True, rest_host=None, realtime_host=None, port=0,
tls_port=0, use_binary_protocol=True, queue_messages=False, recover=False, environment=None,
tls_port=0, use_binary_protocol=True, queue_messages=True, recover=False, environment=None,
http_open_timeout=None, http_request_timeout=None, realtime_request_timeout=None,
http_max_retry_count=None, http_max_retry_duration=None, fallback_hosts=None,
fallback_retry_timeout=None, disconnected_retry_timeout=None, idempotent_rest_publishing=None,
Expand Down
21 changes: 21 additions & 0 deletions test/ably/realtime/realtimechannel_publish_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,27 @@ async def check_disconnected():

await ably.close()

async def test_publish_fails_on_initialized_when_queue_messages_false(self):
"""RTN7d: Verify publish fails immediately when connection is CONNECTING and queueMessages=false"""
# Create client with queueMessages=False
ably = await TestApp.get_ably_realtime(
use_binary_protocol=self.use_binary_protocol,
queue_messages=False,
auto_connect=False
)

channel = ably.channels.get('test_initialized_channel')

# Try to publish while in the INITIALIZED state with queueMessages=false
with pytest.raises(AblyException) as exc_info:
await channel.publish('test_event', 'test_data')

# Verify it failed with appropriate error
assert exc_info.value.code == 90000
assert exc_info.value.status_code == 400

await ably.close()

# RTN19a2 - Reset msgSerial on new connectionId
async def test_msgserial_resets_on_new_connection_id(self):
"""RTN19a2: Verify msgSerial resets to 0 when connectionId changes"""
Expand Down
9 changes: 9 additions & 0 deletions test/ably/realtime/realtimeconnection_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -460,3 +460,12 @@ def intercepted_websocket_frame(data):
assert all(isinstance(frame, str) for frame in received_raw_websocket_frames)

await ably.close()

# TO3g
async def test_queue_messages_defaults_to_true(self):
"""TO3g: Verify that queueMessages client option defaults to true"""
ably = await TestApp.get_ably_realtime(auto_connect=False)

# TO3g: queueMessages defaults to true
assert ably.options.queue_messages is True
assert ably.connection.connection_manager.options.queue_messages is True
Loading