@@ -784,22 +784,35 @@ def subscribe(self, topic, qos=0):
784
784
stamp = time .monotonic ()
785
785
while True :
786
786
op = self ._wait_for_msg ()
787
- if op == 0x90 :
788
- rc = self ._sock_exact_recv (4 )
789
- assert rc [1 ] == packet [2 ] and rc [2 ] == packet [3 ]
790
- if rc [3 ] == 0x80 :
791
- raise MMQTTException ("SUBACK Failure!" )
792
- for t , q in topics :
793
- if self .on_subscribe is not None :
794
- self .on_subscribe (self , self ._user_data , t , q )
795
- self ._subscribed_topics .append (t )
796
- return
797
-
798
787
if op is None :
799
788
if time .monotonic () - stamp > self ._recv_timeout :
800
789
raise MMQTTException (
801
790
f"No data received from broker for { self ._recv_timeout } seconds."
802
791
)
792
+ else :
793
+ if op == 0x90 :
794
+ rc = self ._sock_exact_recv (3 )
795
+ # Check packet identifier.
796
+ assert rc [1 ] == packet [2 ] and rc [2 ] == packet [3 ]
797
+ remaining_len = rc [0 ] - 2
798
+ assert remaining_len > 0
799
+ rc = self ._sock_exact_recv (remaining_len )
800
+ for i in range (0 , remaining_len ):
801
+ if rc [i ] not in [0 , 1 , 2 ]:
802
+ raise MMQTTException (
803
+ f"SUBACK Failure for topic "
804
+ f"{ topics [i ][0 ]} : { hex (rc [i ])} "
805
+ )
806
+
807
+ for t , q in topics :
808
+ if self .on_subscribe is not None :
809
+ self .on_subscribe (self , self ._user_data , t , q )
810
+ self ._subscribed_topics .append (t )
811
+ return
812
+
813
+ raise MMQTTException (
814
+ f"invalid message received as response to SUBSCRIBE: { hex (op )} "
815
+ )
803
816
804
817
def unsubscribe (self , topic ):
805
818
"""Unsubscribes from a MQTT topic.
@@ -838,22 +851,26 @@ def unsubscribe(self, topic):
838
851
while True :
839
852
stamp = time .monotonic ()
840
853
op = self ._wait_for_msg ()
841
- if op == 176 :
842
- rc = self ._sock_exact_recv (3 )
843
- assert rc [0 ] == 0x02
844
- # [MQTT-3.32]
845
- assert rc [1 ] == packet_id_bytes [0 ] and rc [2 ] == packet_id_bytes [1 ]
846
- for t in topics :
847
- if self .on_unsubscribe is not None :
848
- self .on_unsubscribe (self , self ._user_data , t , self ._pid )
849
- self ._subscribed_topics .remove (t )
850
- return
851
-
852
854
if op is None :
853
855
if time .monotonic () - stamp > self ._recv_timeout :
854
856
raise MMQTTException (
855
857
f"No data received from broker for { self ._recv_timeout } seconds."
856
858
)
859
+ else :
860
+ if op == 176 :
861
+ rc = self ._sock_exact_recv (3 )
862
+ assert rc [0 ] == 0x02
863
+ # [MQTT-3.32]
864
+ assert rc [1 ] == packet_id_bytes [0 ] and rc [2 ] == packet_id_bytes [1 ]
865
+ for t in topics :
866
+ if self .on_unsubscribe is not None :
867
+ self .on_unsubscribe (self , self ._user_data , t , self ._pid )
868
+ self ._subscribed_topics .remove (t )
869
+ return
870
+
871
+ raise MMQTTException (
872
+ f"invalid message received as response to UNSUBSCRIBE: { hex (op )} "
873
+ )
857
874
858
875
def _recompute_reconnect_backoff (self ):
859
876
"""
@@ -992,6 +1009,7 @@ def _wait_for_msg(self, timeout=0.1):
992
1009
return MQTT_PINGRESP
993
1010
994
1011
if res [0 ] & MQTT_PKT_TYPE_MASK != MQTT_PUBLISH :
1012
+ self .logger .debug (f"Got message type: { hex (res [0 ])} " )
995
1013
return res [0 ]
996
1014
997
1015
# Handle only the PUBLISH packet type from now on.
0 commit comments