@@ -72,6 +72,8 @@ class ReactMqttClient extends EventEmitter
72
72
private $ publishQos2 = [];
73
73
/** @var PublishRequestPacket[] */
74
74
private $ incomingQos2 = [];
75
+ /** @var Packet[] */
76
+ private $ packetQueue = [];
75
77
76
78
/**
77
79
* Constructs an instance of this class.
@@ -171,8 +173,7 @@ public function disconnect()
171
173
172
174
$ packet = new DisconnectRequestPacket ();
173
175
174
- $ this ->stream ->write ($ packet );
175
- $ this ->stream ->close ();
176
+ $ this ->stream ->end ($ packet );
176
177
$ this ->stream = null ;
177
178
178
179
$ this ->isConnected = false ;
@@ -192,7 +193,7 @@ public function subscribe($topic, $qosLevel = 0)
192
193
$ packet ->setTopic ($ topic );
193
194
$ packet ->setQosLevel ($ qosLevel );
194
195
195
- $ this ->stream -> write ($ packet );
196
+ $ this ->writePacket ($ packet );
196
197
197
198
$ id = $ packet ->getIdentifier ();
198
199
$ this ->deferred ['subscribe ' ][$ id ] = new Deferred ();
@@ -213,7 +214,7 @@ public function unsubscribe($topic)
213
214
$ packet = new UnsubscribeRequestPacket ();
214
215
$ packet ->setTopic ($ topic );
215
216
216
- $ this ->stream -> write ($ packet );
217
+ $ this ->writePacket ($ packet );
217
218
218
219
$ id = $ packet ->getIdentifier ();
219
220
$ this ->deferred ['unsubscribe ' ][$ id ] = new Deferred ();
@@ -241,7 +242,7 @@ public function publish($topic, $message, $qosLevel = 0, $retain = false)
241
242
$ packet ->setRetained ($ retain );
242
243
$ packet ->setDuplicate (false );
243
244
244
- $ this ->stream -> write ($ packet );
245
+ $ this ->writePacket ($ packet );
245
246
246
247
if ($ qosLevel == 0 ) {
247
248
return new FulfilledPromise ($ message );
@@ -305,6 +306,13 @@ private function connectStream(Stream $stream, array $settings, $timeout, $keepA
305
306
$ this ->handleData ($ data );
306
307
});
307
308
309
+ $ this ->stream ->getBuffer ()->on ('full-drain ' , function () {
310
+ if (count ($ this ->packetQueue ) > 0 ) {
311
+ $ packet = array_shift ($ this ->packetQueue );
312
+ $ this ->stream ->write ($ packet );
313
+ }
314
+ });
315
+
308
316
$ this ->stream ->on ('error ' , function (\Exception $ e ) {
309
317
$ this ->emitError ($ e );
310
318
});
@@ -321,7 +329,7 @@ private function connectStream(Stream $stream, array $settings, $timeout, $keepA
321
329
$ this ->timer [] = $ this ->loop ->addPeriodicTimer (
322
330
floor ($ keepAlive * 0.75 ),
323
331
function () {
324
- $ this ->stream -> write (new PingRequestPacket ());
332
+ $ this ->writePacket (new PingRequestPacket ());
325
333
}
326
334
);
327
335
@@ -349,7 +357,7 @@ function () use ($timeout) {
349
357
$ packet ->setWill ($ will ['topic ' ], $ will ['message ' ], $ will ['qos ' ], $ will ['retain ' ]);
350
358
}
351
359
352
- $ this ->stream -> write ($ packet );
360
+ $ this ->writePacket ($ packet );
353
361
}
354
362
355
363
/**
@@ -460,6 +468,20 @@ private function sanitizeOptions(array $options)
460
468
return array_merge ($ defaults , $ options );
461
469
}
462
470
471
+ /**
472
+ * Writes a packet to the stream or buffers it if the stream is busy.
473
+ *
474
+ * @param Packet $packet
475
+ */
476
+ private function writePacket (Packet $ packet )
477
+ {
478
+ if ($ this ->stream ->getBuffer ()->listening ) {
479
+ $ this ->packetQueue [] = $ packet ;
480
+ } else {
481
+ $ this ->stream ->write ($ packet );
482
+ }
483
+ }
484
+
463
485
/**
464
486
* Handles a CONNACK packet.
465
487
*
@@ -563,7 +585,7 @@ private function handlePublishRequest(PublishRequestPacket $packet)
563
585
564
586
if ($ response !== null ) {
565
587
$ response ->setIdentifier ($ packet ->getIdentifier ());
566
- $ this ->stream -> write ($ response );
588
+ $ this ->writePacket ($ response );
567
589
}
568
590
569
591
if ($ emit ) {
@@ -606,7 +628,7 @@ private function handlePublishReceived(PublishReceivedPacket $packet)
606
628
607
629
$ response = new PublishReleasePacket ();
608
630
$ response ->setIdentifier ($ id );
609
- $ this ->stream -> write ($ response );
631
+ $ this ->writePacket ($ response );
610
632
}
611
633
612
634
/**
@@ -620,7 +642,7 @@ private function handlePublishRelease(PublishReleasePacket $packet)
620
642
621
643
$ response = new PublishCompletePacket ();
622
644
$ response ->setIdentifier ($ id );
623
- $ this ->stream -> write ($ response );
645
+ $ this ->writePacket ($ response );
624
646
625
647
if (!isset ($ this ->incomingQos2 [$ id ])) {
626
648
$ this ->emitWarning (new \LogicException (sprintf ('PUBREL: Packet identifier %d not found. ' , $ id )));
0 commit comments