Skip to content

Commit dd32103

Browse files
committed
[consumption] Add preConsume extension point, remove beforeReceive.
1 parent 490c62b commit dd32103

18 files changed

+523
-198
lines changed

Diff for: pkg/enqueue-bundle/Tests/Unit/DependencyInjection/EnqueueExtensionTest.php

+2-2
Original file line numberDiff line numberDiff line change
@@ -436,8 +436,8 @@ public function testShouldConfigureQueueConsumer()
436436
]], $container);
437437

438438
$def = $container->getDefinition('enqueue.transport.default.queue_consumer');
439-
$this->assertSame('%enqueue.transport.default.idle_time%', $def->getArgument(2));
440-
$this->assertSame('%enqueue.transport.default.receive_timeout%', $def->getArgument(3));
439+
$this->assertSame('%enqueue.transport.default.idle_time%', $def->getArgument(4));
440+
$this->assertSame('%enqueue.transport.default.receive_timeout%', $def->getArgument(5));
441441

442442
$this->assertSame(123, $container->getParameter('enqueue.transport.default.idle_time'));
443443
$this->assertSame(456, $container->getParameter('enqueue.transport.default.receive_timeout'));

Diff for: pkg/enqueue/Consumption/ChainExtension.php

+5-4
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
namespace Enqueue\Consumption;
44

5+
use Enqueue\Consumption\Context\PreConsume;
56
use Enqueue\Consumption\Context\PreSubscribe;
67
use Enqueue\Consumption\Context\Start;
78

@@ -32,17 +33,17 @@ public function onStart(Start $context): void
3233
}
3334
}
3435

35-
public function preSubscribe(PreSubscribe $context): void
36+
public function onPreSubscribe(PreSubscribe $context): void
3637
{
3738
foreach ($this->extensions as $extension) {
38-
$extension->preSubscribe($context);
39+
$extension->onPreSubscribe($context);
3940
}
4041
}
4142

42-
public function onBeforeReceive(Context $context)
43+
public function onPreConsume(PreConsume $context): void
4344
{
4445
foreach ($this->extensions as $extension) {
45-
$extension->onBeforeReceive($context);
46+
$extension->onPreConsume($context);
4647
}
4748
}
4849

Diff for: pkg/enqueue/Consumption/Context/PreConsume.php

+97
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
<?php
2+
3+
namespace Enqueue\Consumption\Context;
4+
5+
use Interop\Queue\Context;
6+
use Interop\Queue\SubscriptionConsumer;
7+
use Psr\Log\LoggerInterface;
8+
9+
final class PreConsume
10+
{
11+
/**
12+
* @var Context
13+
*/
14+
private $context;
15+
16+
/**
17+
* @var SubscriptionConsumer
18+
*/
19+
private $subscriptionConsumer;
20+
21+
/**
22+
* @var LoggerInterface
23+
*/
24+
private $logger;
25+
26+
/**
27+
* @var int
28+
*/
29+
private $cycle;
30+
31+
/**
32+
* @var int
33+
*/
34+
private $receiveTimeout;
35+
36+
/**
37+
* @var int
38+
*/
39+
private $startTime;
40+
41+
/**
42+
* @var bool
43+
*/
44+
private $executionInterrupted;
45+
46+
public function __construct(Context $context, SubscriptionConsumer $subscriptionConsumer, LoggerInterface $logger, int $cycle, int $receiveTimeout, int $startTime)
47+
{
48+
$this->context = $context;
49+
$this->subscriptionConsumer = $subscriptionConsumer;
50+
$this->logger = $logger;
51+
$this->cycle = $cycle;
52+
$this->receiveTimeout = $receiveTimeout;
53+
$this->startTime = $startTime;
54+
55+
$this->executionInterrupted = false;
56+
}
57+
58+
public function getContext(): Context
59+
{
60+
return $this->context;
61+
}
62+
63+
public function getSubscriptionConsumer(): SubscriptionConsumer
64+
{
65+
return $this->subscriptionConsumer;
66+
}
67+
68+
public function getLogger(): LoggerInterface
69+
{
70+
return $this->logger;
71+
}
72+
73+
public function getCycle(): int
74+
{
75+
return $this->cycle;
76+
}
77+
78+
public function getReceiveTimeout(): int
79+
{
80+
return $this->receiveTimeout;
81+
}
82+
83+
public function getStartTime(): int
84+
{
85+
return $this->startTime;
86+
}
87+
88+
public function isExecutionInterrupted(): bool
89+
{
90+
return $this->executionInterrupted;
91+
}
92+
93+
public function interruptExecution(): void
94+
{
95+
$this->executionInterrupted = true;
96+
}
97+
}

Diff for: pkg/enqueue/Consumption/Context/PreSubscribe.php

+1-1
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
use Interop\Queue\Processor;
88
use Psr\Log\LoggerInterface;
99

10-
class PreSubscribe
10+
final class PreSubscribe
1111
{
1212
/**
1313
* @var Context

Diff for: pkg/enqueue/Consumption/EmptyExtensionTrait.php

+3-2
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
namespace Enqueue\Consumption;
44

5+
use Enqueue\Consumption\Context\PreConsume;
56
use Enqueue\Consumption\Context\PreSubscribe;
67
use Enqueue\Consumption\Context\Start;
78

@@ -11,11 +12,11 @@ public function onStart(Start $context): void
1112
{
1213
}
1314

14-
public function preSubscribe(PreSubscribe $preSubscribe): void
15+
public function onPreSubscribe(PreSubscribe $preSubscribe): void
1516
{
1617
}
1718

18-
public function onBeforeReceive(Context $context)
19+
public function onPreConsume(PreConsume $context): void
1920
{
2021
}
2122

Diff for: pkg/enqueue/Consumption/Extension/LimitConsumedMessagesExtension.php

+15-23
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,10 @@
33
namespace Enqueue\Consumption\Extension;
44

55
use Enqueue\Consumption\Context;
6+
use Enqueue\Consumption\Context\PreConsume;
67
use Enqueue\Consumption\EmptyExtensionTrait;
78
use Enqueue\Consumption\ExtensionInterface;
9+
use Psr\Log\LoggerInterface;
810

911
class LimitConsumedMessagesExtension implements ExtensionInterface
1012
{
@@ -23,51 +25,41 @@ class LimitConsumedMessagesExtension implements ExtensionInterface
2325
/**
2426
* @param int $messageLimit
2527
*/
26-
public function __construct($messageLimit)
28+
public function __construct(int $messageLimit)
2729
{
28-
if (false == is_int($messageLimit)) {
29-
throw new \InvalidArgumentException(sprintf(
30-
'Expected message limit is int but got: "%s"',
31-
is_object($messageLimit) ? get_class($messageLimit) : gettype($messageLimit)
32-
));
33-
}
34-
3530
$this->messageLimit = $messageLimit;
3631
$this->messageConsumed = 0;
3732
}
3833

39-
/**
40-
* {@inheritdoc}
41-
*/
42-
public function onBeforeReceive(Context $context)
34+
public function onPreConsume(PreConsume $context): void
4335
{
4436
// this is added here to handle an edge case. when a user sets zero as limit.
45-
$this->checkMessageLimit($context);
37+
if ($this->shouldBeStopped($context->getLogger())) {
38+
$context->interruptExecution();
39+
}
4640
}
4741

48-
/**
49-
* {@inheritdoc}
50-
*/
5142
public function onPostReceived(Context $context)
5243
{
5344
++$this->messageConsumed;
5445

55-
$this->checkMessageLimit($context);
46+
if ($this->shouldBeStopped($context->getLogger())) {
47+
$context->setExecutionInterrupted(true);
48+
}
5649
}
5750

58-
/**
59-
* @param Context $context
60-
*/
61-
protected function checkMessageLimit(Context $context)
51+
protected function shouldBeStopped(LoggerInterface $logger): bool
6252
{
6353
if ($this->messageConsumed >= $this->messageLimit) {
64-
$context->getLogger()->debug(sprintf(
54+
$logger->debug(sprintf(
6555
'[LimitConsumedMessagesExtension] Message consumption is interrupted since the message limit reached.'.
6656
' limit: "%s"',
6757
$this->messageLimit
6858
));
6959

70-
$context->setExecutionInterrupted(true);
60+
return true;
7161
}
62+
63+
return false;
7264
}
7365
}

Diff for: pkg/enqueue/Consumption/Extension/LimitConsumerMemoryExtension.php

+17-19
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,10 @@
33
namespace Enqueue\Consumption\Extension;
44

55
use Enqueue\Consumption\Context;
6+
use Enqueue\Consumption\Context\PreConsume;
67
use Enqueue\Consumption\EmptyExtensionTrait;
78
use Enqueue\Consumption\ExtensionInterface;
9+
use Psr\Log\LoggerInterface;
810

911
class LimitConsumerMemoryExtension implements ExtensionInterface
1012
{
@@ -30,44 +32,40 @@ public function __construct($memoryLimit)
3032
$this->memoryLimit = $memoryLimit * 1024 * 1024;
3133
}
3234

33-
/**
34-
* {@inheritdoc}
35-
*/
36-
public function onBeforeReceive(Context $context)
35+
public function onPreConsume(PreConsume $context): void
3736
{
38-
$this->checkMemory($context);
37+
if ($this->shouldBeStopped($context->getLogger())) {
38+
$context->interruptExecution();
39+
}
3940
}
4041

41-
/**
42-
* {@inheritdoc}
43-
*/
4442
public function onPostReceived(Context $context)
4543
{
46-
$this->checkMemory($context);
44+
if ($this->shouldBeStopped($context->getLogger())) {
45+
$context->setExecutionInterrupted(true);
46+
}
4747
}
4848

49-
/**
50-
* {@inheritdoc}
51-
*/
5249
public function onIdle(Context $context)
5350
{
54-
$this->checkMemory($context);
51+
if ($this->shouldBeStopped($context->getLogger())) {
52+
$context->setExecutionInterrupted(true);
53+
}
5554
}
5655

57-
/**
58-
* @param Context $context
59-
*/
60-
protected function checkMemory(Context $context)
56+
protected function shouldBeStopped(LoggerInterface $logger): bool
6157
{
6258
$memoryUsage = memory_get_usage(true);
6359
if ($memoryUsage >= $this->memoryLimit) {
64-
$context->getLogger()->debug(sprintf(
60+
$logger->debug(sprintf(
6561
'[LimitConsumerMemoryExtension] Interrupt execution as memory limit reached. limit: "%s", used: "%s"',
6662
$this->memoryLimit,
6763
$memoryUsage
6864
));
6965

70-
$context->setExecutionInterrupted(true);
66+
return true;
7167
}
68+
69+
return false;
7270
}
7371
}

Diff for: pkg/enqueue/Consumption/Extension/LimitConsumptionTimeExtension.php

+17-19
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,10 @@
33
namespace Enqueue\Consumption\Extension;
44

55
use Enqueue\Consumption\Context;
6+
use Enqueue\Consumption\Context\PreConsume;
67
use Enqueue\Consumption\EmptyExtensionTrait;
78
use Enqueue\Consumption\ExtensionInterface;
9+
use Psr\Log\LoggerInterface;
810

911
class LimitConsumptionTimeExtension implements ExtensionInterface
1012
{
@@ -23,45 +25,41 @@ public function __construct(\DateTime $timeLimit)
2325
$this->timeLimit = $timeLimit;
2426
}
2527

26-
/**
27-
* {@inheritdoc}
28-
*/
29-
public function onBeforeReceive(Context $context)
28+
public function onPreConsume(PreConsume $context): void
3029
{
31-
$this->checkTime($context);
30+
if ($this->shouldBeStopped($context->getLogger())) {
31+
$context->interruptExecution();
32+
}
3233
}
3334

34-
/**
35-
* {@inheritdoc}
36-
*/
3735
public function onIdle(Context $context)
3836
{
39-
$this->checkTime($context);
37+
if ($this->shouldBeStopped($context->getLogger())) {
38+
$context->setExecutionInterrupted(true);
39+
}
4040
}
4141

42-
/**
43-
* {@inheritdoc}
44-
*/
4542
public function onPostReceived(Context $context)
4643
{
47-
$this->checkTime($context);
44+
if ($this->shouldBeStopped($context->getLogger())) {
45+
$context->setExecutionInterrupted(true);
46+
}
4847
}
4948

50-
/**
51-
* @param Context $context
52-
*/
53-
protected function checkTime(Context $context)
49+
protected function shouldBeStopped(LoggerInterface $logger): bool
5450
{
5551
$now = new \DateTime();
5652
if ($now >= $this->timeLimit) {
57-
$context->getLogger()->debug(sprintf(
53+
$logger->debug(sprintf(
5854
'[LimitConsumptionTimeExtension] Execution interrupted as limit time has passed.'.
5955
' now: "%s", time-limit: "%s"',
6056
$now->format(DATE_ISO8601),
6157
$this->timeLimit->format(DATE_ISO8601)
6258
));
6359

64-
$context->setExecutionInterrupted(true);
60+
return true;
6561
}
62+
63+
return false;
6664
}
6765
}

0 commit comments

Comments
 (0)