@@ -790,22 +790,34 @@ def subscribe(self, topic, qos=0):
790
790
stamp = time .monotonic ()
791
791
while True :
792
792
op = self ._wait_for_msg ()
793
- if op == 0x90 :
794
- rc = self ._sock_exact_recv (4 )
795
- assert rc [1 ] == packet [2 ] and rc [2 ] == packet [3 ]
796
- if rc [3 ] == 0x80 :
797
- raise MMQTTException ("SUBACK Failure!" )
798
- for t , q in topics :
799
- if self .on_subscribe is not None :
800
- self .on_subscribe (self , self ._user_data , t , q )
801
- self ._subscribed_topics .append (t )
802
- return
803
-
804
793
if op is None :
805
794
if time .monotonic () - stamp > self ._recv_timeout :
806
795
raise MMQTTException (
807
796
f"No data received from broker for { self ._recv_timeout } seconds."
808
797
)
798
+ else :
799
+ if op == 0x90 :
800
+ rc = self ._sock_exact_recv (3 )
801
+ # Check packet identifier.
802
+ assert rc [1 ] == packet [2 ] and rc [2 ] == packet [3 ]
803
+ remaining_len = rc [0 ] - 2
804
+ assert remaining_len > 0
805
+ rc = self ._sock_exact_recv (remaining_len )
806
+ for i in range (0 , remaining_len ):
807
+ if rc [i ] not in [0 , 1 , 2 ]:
808
+ raise MMQTTException (
809
+ f"SUBACK Failure for topic { topics [i ][0 ]} : { hex (rc [i ])} "
810
+ )
811
+
812
+ for t , q in topics :
813
+ if self .on_subscribe is not None :
814
+ self .on_subscribe (self , self ._user_data , t , q )
815
+ self ._subscribed_topics .append (t )
816
+ return
817
+
818
+ raise MMQTTException (
819
+ f"invalid message received as response to SUBSCRIBE: { hex (op )} "
820
+ )
809
821
810
822
def unsubscribe (self , topic ):
811
823
"""Unsubscribes from a MQTT topic.
@@ -844,22 +856,26 @@ def unsubscribe(self, topic):
844
856
while True :
845
857
stamp = time .monotonic ()
846
858
op = self ._wait_for_msg ()
847
- if op == 176 :
848
- rc = self ._sock_exact_recv (3 )
849
- assert rc [0 ] == 0x02
850
- # [MQTT-3.32]
851
- assert rc [1 ] == packet_id_bytes [0 ] and rc [2 ] == packet_id_bytes [1 ]
852
- for t in topics :
853
- if self .on_unsubscribe is not None :
854
- self .on_unsubscribe (self , self ._user_data , t , self ._pid )
855
- self ._subscribed_topics .remove (t )
856
- return
857
-
858
859
if op is None :
859
860
if time .monotonic () - stamp > self ._recv_timeout :
860
861
raise MMQTTException (
861
862
f"No data received from broker for { self ._recv_timeout } seconds."
862
863
)
864
+ else :
865
+ if op == 176 :
866
+ rc = self ._sock_exact_recv (3 )
867
+ assert rc [0 ] == 0x02
868
+ # [MQTT-3.32]
869
+ assert rc [1 ] == packet_id_bytes [0 ] and rc [2 ] == packet_id_bytes [1 ]
870
+ for t in topics :
871
+ if self .on_unsubscribe is not None :
872
+ self .on_unsubscribe (self , self ._user_data , t , self ._pid )
873
+ self ._subscribed_topics .remove (t )
874
+ return
875
+
876
+ raise MMQTTException (
877
+ f"invalid message received as response to UNSUBSCRIBE: { hex (op )} "
878
+ )
863
879
864
880
def _recompute_reconnect_backoff (self ):
865
881
"""
@@ -998,6 +1014,7 @@ def _wait_for_msg(self, timeout=0.1):
998
1014
return MQTT_PINGRESP
999
1015
1000
1016
if res [0 ] & MQTT_PKT_TYPE_MASK != MQTT_PUBLISH :
1017
+ self .logger .debug (f"Got message type: { hex (res [0 ])} " )
1001
1018
return res [0 ]
1002
1019
1003
1020
# Handle only the PUBLISH packet type from now on.
0 commit comments