Skip to content

Commit 55d792e

Browse files
committed
Wait for passive connection closing
1 parent 0245143 commit 55d792e

File tree

1 file changed

+36
-12
lines changed

1 file changed

+36
-12
lines changed

src/ReactMqttClient.php

Lines changed: 36 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,9 @@ class ReactMqttClient extends EventEmitter
7171
private $isConnecting = false;
7272
/** @var bool */
7373
private $isDisconnecting = false;
74+
/** @var callable */
75+
private $onCloseCallback;
76+
7477

7578
/** @var TimerInterface[] */
7679
private $timer = [];
@@ -227,9 +230,10 @@ public function connect($host, $port = 1883, Connection $connection = null, $tim
227230
/**
228231
* Disconnects from a broker.
229232
*
233+
* @param int $timeout
230234
* @return ExtendedPromiseInterface
231235
*/
232-
public function disconnect()
236+
public function disconnect($timeout = 5)
233237
{
234238
if (!$this->isConnected || $this->isDisconnecting) {
235239
return new RejectedPromise(new \LogicException('The client is not connected.'));
@@ -239,21 +243,36 @@ public function disconnect()
239243

240244
$deferred = new Deferred();
241245

242-
$this->startFlow(new OutgoingDisconnectFlow($this->connection), true)
243-
->then(function (Connection $connection) use ($deferred) {
244-
$this->isDisconnecting = false;
245-
$this->isConnected = false;
246+
$isResolved = false;
246247

247-
$this->emit('disconnect', [$connection, $this]);
248-
$deferred->resolve($connection);
248+
$this->onCloseCallback = function ($connection) use ($deferred, &$isResolved) {
249+
if (!$isResolved) {
250+
$isResolved = true;
249251

250-
if ($this->stream !== null) {
251-
$this->stream->close();
252+
if ($connection) {
253+
$this->emit('disconnect', [$connection, $this]);
254+
$deferred->resolve($connection);
252255
}
256+
}
257+
};
258+
259+
$this->startFlow(new OutgoingDisconnectFlow($this->connection), true)
260+
->then(function () use ($timeout) {
261+
$this->timer[] = $this->loop->addTimer(
262+
$timeout,
263+
function () {
264+
if ($this->stream !== null) {
265+
$this->stream->close();
266+
}
267+
}
268+
);
253269
})
254-
->otherwise(function () use ($deferred) {
255-
$this->isDisconnecting = false;
256-
$deferred->reject($this->connection);
270+
->otherwise(function () use ($deferred, &$isResolved) {
271+
if (!$isResolved) {
272+
$isResolved = true;
273+
$this->isDisconnecting = false;
274+
$deferred->reject($this->connection);
275+
}
257276
});
258277

259278
return $deferred->promise();
@@ -569,6 +588,11 @@ private function handleClose()
569588
$this->connection = null;
570589
$this->stream = null;
571590

591+
if ($this->onCloseCallback !== null) {
592+
call_user_func($this->onCloseCallback, $connection);
593+
$this->onCloseCallback = null;
594+
}
595+
572596
if ($connection !== null) {
573597
$this->emit('close', [$connection, $this]);
574598
}

0 commit comments

Comments
 (0)