1717import aiomqtt
1818from aiomqtt import MqttError , TLSParameters
1919
20- from .. import RoborockException
21- from .session import MqttParams , MqttSession
20+ from .session import MqttParams , MqttSession , MqttSessionException
2221
2322_LOGGER = logging .getLogger (__name__ )
2423_MQTT_LOGGER = logging .getLogger (f"{ __name__ } .aiomqtt" )
@@ -71,7 +70,14 @@ async def start(self) -> None:
7170 start_future : asyncio .Future [None ] = asyncio .Future ()
7271 loop = asyncio .get_event_loop ()
7372 self ._background_task = loop .create_task (self ._run_task (start_future ))
74- await start_future
73+ try :
74+ await start_future
75+ except MqttError as err :
76+ raise MqttSessionException (f"Error starting MQTT session: { err } " ) from err
77+ except Exception as err :
78+ raise MqttSessionException (f"Unexpected error starting session: { err } " ) from err
79+ else :
80+ _LOGGER .debug ("MQTT session started successfully" )
7581
7682 async def close (self ) -> None :
7783 """Cancels the MQTT loop and shutdown the client library."""
@@ -102,14 +108,18 @@ async def _run_task(self, start_future: asyncio.Future[None] | None) -> None:
102108
103109 await self ._process_message_loop (client )
104110
105- except asyncio .CancelledError :
106- _LOGGER .debug ("MQTT loop was cancelled" )
107- return
108111 except MqttError as err :
109- _LOGGER .info ("MQTT error: %s" , err )
110112 if start_future :
113+ _LOGGER .info ("MQTT error starting session: %s" , err )
111114 start_future .set_exception (err )
112115 return
116+ _LOGGER .info ("MQTT error: %s" , err )
117+ except asyncio .CancelledError as err :
118+ if start_future :
119+ _LOGGER .debug ("MQTT loop was cancelled" )
120+ start_future .set_exception (err )
121+ _LOGGER .debug ("MQTT loop was cancelled whiel starting" )
122+ return
113123 # Catch exceptions to avoid crashing the loop
114124 # and to allow the loop to retry.
115125 except Exception as err :
@@ -118,10 +128,11 @@ async def _run_task(self, start_future: asyncio.Future[None] | None) -> None:
118128 if "generator didn't stop" in str (err ):
119129 _LOGGER .debug ("MQTT loop was cancelled" )
120130 return
121- _LOGGER .error ("Uncaught error in MQTT session: %s" , err )
122131 if start_future :
132+ _LOGGER .error ("Uncaught error starting MQTT session: %s" , err )
123133 start_future .set_exception (err )
124134 return
135+ _LOGGER .error ("Uncaught error during MQTT session: %s" , err )
125136
126137 self ._healthy = False
127138 _LOGGER .info ("MQTT session disconnected, retrying in %s seconds" , self ._backoff .total_seconds ())
@@ -150,6 +161,8 @@ async def _mqtt_client(self, params: MqttParams) -> aiomqtt.Client:
150161 self ._client = client
151162 for topic in self ._listeners :
152163 _LOGGER .debug ("Re-establising subscription to topic %s" , topic )
164+ # TODO: If this fails it will break the whole connection. Make
165+ # this retry again in the background with backoff.
153166 await client .subscribe (topic )
154167
155168 yield client
@@ -158,10 +171,11 @@ async def _mqtt_client(self, params: MqttParams) -> aiomqtt.Client:
158171 self ._client = None
159172
160173 async def _process_message_loop (self , client : aiomqtt .Client ) -> None :
161- _LOGGER .debug ("Processing MQTT messages" )
174+ _LOGGER .debug ("client=%s" , client )
175+ _LOGGER .debug ("Processing MQTT messages: %s" , client .messages )
162176 async for message in client .messages :
163177 _LOGGER .debug ("Received message: %s" , message )
164- for listener in self ._listeners .get (message .topic .value ) or [] :
178+ for listener in self ._listeners .get (message .topic .value , []) :
165179 try :
166180 listener (message .payload )
167181 except asyncio .CancelledError :
@@ -185,7 +199,10 @@ async def subscribe(self, topic: str, callback: Callable[[bytes], None]) -> Call
185199 async with self ._client_lock :
186200 if self ._client :
187201 _LOGGER .debug ("Establishing subscription to topic %s" , topic )
188- await self ._client .subscribe (topic )
202+ try :
203+ await self ._client .subscribe (topic )
204+ except MqttError as err :
205+ raise MqttSessionException (f"Error subscribing to topic: { err } " ) from err
189206 else :
190207 _LOGGER .debug ("Client not connected, will establish subscription later" )
191208
@@ -194,11 +211,15 @@ async def subscribe(self, topic: str, callback: Callable[[bytes], None]) -> Call
194211 async def publish (self , topic : str , message : bytes ) -> None :
195212 """Publish a message on the topic."""
196213 _LOGGER .debug ("Sending message to topic %s: %s" , topic , message )
214+ client : aiomqtt .Client
197215 async with self ._client_lock :
198- if not self ._client :
199- raise RoborockException ("MQTT client not connected" )
200- coro = self ._client .publish (topic , message )
201- await coro
216+ if self ._client is None :
217+ raise MqttSessionException ("Could not publish message, MQTT client not connected" )
218+ client = self ._client
219+ try :
220+ await client .publish (topic , message )
221+ except MqttError as err :
222+ raise MqttSessionException (f"Error publishing message: { err } " ) from err
202223
203224
204225async def create_mqtt_session (params : MqttParams ) -> MqttSession :
0 commit comments