@@ -273,7 +273,8 @@ def _get_socket(self, host, port, *, timeout=1):
273
273
274
274
connect_host = addr_info [- 1 ][0 ]
275
275
if port == 8883 :
276
- sock = self ._ssl_context .wrap_socket (sock , server_hostname = host )
276
+ sock = self ._ssl_context .wrap_socket (sock ,
277
+ server_hostname = host )
277
278
connect_host = host
278
279
sock .settimeout (timeout )
279
280
@@ -796,9 +797,11 @@ def reconnect(self, resub_topics=True):
796
797
feed = subscribed_topics .pop ()
797
798
self .subscribe (feed )
798
799
799
- def loop (self ):
800
+ def loop (self , timeout = 0.01 ):
800
801
"""Non-blocking message loop. Use this method to
801
802
check incoming subscription messages.
803
+ :param float timeout: Set timeout in seconds for
804
+ polling the message queue.
802
805
"""
803
806
if self ._timestamp == 0 :
804
807
self ._timestamp = time .monotonic ()
@@ -812,17 +815,20 @@ def loop(self):
812
815
)
813
816
self .ping ()
814
817
self ._timestamp = 0
815
- return self ._wait_for_msg ()
818
+ return self ._wait_for_msg (timeout )
816
819
817
- def _wait_for_msg (self ):
820
+ def _wait_for_msg (self , timeout = 0.01 ):
818
821
"""Reads and processes network events."""
819
822
res = bytearray (1 ) #TODO: This should be a globally shared buffer for readinto
820
823
821
- self ._sock .setblocking ( False )
824
+ self ._sock .settimeout ( timeout )
822
825
try :
823
826
self ._sock .recv_into (res , 1 )
824
827
except BlockingIOError : # fix for macOS Errno
825
828
return None
829
+ except self ._socket_pool .timeout :
830
+ return None
831
+
826
832
self ._sock .setblocking (True )
827
833
if res in [None , b"" ]:
828
834
return None
0 commit comments