@@ -784,22 +784,34 @@ def subscribe(self, topic, qos=0):
784
784
stamp = time .monotonic ()
785
785
while True :
786
786
op = self ._wait_for_msg ()
787
- if op == 0x90 :
788
- rc = self ._sock_exact_recv (4 )
789
- assert rc [1 ] == packet [2 ] and rc [2 ] == packet [3 ]
790
- if rc [3 ] == 0x80 :
791
- raise MMQTTException ("SUBACK Failure!" )
792
- for t , q in topics :
793
- if self .on_subscribe is not None :
794
- self .on_subscribe (self , self ._user_data , t , q )
795
- self ._subscribed_topics .append (t )
796
- return
797
-
798
787
if op is None :
799
788
if time .monotonic () - stamp > self ._recv_timeout :
800
789
raise MMQTTException (
801
790
f"No data received from broker for { self ._recv_timeout } seconds."
802
791
)
792
+ else :
793
+ if op == 0x90 :
794
+ rc = self ._sock_exact_recv (3 )
795
+ # Check packet identifier.
796
+ assert rc [1 ] == packet [2 ] and rc [2 ] == packet [3 ]
797
+ remaining_len = rc [0 ] - 2
798
+ assert remaining_len > 0
799
+ rc = self ._sock_exact_recv (remaining_len )
800
+ for i in range (0 , remaining_len ):
801
+ if rc [i ] not in [0 , 1 , 2 ]:
802
+ raise MMQTTException (
803
+ f"SUBACK Failure for topic { topics [i ][0 ]} : { hex (rc [i ])} "
804
+ )
805
+
806
+ for t , q in topics :
807
+ if self .on_subscribe is not None :
808
+ self .on_subscribe (self , self ._user_data , t , q )
809
+ self ._subscribed_topics .append (t )
810
+ return
811
+
812
+ raise MMQTTException (
813
+ f"invalid message received as response to SUBSCRIBE: { hex (op )} "
814
+ )
803
815
804
816
def unsubscribe (self , topic ):
805
817
"""Unsubscribes from a MQTT topic.
@@ -838,22 +850,26 @@ def unsubscribe(self, topic):
838
850
while True :
839
851
stamp = time .monotonic ()
840
852
op = self ._wait_for_msg ()
841
- if op == 176 :
842
- rc = self ._sock_exact_recv (3 )
843
- assert rc [0 ] == 0x02
844
- # [MQTT-3.32]
845
- assert rc [1 ] == packet_id_bytes [0 ] and rc [2 ] == packet_id_bytes [1 ]
846
- for t in topics :
847
- if self .on_unsubscribe is not None :
848
- self .on_unsubscribe (self , self ._user_data , t , self ._pid )
849
- self ._subscribed_topics .remove (t )
850
- return
851
-
852
853
if op is None :
853
854
if time .monotonic () - stamp > self ._recv_timeout :
854
855
raise MMQTTException (
855
856
f"No data received from broker for { self ._recv_timeout } seconds."
856
857
)
858
+ else :
859
+ if op == 176 :
860
+ rc = self ._sock_exact_recv (3 )
861
+ assert rc [0 ] == 0x02
862
+ # [MQTT-3.32]
863
+ assert rc [1 ] == packet_id_bytes [0 ] and rc [2 ] == packet_id_bytes [1 ]
864
+ for t in topics :
865
+ if self .on_unsubscribe is not None :
866
+ self .on_unsubscribe (self , self ._user_data , t , self ._pid )
867
+ self ._subscribed_topics .remove (t )
868
+ return
869
+
870
+ raise MMQTTException (
871
+ f"invalid message received as response to UNSUBSCRIBE: { hex (op )} "
872
+ )
857
873
858
874
def _recompute_reconnect_backoff (self ):
859
875
"""
@@ -992,6 +1008,7 @@ def _wait_for_msg(self, timeout=0.1):
992
1008
return MQTT_PINGRESP
993
1009
994
1010
if res [0 ] & MQTT_PKT_TYPE_MASK != MQTT_PUBLISH :
1011
+ self .logger .debug (f"Got message type: { hex (res [0 ])} " )
995
1012
return res [0 ]
996
1013
997
1014
# Handle only the PUBLISH packet type from now on.
0 commit comments