Skip to content

Commit 23b0c1c

Browse files
authored
Merge pull request #1214 from kate-simozhenko/sns-fifo
Add FIFO logic to SNS
2 parents bb4a036 + e03f429 commit 23b0c1c

File tree

4 files changed

+87
-51
lines changed

4 files changed

+87
-51
lines changed

.github/workflows/ci.yml

+1
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ jobs:
2929

3030
- run: sed -i 's/525568/16777471/' vendor/kwn/php-rdkafka-stubs/stubs/constants.php
3131

32+
- run: cd docker && docker build --rm --force-rm --no-cache --pull --tag "enqueue/dev:latest" -f Dockerfile .
3233
- run: docker run --workdir="/mqdev" -v "`pwd`:/mqdev" --rm enqueue/dev:latest php -d memory_limit=1024M bin/phpstan analyse -l 1 -c phpstan.neon --error-format=github -- ${{ env.GIT_DIFF_FILTERED }}
3334
if: env.GIT_DIFF_FILTERED
3435

pkg/sns/SnsDestination.php

+28
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,34 @@ public function getDeliveryPolicy(): ?int
7070
return $this->getAttribute('DeliveryPolicy');
7171
}
7272

73+
/**
74+
* Only FIFO.
75+
*
76+
* Designates a topic as FIFO. You can provide this attribute only during queue creation.
77+
* You can't change it for an existing topic. When you set this attribute, you must provide aMessageGroupId
78+
* explicitly.
79+
* For more information, see https://docs.aws.amazon.com/sns/latest/dg/sns-fifo-topics.html
80+
*/
81+
public function setFifoTopic(bool $enable): void
82+
{
83+
$value = $enable ? 'true' : null;
84+
85+
$this->setAttribute('FifoTopic', $value);
86+
}
87+
88+
/**
89+
* Only FIFO.
90+
*
91+
* Enables content-based deduplication.
92+
* For more information, see: https://docs.aws.amazon.com/sns/latest/dg/fifo-message-dedup.html
93+
*/
94+
public function setContentBasedDeduplication(bool $enable): void
95+
{
96+
$value = $enable ? 'true' : null;
97+
98+
$this->setAttribute('ContentBasedDeduplication', $value);
99+
}
100+
73101
public function getAttributes(): array
74102
{
75103
return $this->attributes;

pkg/sns/SnsMessage.php

+50-51
Original file line numberDiff line numberDiff line change
@@ -42,21 +42,22 @@ class SnsMessage implements Message
4242
*/
4343
private $targetArn;
4444

45+
/**
46+
* @var string|null
47+
*/
48+
private $messageGroupId;
49+
50+
/**
51+
* @var string|null
52+
*/
53+
private $messageDeduplicationId;
54+
4555
/**
4656
* SnsMessage constructor.
4757
*
4858
* See AWS documentation for message attribute structure.
4959
*
5060
* @see https://docs.aws.amazon.com/aws-sdk-php/v3/api/api-sns-2010-03-31.html#shape-messageattributevalue
51-
*
52-
* @param string $body
53-
* @param array $properties
54-
* @param array $headers
55-
* @param array|null $messageAttributes
56-
* @param string|null $messageStructure
57-
* @param string|null $phoneNumber
58-
* @param string|null $subject
59-
* @param string|null $targetArn
6061
*/
6162
public function __construct(
6263
string $body = '',
@@ -79,89 +80,58 @@ public function __construct(
7980
$this->redelivered = false;
8081
}
8182

82-
/**
83-
* @return string|null
84-
*/
8583
public function getSnsMessageId(): ?string
8684
{
8785
return $this->snsMessageId;
8886
}
8987

90-
/**
91-
* @param string|null $snsMessageId
92-
*/
9388
public function setSnsMessageId(?string $snsMessageId): void
9489
{
9590
$this->snsMessageId = $snsMessageId;
9691
}
9792

98-
/**
99-
* @return string|null
100-
*/
10193
public function getMessageStructure(): ?string
10294
{
10395
return $this->messageStructure;
10496
}
10597

106-
/**
107-
* @param string|null $messageStructure
108-
*/
10998
public function setMessageStructure(?string $messageStructure): void
11099
{
111100
$this->messageStructure = $messageStructure;
112101
}
113102

114-
/**
115-
* @return string|null
116-
*/
117103
public function getPhoneNumber(): ?string
118104
{
119105
return $this->phoneNumber;
120106
}
121107

122-
/**
123-
* @param string|null $phoneNumber
124-
*/
125108
public function setPhoneNumber(?string $phoneNumber): void
126109
{
127110
$this->phoneNumber = $phoneNumber;
128111
}
129112

130-
/**
131-
* @return string|null
132-
*/
133113
public function getSubject(): ?string
134114
{
135115
return $this->subject;
136116
}
137117

138-
/**
139-
* @param string|null $subject
140-
*/
141118
public function setSubject(?string $subject): void
142119
{
143120
$this->subject = $subject;
144121
}
145122

146-
/**
147-
* @return array|null
148-
*/
149123
public function getMessageAttributes(): ?array
150124
{
151125
return $this->messageAttributes;
152126
}
153127

154-
/**
155-
* @param array|null $messageAttributes
156-
*/
157128
public function setMessageAttributes(?array $messageAttributes): void
158129
{
159130
$this->messageAttributes = $messageAttributes;
160131
}
161132

162133
/**
163-
* @param string $name
164-
* @param null $default
134+
* @param null $default
165135
*
166136
* @return array|null
167137
*/
@@ -177,9 +147,6 @@ public function getAttribute(string $name, $default = null)
177147
* 'DataType' => '<string>', // REQUIRED
178148
* 'StringValue' => '<string>',
179149
* ].
180-
*
181-
* @param string $name
182-
* @param array|null $attribute
183150
*/
184151
public function setAttribute(string $name, ?array $attribute): void
185152
{
@@ -191,7 +158,6 @@ public function setAttribute(string $name, ?array $attribute): void
191158
}
192159

193160
/**
194-
* @param string $name
195161
* @param string $dataType String, String.Array, Number, or Binary
196162
* @param string|resource|StreamInterface $value
197163
*/
@@ -205,19 +171,52 @@ public function addAttribute(string $name, string $dataType, $value): void
205171
];
206172
}
207173

208-
/**
209-
* @return string|null
210-
*/
211174
public function getTargetArn(): ?string
212175
{
213176
return $this->targetArn;
214177
}
215178

216-
/**
217-
* @param string|null $targetArn
218-
*/
219179
public function setTargetArn(?string $targetArn): void
220180
{
221181
$this->targetArn = $targetArn;
222182
}
183+
184+
/**
185+
* Only FIFO.
186+
*
187+
* The tag that specifies that a message belongs to a specific message group. Messages that belong to the same
188+
* message group are processed in a FIFO manner (however, messages in different message groups might be processed
189+
* out of order).
190+
* To interleave multiple ordered streams within a single queue, use MessageGroupId values (for example, session
191+
* data for multiple users). In this scenario, multiple readers can process the queue, but the session data
192+
* of each user is processed in a FIFO fashion.
193+
* For more information, see: https://docs.aws.amazon.com/sns/latest/dg/fifo-message-grouping.html
194+
*/
195+
public function setMessageGroupId(string $id = null): void
196+
{
197+
$this->messageGroupId = $id;
198+
}
199+
200+
public function getMessageGroupId(): ?string
201+
{
202+
return $this->messageGroupId;
203+
}
204+
205+
/**
206+
* Only FIFO.
207+
*
208+
* The token used for deduplication of sent messages. If a message with a particular MessageDeduplicationId is
209+
* sent successfully, any messages sent with the same MessageDeduplicationId are accepted successfully but
210+
* aren't delivered during the 5-minute deduplication interval.
211+
* For more information, see https://docs.aws.amazon.com/sns/latest/dg/fifo-message-dedup.html
212+
*/
213+
public function setMessageDeduplicationId(string $id = null): void
214+
{
215+
$this->messageDeduplicationId = $id;
216+
}
217+
218+
public function getMessageDeduplicationId(): ?string
219+
{
220+
return $this->messageDeduplicationId;
221+
}
223222
}

pkg/sns/SnsProducer.php

+8
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,14 @@ public function send(Destination $destination, Message $message): void
7777
$arguments['TargetArn'] = $targetArn;
7878
}
7979

80+
if ($messageGroupId = $message->getMessageGroupId()) {
81+
$arguments['MessageGroupId'] = $messageGroupId;
82+
}
83+
84+
if ($messageDeduplicationId = $message->getMessageDeduplicationId()) {
85+
$arguments['MessageDeduplicationId'] = $messageDeduplicationId;
86+
}
87+
8088
$result = $this->context->getSnsClient()->publish($arguments);
8189

8290
if (false == $result->hasKey('MessageId')) {

0 commit comments

Comments
 (0)