19
19
use BinSoul \Net \Mqtt \StreamParser ;
20
20
use BinSoul \Net \Mqtt \Subscription ;
21
21
use Evenement \EventEmitter ;
22
+ use Exception ;
23
+ use LogicException ;
22
24
use React \EventLoop \LoopInterface ;
23
25
use React \EventLoop \TimerInterface ;
24
26
use React \Promise \CancellablePromiseInterface ;
27
29
use React \Promise \RejectedPromise ;
28
30
use React \Socket \ConnectorInterface ;
29
31
use React \Stream \DuplexStreamInterface ;
32
+ use RuntimeException ;
30
33
31
34
/**
32
35
* Connects to a MQTT broker and subscribes to topics or publishes messages.
@@ -99,9 +102,9 @@ class ReactMqttClient extends EventEmitter
99
102
public function __construct (
100
103
ConnectorInterface $ connector ,
101
104
LoopInterface $ loop ,
102
- ClientIdentifierGenerator $ identifierGenerator = null ,
103
- FlowFactory $ flowFactory = null ,
104
- StreamParser $ parser = null
105
+ ? ClientIdentifierGenerator $ identifierGenerator = null ,
106
+ ? FlowFactory $ flowFactory = null ,
107
+ ? StreamParser $ parser = null
105
108
) {
106
109
$ this ->connector = $ connector ;
107
110
$ this ->loop = $ loop ;
@@ -111,7 +114,7 @@ public function __construct(
111
114
$ this ->parser = new StreamParser (new DefaultPacketFactory ());
112
115
}
113
116
114
- $ this ->parser ->onError (function (\ Exception $ e ) {
117
+ $ this ->parser ->onError (function (Exception $ e ) {
115
118
$ this ->emitWarning ($ e );
116
119
});
117
120
@@ -161,7 +164,7 @@ public function isConnected(): bool
161
164
*
162
165
* @return DuplexStreamInterface|null
163
166
*/
164
- public function getStream ()
167
+ public function getStream (): ? DuplexStreamInterface
165
168
{
166
169
return $ this ->stream ;
167
170
}
@@ -179,7 +182,7 @@ public function getStream()
179
182
public function connect (string $ host , int $ port = 1883 , Connection $ connection = null , int $ timeout = 5 ): ExtendedPromiseInterface
180
183
{
181
184
if ($ this ->isConnected || $ this ->isConnecting ) {
182
- return new RejectedPromise (new \ LogicException ('The client is already connected. ' ));
185
+ return new RejectedPromise (new LogicException ('The client is already connected. ' ));
183
186
}
184
187
185
188
$ this ->isConnecting = true ;
@@ -213,7 +216,7 @@ public function connect(string $host, int $port = 1883, Connection $connection =
213
216
$ this ->emit ('connect ' , [$ connection , $ this ]);
214
217
$ deferred ->resolve ($ result ?: $ connection );
215
218
})
216
- ->otherwise (function (\ Exception $ e ) use ($ connection , $ deferred ) {
219
+ ->otherwise (function (Exception $ e ) use ($ connection , $ deferred ) {
217
220
$ this ->isConnecting = false ;
218
221
219
222
$ this ->emitError ($ e );
@@ -226,7 +229,7 @@ public function connect(string $host, int $port = 1883, Connection $connection =
226
229
$ this ->emit ('close ' , [$ connection , $ this ]);
227
230
});
228
231
})
229
- ->otherwise (function (\ Exception $ e ) use ($ deferred ) {
232
+ ->otherwise (function (Exception $ e ) use ($ deferred ) {
230
233
$ this ->isConnecting = false ;
231
234
232
235
$ this ->emitError ($ e );
@@ -246,7 +249,7 @@ public function connect(string $host, int $port = 1883, Connection $connection =
246
249
public function disconnect (int $ timeout = 5 ): ExtendedPromiseInterface
247
250
{
248
251
if (!$ this ->isConnected || $ this ->isDisconnecting ) {
249
- return new RejectedPromise (new \ LogicException ('The client is not connected. ' ));
252
+ return new RejectedPromise (new LogicException ('The client is not connected. ' ));
250
253
}
251
254
252
255
$ this ->isDisconnecting = true ;
@@ -303,7 +306,7 @@ function () {
303
306
public function subscribe (Subscription $ subscription ): ExtendedPromiseInterface
304
307
{
305
308
if (!$ this ->isConnected ) {
306
- return new RejectedPromise (new \ LogicException ('The client is not connected. ' ));
309
+ return new RejectedPromise (new LogicException ('The client is not connected. ' ));
307
310
}
308
311
309
312
return $ this ->startFlow ($ this ->flowFactory ->buildOutgoingSubscribeFlow ([$ subscription ]));
@@ -319,7 +322,7 @@ public function subscribe(Subscription $subscription): ExtendedPromiseInterface
319
322
public function unsubscribe (Subscription $ subscription ): ExtendedPromiseInterface
320
323
{
321
324
if (!$ this ->isConnected ) {
322
- return new RejectedPromise (new \ LogicException ('The client is not connected. ' ));
325
+ return new RejectedPromise (new LogicException ('The client is not connected. ' ));
323
326
}
324
327
325
328
return $ this ->startFlow ($ this ->flowFactory ->buildOutgoingUnsubscribeFlow ([$ subscription ]));
@@ -335,7 +338,7 @@ public function unsubscribe(Subscription $subscription): ExtendedPromiseInterfac
335
338
public function publish (Message $ message ): ExtendedPromiseInterface
336
339
{
337
340
if (!$ this ->isConnected ) {
338
- return new RejectedPromise (new \ LogicException ('The client is not connected. ' ));
341
+ return new RejectedPromise (new LogicException ('The client is not connected. ' ));
339
342
}
340
343
341
344
return $ this ->startFlow ($ this ->flowFactory ->buildOutgoingPublishFlow ($ message ));
@@ -353,7 +356,7 @@ public function publish(Message $message): ExtendedPromiseInterface
353
356
public function publishPeriodically (int $ interval , Message $ message , callable $ generator ): ExtendedPromiseInterface
354
357
{
355
358
if (!$ this ->isConnected ) {
356
- return new RejectedPromise (new \ LogicException ('The client is not connected. ' ));
359
+ return new RejectedPromise (new LogicException ('The client is not connected. ' ));
357
360
}
358
361
359
362
$ deferred = new Deferred ();
@@ -365,7 +368,7 @@ function () use ($message, $generator, $deferred) {
365
368
static function ($ value ) use ($ deferred ) {
366
369
$ deferred ->notify ($ value );
367
370
},
368
- static function (\ Exception $ e ) use ($ deferred ) {
371
+ static function (Exception $ e ) use ($ deferred ) {
369
372
$ deferred ->reject ($ e );
370
373
}
371
374
);
@@ -378,23 +381,23 @@ static function (\Exception $e) use ($deferred) {
378
381
/**
379
382
* Emits warnings.
380
383
*
381
- * @param \ Exception $e
384
+ * @param Exception $e
382
385
*
383
386
* @return void
384
387
*/
385
- private function emitWarning (\ Exception $ e )
388
+ private function emitWarning (Exception $ e ): void
386
389
{
387
390
$ this ->emit ('warning ' , [$ e , $ this ]);
388
391
}
389
392
390
393
/**
391
394
* Emits errors.
392
395
*
393
- * @param \ Exception $e
396
+ * @param Exception $e
394
397
*
395
398
* @return void
396
399
*/
397
- private function emitError (\ Exception $ e )
400
+ private function emitError (Exception $ e ): void
398
401
{
399
402
$ this ->emit ('error ' , [$ e , $ this ]);
400
403
}
@@ -416,7 +419,7 @@ private function establishConnection(string $host, int $port, int $timeout): Ext
416
419
$ timer = $ this ->loop ->addTimer (
417
420
$ timeout ,
418
421
static function () use ($ deferred , $ timeout , &$ future ) {
419
- $ exception = new \ RuntimeException (sprintf ('Connection timed out after %d seconds. ' , $ timeout ));
422
+ $ exception = new RuntimeException (sprintf ('Connection timed out after %d seconds. ' , $ timeout ));
420
423
$ deferred ->reject ($ exception );
421
424
if ($ future instanceof CancellablePromiseInterface) {
422
425
$ future ->cancel ();
@@ -438,13 +441,13 @@ static function () use ($deferred, $timeout, &$future) {
438
441
$ this ->handleClose ();
439
442
});
440
443
441
- $ stream ->on ('error ' , function (\ Exception $ e ) {
444
+ $ stream ->on ('error ' , function (Exception $ e ) {
442
445
$ this ->handleError ($ e );
443
446
});
444
447
445
448
$ deferred ->resolve ($ stream );
446
449
})
447
- ->otherwise (static function (\ Exception $ e ) use ($ deferred ) {
450
+ ->otherwise (static function (Exception $ e ) use ($ deferred ) {
448
451
$ deferred ->reject ($ e );
449
452
});
450
453
@@ -466,7 +469,7 @@ private function registerClient(Connection $connection, int $timeout): ExtendedP
466
469
$ responseTimer = $ this ->loop ->addTimer (
467
470
$ timeout ,
468
471
static function () use ($ deferred , $ timeout ) {
469
- $ exception = new \ RuntimeException (sprintf ('No response after %d seconds. ' , $ timeout ));
472
+ $ exception = new RuntimeException (sprintf ('No response after %d seconds. ' , $ timeout ));
470
473
$ deferred ->reject ($ exception );
471
474
}
472
475
);
@@ -483,7 +486,7 @@ function () {
483
486
);
484
487
485
488
$ deferred ->resolve ($ result ?: $ connection );
486
- })->otherwise (static function (\ Exception $ e ) use ($ deferred ) {
489
+ })->otherwise (static function (Exception $ e ) use ($ deferred ) {
487
490
$ deferred ->reject ($ e );
488
491
});
489
492
@@ -497,7 +500,7 @@ function () {
497
500
*
498
501
* @return void
499
502
*/
500
- private function handleReceive (string $ data )
503
+ private function handleReceive (string $ data ): void
501
504
{
502
505
if (!$ this ->isConnected && !$ this ->isConnecting ) {
503
506
return ;
@@ -524,12 +527,12 @@ private function handleReceive(string $data)
524
527
*
525
528
* @return void
526
529
*/
527
- private function handlePacket (Packet $ packet )
530
+ private function handlePacket (Packet $ packet ): void
528
531
{
529
532
switch ($ packet ->getPacketType ()) {
530
533
case Packet::TYPE_PUBLISH :
531
534
if (!($ packet instanceof PublishRequestPacket)) {
532
- throw new \ RuntimeException (sprintf ('Expected %s but got %s. ' , PublishRequestPacket::class, get_class ($ packet )));
535
+ throw new RuntimeException (sprintf ('Expected %s but got %s. ' , PublishRequestPacket::class, get_class ($ packet )));
533
536
}
534
537
535
538
$ message = new DefaultMessage (
@@ -564,13 +567,13 @@ private function handlePacket(Packet $packet)
564
567
565
568
if (!$ flowFound ) {
566
569
$ this ->emitWarning (
567
- new \ LogicException (sprintf ('Received unexpected packet of type %d. ' , $ packet ->getPacketType ()))
570
+ new LogicException (sprintf ('Received unexpected packet of type %d. ' , $ packet ->getPacketType ()))
568
571
);
569
572
}
570
573
break ;
571
574
default :
572
575
$ this ->emitWarning (
573
- new \ LogicException (sprintf ('Cannot handle packet of type %d. ' , $ packet ->getPacketType ()))
576
+ new LogicException (sprintf ('Cannot handle packet of type %d. ' , $ packet ->getPacketType ()))
574
577
);
575
578
}
576
579
}
@@ -580,7 +583,7 @@ private function handlePacket(Packet $packet)
580
583
*
581
584
* @return void
582
585
*/
583
- private function handleSend ()
586
+ private function handleSend (): void
584
587
{
585
588
$ flow = null ;
586
589
if ($ this ->writtenFlow !== null ) {
@@ -609,7 +612,7 @@ private function handleSend()
609
612
*
610
613
* @return void
611
614
*/
612
- private function handleClose ()
615
+ private function handleClose (): void
613
616
{
614
617
foreach ($ this ->timer as $ timer ) {
615
618
$ this ->loop ->cancelTimer ($ timer );
@@ -636,11 +639,11 @@ private function handleClose()
636
639
/**
637
640
* Handles errors of the stream.
638
641
*
639
- * @param \ Exception $e
642
+ * @param Exception $e
640
643
*
641
644
* @return void
642
645
*/
643
- private function handleError (\ Exception $ e )
646
+ private function handleError (Exception $ e ): void
644
647
{
645
648
$ this ->emitError ($ e );
646
649
}
@@ -657,7 +660,7 @@ private function startFlow(Flow $flow, bool $isSilent = false): ExtendedPromiseI
657
660
{
658
661
try {
659
662
$ packet = $ flow ->start ();
660
- } catch (\ Exception $ e ) {
663
+ } catch (Exception $ e ) {
661
664
$ this ->emitError ($ e );
662
665
663
666
return new RejectedPromise ($ e );
@@ -691,11 +694,11 @@ private function startFlow(Flow $flow, bool $isSilent = false): ExtendedPromiseI
691
694
*
692
695
* @return void
693
696
*/
694
- private function continueFlow (ReactFlow $ flow , Packet $ packet )
697
+ private function continueFlow (ReactFlow $ flow , Packet $ packet ): void
695
698
{
696
699
try {
697
700
$ response = $ flow ->next ($ packet );
698
- } catch (\ Exception $ e ) {
701
+ } catch (Exception $ e ) {
699
702
$ this ->emitError ($ e );
700
703
701
704
return ;
@@ -723,7 +726,7 @@ private function continueFlow(ReactFlow $flow, Packet $packet)
723
726
*
724
727
* @return void
725
728
*/
726
- private function finishFlow (ReactFlow $ flow )
729
+ private function finishFlow (ReactFlow $ flow ): void
727
730
{
728
731
if ($ flow ->isSuccess ()) {
729
732
if (!$ flow ->isSilent ()) {
@@ -732,7 +735,7 @@ private function finishFlow(ReactFlow $flow)
732
735
733
736
$ flow ->getDeferred ()->resolve ($ flow ->getResult ());
734
737
} else {
735
- $ result = new \ RuntimeException ($ flow ->getErrorMessage ());
738
+ $ result = new RuntimeException ($ flow ->getErrorMessage ());
736
739
$ this ->emitWarning ($ result );
737
740
738
741
$ flow ->getDeferred ()->reject ($ result );
0 commit comments