Skip to content

Commit caadd76

Browse files
committed
[sqs] introduce sqs client that hides differencies of single and multi aws clients.
2 parents c285f64 + 7f17f06 commit caadd76

13 files changed

+238
-6
lines changed

docs/transport/sqs.md

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ It uses internally official [aws sdk library](https://packagist.org/packages/aws
1919
* [Send delay message](#send-delay-message)
2020
* [Consume message](#consume-message)
2121
* [Purge queue messages](#purge-queue-messages)
22+
* [Queue from another AWS account](#queue-from-another-aws-account)
2223

2324
## Installation
2425

@@ -122,4 +123,23 @@ $fooQueue = $context->createQueue('foo');
122123
$context->purgeQueue($fooQueue);
123124
```
124125

126+
## Queue from another AWS account
127+
128+
SQS allows to use queues from another account. You could set it globally for all queues via option `queue_owner_aws_account_id` or
129+
per queue using `SqsDestination::setQueueOwnerAWSAccountId` method.
130+
131+
```php
132+
<?php
133+
use Enqueue\Sqs\SqsConnectionFactory;
134+
135+
// globally for all queues
136+
$factory = new SqsConnectionFactory('sqs:?queue_owner_aws_account_id=awsAccountId');
137+
138+
$context = (new SqsConnectionFactory('sqs:'))->createContext();
139+
140+
// per queue.
141+
$queue = $context->createQueue('foo');
142+
$queue->setQueueOwnerAWSAccountId('awsAccountId');
143+
```
144+
125145
[back to index](../index.md)

pkg/sqs/SqsClient.php

Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Enqueue\Sqs;
6+
7+
use Aws\MultiRegionClient;
8+
use Aws\Result;
9+
use Aws\Sqs\SqsClient as AwsSqsClient;
10+
11+
class SqsClient
12+
{
13+
/**
14+
* @var AwsSqsClient
15+
*/
16+
private $singleClient;
17+
18+
/**
19+
* @var MultiRegionClient
20+
*/
21+
private $multiClient;
22+
23+
/**
24+
* @var callable
25+
*/
26+
private $inputClient;
27+
28+
public function __construct($inputClient)
29+
{
30+
$this->inputClient = is_callable($inputClient) ? $inputClient : function () use ($inputClient) {
31+
return $inputClient;
32+
};
33+
}
34+
35+
public function deleteMessage(array $args): Result
36+
{
37+
return $this->callApi('deleteMessage', $args);
38+
}
39+
40+
public function receiveMessage(array $args): Result
41+
{
42+
return $this->callApi('receiveMessage', $args);
43+
}
44+
45+
public function purgeQueue(array $args): Result
46+
{
47+
return $this->callApi('purgeQueue', $args);
48+
}
49+
50+
public function getQueueUrl(array $args): Result
51+
{
52+
return $this->callApi('getQueueUrl', $args);
53+
}
54+
55+
public function createQueue(array $args): Result
56+
{
57+
return $this->callApi('createQueue', $args);
58+
}
59+
60+
public function deleteQueue(array $args): Result
61+
{
62+
return $this->callApi('deleteQueue', $args);
63+
}
64+
65+
public function sendMessage(array $args): Result
66+
{
67+
return $this->callApi('sendMessage', $args);
68+
}
69+
70+
public function getAWSClient(): AwsSqsClient
71+
{
72+
$this->resolveClient();
73+
74+
if ($this->singleClient) {
75+
return $this->singleClient;
76+
}
77+
78+
if ($this->multiClient) {
79+
$mr = new \ReflectionMethod($this->multiClient, 'getClientFromPool');
80+
$mr->setAccessible(true);
81+
$singleClient = $mr->invoke($this->multiClient, $this->multiClient->getRegion());
82+
$mr->setAccessible(false);
83+
84+
return $singleClient;
85+
}
86+
87+
throw new \LogicException('The multi or single client must be set');
88+
}
89+
90+
private function callApi(string $name, array $args): Result
91+
{
92+
if ($this->singleClient) {
93+
unset($args['@region']);
94+
95+
return call_user_func([$this->singleClient, $name], $args);
96+
}
97+
98+
if ($this->multiClient) {
99+
return call_user_func([$this->multiClient, $name], $args);
100+
}
101+
102+
throw new \LogicException('The multi or single client must be set');
103+
}
104+
105+
private function resolveClient(): void
106+
{
107+
if ($this->singleClient || $this->multiClient) {
108+
return;
109+
}
110+
111+
$client = call_user_func($this->inputClient);
112+
if ($client instanceof MultiRegionClient) {
113+
$this->multiClient = $client;
114+
115+
return;
116+
}
117+
if ($client instanceof AwsSqsClient) {
118+
$this->singleClient = $client;
119+
120+
return;
121+
}
122+
123+
throw new \LogicException(sprintf(
124+
'The input client must be an instance of "%s" or "%s" or a callable that returns one of those. Got "%s"',
125+
AwsSqsClient::class,
126+
MultiRegionClient::class,
127+
is_object($client) ? get_class($client) : gettype($client)
128+
));
129+
}
130+
}

pkg/sqs/SqsConnectionFactory.php

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44

55
namespace Enqueue\Sqs;
66

7+
use Aws\MultiRegionClient;
8+
use Aws\Sdk;
79
use Aws\Sqs\SqsClient;
810
use Enqueue\Dsn\Dsn;
911
use Interop\Queue\ConnectionFactory;
@@ -17,7 +19,7 @@ class SqsConnectionFactory implements ConnectionFactory
1719
private $config;
1820

1921
/**
20-
* @var SqsClient
22+
* @var MultiRegionClient
2123
*/
2224
private $client;
2325

@@ -81,7 +83,7 @@ public function createContext(): Context
8183
return new SqsContext($this->establishConnection(), $this->config);
8284
}
8385

84-
private function establishConnection(): SqsClient
86+
private function establishConnection(): MultiRegionClient
8587
{
8688
if ($this->client) {
8789
return $this->client;
@@ -108,7 +110,7 @@ private function establishConnection(): SqsClient
108110
}
109111
}
110112

111-
$this->client = new SqsClient($config);
113+
$this->client = (new Sdk(['Sqs' => $config]))->createMultiRegionSqs();
112114

113115
return $this->client;
114116
}

pkg/sqs/SqsConsumer.php

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,7 @@ public function acknowledge(Message $message): void
120120
InvalidMessageException::assertMessageInstanceOf($message, SqsMessage::class);
121121

122122
$this->context->getClient()->deleteMessage([
123+
'@region' => $this->queue->getRegion(),
123124
'QueueUrl' => $this->context->getQueueUrl($this->queue),
124125
'ReceiptHandle' => $message->getReceiptHandle(),
125126
]);
@@ -133,6 +134,7 @@ public function reject(Message $message, bool $requeue = false): void
133134
InvalidMessageException::assertMessageInstanceOf($message, SqsMessage::class);
134135

135136
$this->context->getClient()->deleteMessage([
137+
'@region' => $this->queue->getRegion(),
136138
'QueueUrl' => $this->context->getQueueUrl($this->queue),
137139
'ReceiptHandle' => $message->getReceiptHandle(),
138140
]);
@@ -149,6 +151,7 @@ protected function receiveMessage(int $timeoutSeconds): ?SqsMessage
149151
}
150152

151153
$arguments = [
154+
'@region' => $this->queue->getRegion(),
152155
'AttributeNames' => ['All'],
153156
'MessageAttributeNames' => ['All'],
154157
'MaxNumberOfMessages' => $this->maxNumberOfMessages,

pkg/sqs/SqsContext.php

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,7 @@ public function purgeQueue(Queue $queue): void
119119
InvalidDestinationException::assertDestinationInstanceOf($queue, SqsDestination::class);
120120

121121
$this->getClient()->purgeQueue([
122+
'@region' => $queue->getRegion(),
122123
'QueueUrl' => $this->getQueueUrl($queue),
123124
]);
124125
}
@@ -152,8 +153,14 @@ public function getQueueUrl(SqsDestination $destination): string
152153
return $this->queueUrls[$destination->getQueueName()];
153154
}
154155

155-
$arguments = ['QueueName' => $destination->getQueueName()];
156-
if (false == empty($this->config['queue_owner_aws_account_id'])) {
156+
$arguments = [
157+
'@region' => $destination->getRegion(),
158+
'QueueName' => $destination->getQueueName()
159+
];
160+
161+
if ($destination->getQueueOwnerAWSAccountId()) {
162+
$arguments['QueueOwnerAWSAccountId'] = $destination->getQueueOwnerAWSAccountId();
163+
} elseif (false == empty($this->config['queue_owner_aws_account_id'])) {
157164
$arguments['QueueOwnerAWSAccountId'] = $this->config['queue_owner_aws_account_id'];
158165
}
159166

@@ -169,6 +176,7 @@ public function getQueueUrl(SqsDestination $destination): string
169176
public function declareQueue(SqsDestination $dest): void
170177
{
171178
$result = $this->getClient()->createQueue([
179+
'@region' => $dest->getRegion(),
172180
'Attributes' => $dest->getAttributes(),
173181
'QueueName' => $dest->getQueueName(),
174182
]);

pkg/sqs/SqsDestination.php

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,21 @@ class SqsDestination implements Topic, Queue
1414
*/
1515
private $name;
1616

17+
/**
18+
* @var string|null
19+
*/
20+
private $region;
21+
1722
/**
1823
* @var array
1924
*/
2025
private $attributes;
2126

27+
/**
28+
* @var string|null
29+
*/
30+
private $queueOwnerAWSAccountId;
31+
2232
/**
2333
* The name of the new queue.
2434
* The following limits apply to this name:
@@ -187,4 +197,24 @@ public function setContentBasedDeduplication(bool $enable): void
187197
unset($this->attributes['ContentBasedDeduplication']);
188198
}
189199
}
200+
201+
public function getQueueOwnerAWSAccountId(): ?string
202+
{
203+
return $this->queueOwnerAWSAccountId;
204+
}
205+
206+
public function setQueueOwnerAWSAccountId(?string $queueOwnerAWSAccountId): void
207+
{
208+
$this->queueOwnerAWSAccountId = $queueOwnerAWSAccountId;
209+
}
210+
211+
public function setRegion(string $region = null): void
212+
{
213+
$this->region = $region;
214+
}
215+
216+
public function getRegion(): ?string
217+
{
218+
return $this->region;
219+
}
190220
}

pkg/sqs/SqsProducer.php

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ public function send(Destination $destination, Message $message): void
4444
}
4545

4646
$arguments = [
47+
'@region' => $destination->getRegion(),
4748
'MessageAttributes' => [
4849
'Headers' => [
4950
'DataType' => 'String',

pkg/sqs/Tests/Spec/SqsSendToAndReceiveFromQueueTest.php

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,15 +4,18 @@
44

55
use Enqueue\Sqs\SqsContext;
66
use Enqueue\Sqs\SqsDestination;
7+
use Enqueue\Test\RetryTrait;
78
use Enqueue\Test\SqsExtension;
89
use Interop\Queue\Context;
910
use Interop\Queue\Spec\SendToAndReceiveFromQueueSpec;
1011

1112
/**
1213
* @group functional
14+
* @retry 5
1315
*/
1416
class SqsSendToAndReceiveFromQueueTest extends SendToAndReceiveFromQueueSpec
1517
{
18+
use RetryTrait;
1619
use SqsExtension;
1720
use CreateSqsQueueTrait;
1821

pkg/sqs/Tests/Spec/SqsSendToAndReceiveNoWaitFromQueueTest.php

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,15 +4,18 @@
44

55
use Enqueue\Sqs\SqsContext;
66
use Enqueue\Sqs\SqsDestination;
7+
use Enqueue\Test\RetryTrait;
78
use Enqueue\Test\SqsExtension;
89
use Interop\Queue\Context;
910
use Interop\Queue\Spec\SendToAndReceiveNoWaitFromQueueSpec;
1011

1112
/**
1213
* @group functional
14+
* @retry 5
1315
*/
1416
class SqsSendToAndReceiveNoWaitFromQueueTest extends SendToAndReceiveNoWaitFromQueueSpec
1517
{
18+
use RetryTrait;
1619
use SqsExtension;
1720
use CreateSqsQueueTrait;
1821

pkg/sqs/Tests/Spec/SqsSendToAndReceiveNoWaitFromTopicTest.php

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,15 +4,18 @@
44

55
use Enqueue\Sqs\SqsContext;
66
use Enqueue\Sqs\SqsDestination;
7+
use Enqueue\Test\RetryTrait;
78
use Enqueue\Test\SqsExtension;
89
use Interop\Queue\Context;
910
use Interop\Queue\Spec\SendToAndReceiveNoWaitFromTopicSpec;
1011

1112
/**
1213
* @group functional
14+
* @retry 5
1315
*/
1416
class SqsSendToAndReceiveNoWaitFromTopicTest extends SendToAndReceiveNoWaitFromTopicSpec
1517
{
18+
use RetryTrait;
1619
use SqsExtension;
1720
use CreateSqsQueueTrait;
1821

0 commit comments

Comments
 (0)