-
Notifications
You must be signed in to change notification settings - Fork 432
feat(batch): add support to SQS FIFO queues (SqsFifoPartialProcessor) #1934
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
rubenfonseca
merged 6 commits into
aws-powertools:develop
from
rubenfonseca:rf/batch-sqs-fifo
Feb 20, 2023
Merged
Changes from 3 commits
Commits
Show all changes
6 commits
Select commit
Hold shift + click to select a range
7686381
feat(batch): support SQS FIFO queues
rubenfonseca aad65f0
fix: addressed review comments
rubenfonseca a1367e7
fix: typo
rubenfonseca d7be0a4
fix: address review
rubenfonseca 0070643
Update tests/functional/test_utilities_batch.py
rubenfonseca 1f8e621
docs: fix typo
heitorlessa File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
92 changes: 92 additions & 0 deletions
92
aws_lambda_powertools/utilities/batch/sqs_fifo_partial_processor.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,92 @@ | ||
from typing import List, Optional, Tuple | ||
|
||
from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType | ||
from aws_lambda_powertools.utilities.batch.types import BatchSqsTypeModel | ||
|
||
|
||
class SQSFifoCircuitBreakerError(Exception):
    """
    Signals a record not processed due to the SQS FIFO processing being interrupted.

    Raised (as part of ``SQSFifoPartialProcessor.circuit_breaker_exc``) for every
    record that is skipped after an earlier record in the same FIFO batch failed.
    """

    # The redundant `pass` was removed: the docstring already makes the body non-empty.
|
||
|
||
class SQSFifoPartialProcessor(BatchProcessor):
    """Process native partial responses from SQS FIFO queues.

    As soon as one record fails, processing stops and every remaining record
    is reported back as a failed item, preserving FIFO ordering guarantees.

    Example
    _______

    ## Process batch triggered by a FIFO SQS

    ```python
    import json

    from aws_lambda_powertools import Logger, Tracer
    from aws_lambda_powertools.utilities.batch import SQSFifoPartialProcessor, EventType, batch_processor
    from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord
    from aws_lambda_powertools.utilities.typing import LambdaContext


    processor = SQSFifoPartialProcessor()
    tracer = Tracer()
    logger = Logger()


    @tracer.capture_method
    def record_handler(record: SQSRecord):
        payload: str = record.body
        if payload:
            item: dict = json.loads(payload)
        ...


    @logger.inject_lambda_context
    @tracer.capture_lambda_handler
    @batch_processor(record_handler=record_handler, processor=processor)
    def lambda_handler(event, context: LambdaContext):
        return processor.response()
    ```
    """

    # 3-tuple shaped like sys.exc_info(): (exception type, exception instance, traceback).
    # NOTE(review): presumably the shape BatchProcessor.failure_handler expects — confirm
    # against the parent class before changing.
    circuit_breaker_exc = (
        SQSFifoCircuitBreakerError,
        SQSFifoCircuitBreakerError("A previous record failed processing"),
        None,
    )

    def __init__(self, model: Optional["BatchSqsTypeModel"] = None):
        super().__init__(EventType.SQS, model)

    def process(self) -> List[Tuple]:
        """
        Call the instance's handler for each record, short-circuiting on the first failure.

        Once any failure has been registered, every record that has not been attempted
        yet is marked as failed without invoking the handler, and the accumulated
        results are returned.
        """
        processed: List[Tuple] = []

        for index, current_record in enumerate(self.records):
            # Any entry in fail_messages means the previous record failed:
            # stop here and fail everything from this index onwards.
            if self.fail_messages:
                return self._short_circuit_processing(index, processed)

            processed.append(self._process_record(current_record))

        return processed

    def _short_circuit_processing(self, first_failure_index: int, result: List[Tuple]) -> List[Tuple]:
        """
        Mark every record from ``first_failure_index`` onwards as failed and
        append the failure entries to ``result``, which is returned.
        """
        for skipped_record in self.records[first_failure_index:]:
            data = self._to_batch_type(record=skipped_record, event_type=self.event_type, model=self.model)
            result.append(self.failure_handler(record=data, exception=self.circuit_breaker_exc))
        return result

    async def _async_process_record(self, record: dict):
        # Asynchronous (out-of-order) processing is incompatible with FIFO semantics.
        raise NotImplementedError()
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,24 @@ | ||
#
# type specifics
#
import sys
from typing import Optional, Type, Union

# Pydantic is an optional dependency: only expose the model-based aliases when it
# is available.
# NOTE(review): this checks sys.modules, so pydantic must already be *imported*
# (not merely installed) before this module loads for the models branch to
# activate — confirm this ordering is intentional.
has_pydantic = "pydantic" in sys.modules

# For IntelliSense and Mypy to work, we need to account for possible SQS subclasses
# We need them as subclasses as we must access their message ID or sequence number metadata via dot notation
if has_pydantic:
    from aws_lambda_powertools.utilities.parser.models import DynamoDBStreamRecordModel
    from aws_lambda_powertools.utilities.parser.models import (
        KinesisDataStreamRecord as KinesisDataStreamRecordModel,
    )
    from aws_lambda_powertools.utilities.parser.models import SqsRecordModel

    # Union of every parser model the batch utility accepts (or None when the
    # caller supplies no model).
    BatchTypeModels = Optional[
        Union[Type[SqsRecordModel], Type[DynamoDBStreamRecordModel], Type[KinesisDataStreamRecordModel]]
    ]
    # SQS-only narrowing of BatchTypeModels, used by SQSFifoPartialProcessor.
    BatchSqsTypeModel = Optional[Type[SqsRecordModel]]
else:
    # Fallback aliases keep annotations importable when pydantic is absent.
    BatchTypeModels = Optional  # type: ignore
    BatchSqsTypeModel = Optional  # type: ignore
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,23 @@ | ||
# Example: processing an SQS FIFO batch with the @batch_processor decorator.
from aws_lambda_powertools import Logger, Tracer
from aws_lambda_powertools.utilities.batch import (
    SQSFifoPartialProcessor,
    batch_processor,
)
from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord
from aws_lambda_powertools.utilities.typing import LambdaContext

# FIFO processor: stops at the first failing record and reports every
# remaining record as failed, preserving queue ordering.
processor = SQSFifoPartialProcessor()
tracer = Tracer()
logger = Logger()


@tracer.capture_method
def record_handler(record: SQSRecord):
    # Placeholder: per-record business logic goes here.
    ...


@logger.inject_lambda_context
@tracer.capture_lambda_handler
@batch_processor(record_handler=record_handler, processor=processor)
def lambda_handler(event, context: LambdaContext):
    # Return the partial-failure response assembled by the processor.
    return processor.response()
23 changes: 23 additions & 0 deletions
23
examples/batch_processing/src/sqs_fifo_batch_processor_context_manager.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,23 @@ | ||
# Example: processing an SQS FIFO batch with the processor as a context manager.
from aws_lambda_powertools import Logger, Tracer
from aws_lambda_powertools.utilities.batch import SQSFifoPartialProcessor
from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord
from aws_lambda_powertools.utilities.typing import LambdaContext

# FIFO processor: stops at the first failing record and reports every
# remaining record as failed, preserving queue ordering.
processor = SQSFifoPartialProcessor()
tracer = Tracer()
logger = Logger()


@tracer.capture_method
def record_handler(record: SQSRecord):
    # Placeholder: per-record business logic goes here.
    ...


@logger.inject_lambda_context
@tracer.capture_lambda_handler
def lambda_handler(event, context: LambdaContext):
    batch = event["Records"]
    # Bind the records and handler for this invocation; __exit__ finalizes bookkeeping.
    with processor(records=batch, handler=record_handler):
        processor.process()  # kick off processing, return List[Tuple]

    # Return the partial-failure response assembled by the processor.
    return processor.response()
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,4 +1,5 @@ | ||
import json | ||
import uuid | ||
from random import randint | ||
from typing import Any, Awaitable, Callable, Dict, Optional | ||
|
||
|
@@ -9,6 +10,7 @@ | |
AsyncBatchProcessor, | ||
BatchProcessor, | ||
EventType, | ||
SQSFifoPartialProcessor, | ||
async_batch_processor, | ||
batch_processor, | ||
) | ||
|
@@ -40,7 +42,7 @@ | |
def sqs_event_factory() -> Callable: | ||
def factory(body: str): | ||
return { | ||
"messageId": "059f36b4-87a3-44ab-83d2-661975830a7d", | ||
"messageId": str(uuid.uuid4()), | ||
rubenfonseca marked this conversation as resolved.
Show resolved
Hide resolved
|
||
"receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a", | ||
"body": body, | ||
"attributes": { | ||
|
@@ -117,6 +119,17 @@ def handler(record): | |
return handler | ||
|
||
|
||
@pytest.fixture(scope="module")
def sqs_fifo_record_handler() -> Callable:
    # Returns a handler that raises for any record whose body contains "fail",
    # letting tests trigger the FIFO short-circuit behavior deterministically.
    def handler(record):
        body = record["body"]
        if "fail" in body:
            raise Exception("Failed to process record.")
        return body

    return handler
||
|
||
|
||
@pytest.fixture(scope="module") | ||
def async_record_handler() -> Callable[..., Awaitable[Any]]: | ||
async def handler(record): | ||
|
@@ -654,6 +667,48 @@ def lambda_handler(event, context): | |
assert "All records failed processing. " in str(e.value) | ||
|
||
|
||
def test_sqs_fifo_batch_processor_middleware_success_only(sqs_event_factory, sqs_fifo_record_handler):
    # GIVEN a FIFO batch of two records that both succeed
    records = [SQSRecord(sqs_event_factory("success")) for _ in range(2)]
    event = {"Records": [record.raw_event for record in records]}

    processor = SQSFifoPartialProcessor()

    @batch_processor(record_handler=sqs_fifo_record_handler, processor=processor)
    def lambda_handler(event, context):
        return processor.response()

    # WHEN the handler processes the batch
    result = lambda_handler(event, {})

    # THEN no item is reported as failed
    assert result["batchItemFailures"] == []
||
def test_sqs_fifo_batch_processor_middleware_with_failure(sqs_event_factory, sqs_fifo_record_handler):
    # GIVEN a FIFO batch whose second record fails
    first_record = SQSRecord(sqs_event_factory("success"))
    second_record = SQSRecord(sqs_event_factory("fail"))
    # The third record would normally succeed, but since it's a FIFO queue,
    # it must be marked as a failure once an earlier record fails.
    third_record = SQSRecord(sqs_event_factory("success"))
    event = {"Records": [r.raw_event for r in (first_record, second_record, third_record)]}

    processor = SQSFifoPartialProcessor()

    @batch_processor(record_handler=sqs_fifo_record_handler, processor=processor)
    def lambda_handler(event, context):
        return processor.response()

    # WHEN the handler processes the batch
    result = lambda_handler(event, {})

    # THEN the failing record and every record after it are reported back, in order
    failures = result["batchItemFailures"]
    assert len(failures) == 2
    assert failures[0]["itemIdentifier"] == second_record.message_id
    assert failures[1]["itemIdentifier"] == third_record.message_id
||
def test_async_batch_processor_middleware_success_only(sqs_event_factory, async_record_handler): | ||
# GIVEN | ||
first_record = SQSRecord(sqs_event_factory("success")) | ||
|
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.