1919
2020from roborock .callbacks import CallbackMap
2121
22+ from .health_manager import HealthManager
2223from .session import MqttParams , MqttSession , MqttSessionException , MqttSessionUnauthorized
2324
2425_LOGGER = logging .getLogger (__name__ )
@@ -69,17 +70,24 @@ def __init__(
6970 self ._stop = False
7071 self ._backoff = MIN_BACKOFF_INTERVAL
7172 self ._client : aiomqtt .Client | None = None
73+ self ._client_subscribed_topics : set [str ] = set ()
7274 self ._client_lock = asyncio .Lock ()
7375 self ._listeners : CallbackMap [str , bytes ] = CallbackMap (_LOGGER )
7476 self ._connection_task : asyncio .Task [None ] | None = None
7577 self ._topic_idle_timeout = topic_idle_timeout
7678 self ._idle_timers : dict [str , asyncio .Task [None ]] = {}
79+ self ._health_manager = HealthManager (self .restart )
7780
7881 @property
7982 def connected (self ) -> bool :
8083 """True if the session is connected to the broker."""
8184 return self ._healthy
8285
86+ @property
87+ def health_manager (self ) -> HealthManager :
88+ """Return the health manager for the session."""
89+ return self ._health_manager
90+
8391 async def start (self ) -> None :
8492 """Start the MQTT session.
8593
@@ -218,7 +226,7 @@ async def _mqtt_client(self, params: MqttParams) -> aiomqtt.Client:
218226 # Re-establish any existing subscriptions
219227 async with self ._client_lock :
220228 self ._client = client
221- for topic in self ._listeners . keys () :
229+ for topic in self ._client_subscribed_topics :
222230 _LOGGER .debug ("Re-establishing subscription to topic %s" , topic )
223231 # TODO: If this fails it will break the whole connection. Make
224232 # this retry again in the background with backoff.
@@ -249,32 +257,42 @@ async def subscribe(self, topic: str, callback: Callable[[bytes], None]) -> Call
249257 unsub = self ._listeners .add_callback (topic , callback )
250258
251259 async with self ._client_lock :
252- if self ._client :
253- _LOGGER .debug ("Establishing subscription to topic %s" , topic )
254- try :
255- await self ._client .subscribe (topic )
256- except MqttError as err :
257- # Clean up the callback if subscription fails
258- unsub ()
259- raise MqttSessionException (f"Error subscribing to topic: { err } " ) from err
260- else :
261- _LOGGER .debug ("Client not connected, will establish subscription later" )
262-
263- def schedule_unsubscribe ():
260+ if topic not in self ._client_subscribed_topics :
261+ self ._client_subscribed_topics .add (topic )
262+ if self ._client :
263+ _LOGGER .debug ("Establishing subscription to topic %s" , topic )
264+ try :
265+ await self ._client .subscribe (topic )
266+ except MqttError as err :
267+ # Clean up the callback if subscription fails
268+ unsub ()
269+ self ._client_subscribed_topics .discard (topic )
270+ raise MqttSessionException (f"Error subscribing to topic: { err } " ) from err
271+ else :
272+ _LOGGER .debug ("Client not connected, will establish subscription later" )
273+
274+ def schedule_unsubscribe () -> None :
264275 async def idle_unsubscribe ():
265276 try :
266277 await asyncio .sleep (self ._topic_idle_timeout .total_seconds ())
267278 # Only unsubscribe if there are no callbacks left for this topic
268279 if not self ._listeners .get_callbacks (topic ):
269280 async with self ._client_lock :
281+ # Check again if we have listeners, in case a subscribe happened
282+ # while we were waiting for the lock or after we popped the timer.
283+ if self ._listeners .get_callbacks (topic ):
284+ _LOGGER .debug ("Skipping unsubscribe for %s, new listeners added" , topic )
285+ return
286+
287+ self ._idle_timers .pop (topic , None )
288+ self ._client_subscribed_topics .discard (topic )
289+
270290 if self ._client :
271291 _LOGGER .debug ("Idle timeout expired, unsubscribing from topic %s" , topic )
272292 try :
273293 await self ._client .unsubscribe (topic )
274294 except MqttError as err :
275295 _LOGGER .warning ("Error unsubscribing from topic %s: %s" , topic , err )
276- # Clean up timer from dict
277- self ._idle_timers .pop (topic , None )
278296 except asyncio .CancelledError :
279297 _LOGGER .debug ("Idle unsubscribe for topic %s cancelled" , topic )
280298
@@ -286,7 +304,10 @@ def delayed_unsub():
286304 unsub () # Remove the callback from CallbackMap
287305 # If no more callbacks for this topic, start idle timer
288306 if not self ._listeners .get_callbacks (topic ):
307+ _LOGGER .debug ("Unsubscribing topic %s, starting idle timer" , topic )
289308 schedule_unsubscribe ()
309+ else :
310+ _LOGGER .debug ("Unsubscribing topic %s, still have active callbacks" , topic )
290311
291312 return delayed_unsub
292313
@@ -323,6 +344,11 @@ def connected(self) -> bool:
323344 """True if the session is connected to the broker."""
324345 return self ._session .connected
325346
347+ @property
348+ def health_manager (self ) -> HealthManager :
349+ """Return the health manager for the session."""
350+ return self ._session .health_manager
351+
326352 async def _maybe_start (self ) -> None :
327353 """Start the MQTT session if not already started."""
328354 async with self ._lock :
0 commit comments