Skip to content

Commit ba418ad

Browse files
committed
wamp
1 parent 3364d61 commit ba418ad

6 files changed

+25
-28
lines changed

pkg/wamp/Tests/Functional/WampConsumerTest.php

+1-1
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ public function testShouldSendAndReceiveMessage()
3535
$consumer->getClient()->getLoop()->futureTick(function () use ($producer, $topic, $message) {
3636
$producer->send($topic, $message);
3737
});
38-
38+
3939
$receivedMessage = $consumer->receive(100);
4040

4141
$this->assertInstanceOf(WampMessage::class, $receivedMessage);

pkg/wamp/Tests/Functional/WampSubscriptionConsumerTest.php

+1-1
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ public function testShouldSendAndReceiveMessage()
4242
$consumer->getClient()->getLoop()->futureTick(function () use ($producer, $topic, $message) {
4343
$producer->send($topic, $message);
4444
});
45-
45+
4646
$consumer->consume(100);
4747

4848
$this->assertInstanceOf(WampMessage::class, $receivedMessage);

pkg/wamp/Tests/Spec/JsonSerializerTest.php

+2-2
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,9 @@
22

33
namespace Enqueue\Wamp\Tests\Spec;
44

5+
use Enqueue\Test\ClassExtensionTrait;
56
use Enqueue\Wamp\JsonSerializer;
67
use Enqueue\Wamp\Serializer;
7-
use Enqueue\Test\ClassExtensionTrait;
88
use Enqueue\Wamp\WampMessage;
99
use PHPUnit\Framework\TestCase;
1010

@@ -40,7 +40,7 @@ public function testThrowIfFailedToEncodeMessageToJson()
4040
{
4141
$serializer = new JsonSerializer();
4242

43-
$resource = fopen(__FILE__, 'r');
43+
$resource = fopen(__FILE__, 'rb');
4444

4545
//guard
4646
$this->assertInternalType('resource', $resource);

pkg/wamp/WampConsumer.php

-1
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,6 @@ public function receive(int $timeout = 0): ?Message
6767
$this->client = $this->context->getNewClient();
6868
$this->client->setAttemptRetry(true);
6969
$this->client->on('open', function (ClientSession $session) {
70-
7170
$session->subscribe($this->queue->getQueueName(), function ($args) {
7271
$this->message = $this->context->getSerializer()->toMessage($args[0]);
7372

pkg/wamp/WampProducer.php

+20-21
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ public function __construct(WampContext $context)
5151
* {@inheritdoc}
5252
*
5353
* @param WampDestination $destination
54-
* @param WampMessage $message
54+
* @param WampMessage $message
5555
*/
5656
public function send(Destination $destination, Message $message): void
5757
{
@@ -86,7 +86,6 @@ public function send(Destination $destination, Message $message): void
8686
});
8787

8888
$this->client->on('do-send', function (WampDestination $destination, WampMessage $message) {
89-
9089
$onFinish = function () {
9190
$this->client->emit('do-stop');
9291
};
@@ -113,25 +112,6 @@ public function send(Destination $destination, Message $message): void
113112
$this->client->getLoop()->run();
114113
}
115114

116-
private function doSendMessageIfPossible()
117-
{
118-
if (null === $this->session) {
119-
return;
120-
}
121-
122-
if (null === $this->message) {
123-
return;
124-
}
125-
126-
$message = $this->message;
127-
$destination = $this->destination;
128-
129-
$this->message = null;
130-
$this->destination = null;
131-
132-
$this->client->emit('do-send', [$destination, $message]);
133-
}
134-
135115
/**
136116
* {@inheritdoc}
137117
*
@@ -188,4 +168,23 @@ public function getTimeToLive(): ?int
188168
{
189169
return null;
190170
}
171+
172+
private function doSendMessageIfPossible()
173+
{
174+
if (null === $this->session) {
175+
return;
176+
}
177+
178+
if (null === $this->message) {
179+
return;
180+
}
181+
182+
$message = $this->message;
183+
$destination = $this->destination;
184+
185+
$this->message = null;
186+
$this->destination = null;
187+
188+
$this->client->emit('do-send', [$destination, $message]);
189+
}
191190
}

pkg/wamp/WampSubscriptionConsumer.php

+1-2
Original file line numberDiff line numberDiff line change
@@ -59,14 +59,13 @@ public function consume(int $timeout = 0): void
5959
$this->client = $this->context->getNewClient();
6060
$this->client->setAttemptRetry(true);
6161
$this->client->on('open', function (ClientSession $session) {
62-
6362
foreach ($this->subscribers as $queue => $subscriber) {
6463
$session->subscribe($queue, function ($args) use ($subscriber) {
6564
$message = $this->context->getSerializer()->toMessage($args[0]);
6665

6766
/**
6867
* @var WampConsumer $consumer
69-
* @var callable $callback
68+
* @var callable $callback
7069
*/
7170
list($consumer, $callback) = $subscriber;
7271

0 commit comments

Comments
 (0)