17
17
import aiomqtt
18
18
from aiomqtt import MqttError , TLSParameters
19
19
20
- from .. import RoborockException
21
- from .session import MqttParams , MqttSession
20
+ from .session import MqttParams , MqttSession , MqttSessionException
22
21
23
22
_LOGGER = logging .getLogger (__name__ )
24
23
_MQTT_LOGGER = logging .getLogger (f"{ __name__ } .aiomqtt" )
@@ -71,7 +70,14 @@ async def start(self) -> None:
71
70
start_future : asyncio .Future [None ] = asyncio .Future ()
72
71
loop = asyncio .get_event_loop ()
73
72
self ._background_task = loop .create_task (self ._run_task (start_future ))
74
- await start_future
73
+ try :
74
+ await start_future
75
+ except MqttError as err :
76
+ raise MqttSessionException (f"Error starting MQTT session: { err } " ) from err
77
+ except Exception as err :
78
+ raise MqttSessionException (f"Unexpected error starting session: { err } " ) from err
79
+ else :
80
+ _LOGGER .debug ("MQTT session started successfully" )
75
81
76
82
async def close (self ) -> None :
77
83
"""Cancels the MQTT loop and shutdown the client library."""
@@ -102,14 +108,18 @@ async def _run_task(self, start_future: asyncio.Future[None] | None) -> None:
102
108
103
109
await self ._process_message_loop (client )
104
110
105
- except asyncio .CancelledError :
106
- _LOGGER .debug ("MQTT loop was cancelled" )
107
- return
108
111
except MqttError as err :
109
- _LOGGER .info ("MQTT error: %s" , err )
110
112
if start_future :
113
+ _LOGGER .info ("MQTT error starting session: %s" , err )
111
114
start_future .set_exception (err )
112
115
return
116
+ _LOGGER .info ("MQTT error: %s" , err )
117
+ except asyncio .CancelledError as err :
118
+ if start_future :
119
+ _LOGGER .debug ("MQTT loop was cancelled" )
120
+ start_future .set_exception (err )
121
+ _LOGGER .debug ("MQTT loop was cancelled whiel starting" )
122
+ return
113
123
# Catch exceptions to avoid crashing the loop
114
124
# and to allow the loop to retry.
115
125
except Exception as err :
@@ -118,10 +128,11 @@ async def _run_task(self, start_future: asyncio.Future[None] | None) -> None:
118
128
if "generator didn't stop" in str (err ):
119
129
_LOGGER .debug ("MQTT loop was cancelled" )
120
130
return
121
- _LOGGER .error ("Uncaught error in MQTT session: %s" , err )
122
131
if start_future :
132
+ _LOGGER .error ("Uncaught error starting MQTT session: %s" , err )
123
133
start_future .set_exception (err )
124
134
return
135
+ _LOGGER .error ("Uncaught error during MQTT session: %s" , err )
125
136
126
137
self ._healthy = False
127
138
_LOGGER .info ("MQTT session disconnected, retrying in %s seconds" , self ._backoff .total_seconds ())
@@ -150,6 +161,8 @@ async def _mqtt_client(self, params: MqttParams) -> aiomqtt.Client:
150
161
self ._client = client
151
162
for topic in self ._listeners :
152
163
_LOGGER .debug ("Re-establising subscription to topic %s" , topic )
164
+ # TODO: If this fails it will break the whole connection. Make
165
+ # this retry again in the background with backoff.
153
166
await client .subscribe (topic )
154
167
155
168
yield client
@@ -158,10 +171,11 @@ async def _mqtt_client(self, params: MqttParams) -> aiomqtt.Client:
158
171
self ._client = None
159
172
160
173
async def _process_message_loop (self , client : aiomqtt .Client ) -> None :
161
- _LOGGER .debug ("Processing MQTT messages" )
174
+ _LOGGER .debug ("client=%s" , client )
175
+ _LOGGER .debug ("Processing MQTT messages: %s" , client .messages )
162
176
async for message in client .messages :
163
177
_LOGGER .debug ("Received message: %s" , message )
164
- for listener in self ._listeners .get (message .topic .value ) or [] :
178
+ for listener in self ._listeners .get (message .topic .value , []) :
165
179
try :
166
180
listener (message .payload )
167
181
except asyncio .CancelledError :
@@ -185,7 +199,10 @@ async def subscribe(self, topic: str, callback: Callable[[bytes], None]) -> Call
185
199
async with self ._client_lock :
186
200
if self ._client :
187
201
_LOGGER .debug ("Establishing subscription to topic %s" , topic )
188
- await self ._client .subscribe (topic )
202
+ try :
203
+ await self ._client .subscribe (topic )
204
+ except MqttError as err :
205
+ raise MqttSessionException (f"Error subscribing to topic: { err } " ) from err
189
206
else :
190
207
_LOGGER .debug ("Client not connected, will establish subscription later" )
191
208
@@ -194,11 +211,15 @@ async def subscribe(self, topic: str, callback: Callable[[bytes], None]) -> Call
194
211
async def publish (self , topic : str , message : bytes ) -> None :
195
212
"""Publish a message on the topic."""
196
213
_LOGGER .debug ("Sending message to topic %s: %s" , topic , message )
214
+ client : aiomqtt .Client
197
215
async with self ._client_lock :
198
- if not self ._client :
199
- raise RoborockException ("MQTT client not connected" )
200
- coro = self ._client .publish (topic , message )
201
- await coro
216
+ if self ._client is None :
217
+ raise MqttSessionException ("Could not publish message, MQTT client not connected" )
218
+ client = self ._client
219
+ try :
220
+ await client .publish (topic , message )
221
+ except MqttError as err :
222
+ raise MqttSessionException (f"Error publishing message: { err } " ) from err
202
223
203
224
204
225
async def create_mqtt_session (params : MqttParams ) -> MqttSession :
0 commit comments