@@ -516,38 +516,42 @@ def connect(self, clean_session=True, host=None, port=None, keep_alive=None):
516
516
return result
517
517
518
518
def disconnect (self ):
519
- """Disconnects the MiniMQTT client from the MQTT broker.
520
- """
519
+ """Disconnects the MiniMQTT client from the MQTT broker."""
521
520
self .is_connected ()
522
- if self .logger :
521
+ if self .logger is not None :
523
522
self .logger .debug ("Sending DISCONNECT packet to broker" )
524
- self ._sock .send (MQTT_DISCONNECT )
525
- if self .logger :
523
+ try :
524
+ self ._sock .send (MQTT_DISCONNECT )
525
+ except RuntimeError as e :
526
+ if self .logger :
527
+ self .logger .warning ("Unable to send DISCONNECT packet: {}" .format (e ))
528
+ if self .logger is not None :
526
529
self .logger .debug ("Closing socket" )
527
- self ._free_sockets ()
530
+ self ._sock . close ()
528
531
self ._is_connected = False
529
- self ._subscribed_topics = None
532
+ self ._subscribed_topics = []
530
533
if self .on_disconnect is not None :
531
- self .on_disconnect (self , self ._user_data , 0 )
534
+ self .on_disconnect (self , self .user_data , 0 )
532
535
533
536
def ping (self ):
534
537
"""Pings the MQTT Broker to confirm if the broker is alive or if
535
538
there is an active network connection.
539
+ Returns response codes of any messages received while waiting for PINGRESP.
536
540
"""
537
541
self .is_connected ()
538
- buf = self ._rx_buffer
539
542
if self .logger :
540
543
self .logger .debug ("Sending PINGREQ" )
541
544
self ._sock .send (MQTT_PINGREQ )
542
- if self .logger :
543
- self .logger .debug ("Checking PINGRESP" )
544
- while True :
545
- op = self ._wait_for_msg ()
546
- if op == 208 :
547
- self ._recv_into (buf , 2 )
548
- if buf [0 ] != 0x00 :
549
- raise MMQTTException ("PINGRESP not returned from broker." )
550
- return
545
+ ping_timeout = self .keep_alive
546
+ stamp = time .monotonic ()
547
+ rc , rcs = None , []
548
+ while rc != MQTT_PINGRESP :
549
+ rc = self ._wait_for_msg ()
550
+ if rc :
551
+ rcs .append (rc )
552
+ if time .monotonic () - stamp > ping_timeout :
553
+ raise MMQTTException ("PINGRESP not returned from broker." )
554
+ return rcs
551
555
552
556
# pylint: disable=too-many-branches, too-many-statements
553
557
def publish (self , topic , msg , retain = False , qos = 0 ):
@@ -794,12 +798,12 @@ def unsubscribe(self, topic):
794
798
while True :
795
799
op = self ._wait_for_msg ()
796
800
if op == 176 :
797
- self ._recv_into ( buf , 3 )
798
- assert buf [0 ] == 0x02
801
+ rc = self ._sock_exact_recv ( 3 )
802
+ assert rc [0 ] == 0x02
799
803
# [MQTT-3.32]
800
804
assert (
801
- buf [1 ] == packet_id_bytes [0 ]
802
- and buf [2 ] == packet_id_bytes [1 ]
805
+ rc [1 ] == packet_id_bytes [0 ]
806
+ and rc [2 ] == packet_id_bytes [1 ]
803
807
)
804
808
for t in topics :
805
809
if self .on_unsubscribe is not None :
@@ -828,33 +832,36 @@ def reconnect(self, resub_topics=True):
828
832
feed = subscribed_topics .pop ()
829
833
self .subscribe (feed )
830
834
831
- def loop (self , timeout = 0.01 ):
835
+ def loop (self , timeout = 1 ):
832
836
"""Non-blocking message loop. Use this method to
833
837
check incoming subscription messages.
834
- :param float timeout: Set timeout in seconds for
835
- polling the message queue.
838
+ Returns response codes of any messages received.
839
+ :param int timeout: Socket timeout, in seconds.
840
+
836
841
"""
837
842
if self ._timestamp == 0 :
838
843
self ._timestamp = time .monotonic ()
839
844
current_time = time .monotonic ()
840
845
if current_time - self ._timestamp >= self .keep_alive :
841
846
# Handle KeepAlive by expecting a PINGREQ/PINGRESP from the server
842
- if self .logger :
847
+ if self .logger is not None :
843
848
self .logger .debug (
844
849
"KeepAlive period elapsed - \
845
850
requesting a PINGRESP from the server..."
846
851
)
847
- self .ping ()
852
+ rcs = self .ping ()
848
853
self ._timestamp = 0
849
- return self ._wait_for_msg (timeout )
854
+ return rcs
855
+ self ._sock .settimeout (timeout )
856
+ rc = self ._wait_for_msg ()
857
+ return [rc ] if rc else None
858
+
850
859
851
860
def _wait_for_msg (self , timeout = 0.1 ):
852
861
"""Reads and processes network events."""
853
862
buf = self ._rx_buffer
854
863
res = bytearray (1 )
855
864
856
- # Attempt to read
857
- self ._sock .settimeout (1 )
858
865
try :
859
866
self ._sock .recv_into (res , 1 )
860
867
except OSError as error :
0 commit comments