From 768638191f87943d541fd950f6099fead81231bd Mon Sep 17 00:00:00 2001 From: Ruben Fonseca Date: Thu, 16 Feb 2023 13:33:39 +0100 Subject: [PATCH 1/6] feat(batch): support SQS FIFO queues --- .../utilities/batch/__init__.py | 4 + .../batch/sqs_fifo_partial_processor.py | 101 ++++++++++++++++++ docs/utilities/batch.md | 18 ++++ .../src/sqs_fifo_batch_processor.py | 23 ++++ ...qs_fifo_batch_processor_context_manager.py | 23 ++++ tests/functional/test_utilities_batch.py | 57 +++++++++- 6 files changed, 225 insertions(+), 1 deletion(-) create mode 100644 aws_lambda_powertools/utilities/batch/sqs_fifo_partial_processor.py create mode 100644 examples/batch_processing/src/sqs_fifo_batch_processor.py create mode 100644 examples/batch_processing/src/sqs_fifo_batch_processor_context_manager.py diff --git a/aws_lambda_powertools/utilities/batch/__init__.py b/aws_lambda_powertools/utilities/batch/__init__.py index 02f3e786441..9ad4ca43d38 100644 --- a/aws_lambda_powertools/utilities/batch/__init__.py +++ b/aws_lambda_powertools/utilities/batch/__init__.py @@ -16,6 +16,9 @@ batch_processor, ) from aws_lambda_powertools.utilities.batch.exceptions import ExceptionInfo +from aws_lambda_powertools.utilities.batch.sqs_fifo_partial_processor import ( + SQSFifoPartialProcessor, +) __all__ = ( "BatchProcessor", @@ -26,6 +29,7 @@ "EventType", "FailureResponse", "SuccessResponse", + "SQSFifoPartialProcessor", "batch_processor", "async_batch_processor", ) diff --git a/aws_lambda_powertools/utilities/batch/sqs_fifo_partial_processor.py b/aws_lambda_powertools/utilities/batch/sqs_fifo_partial_processor.py new file mode 100644 index 00000000000..8e6f293da34 --- /dev/null +++ b/aws_lambda_powertools/utilities/batch/sqs_fifo_partial_processor.py @@ -0,0 +1,101 @@ +import sys +from typing import List, Optional, Tuple, Type + +from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType +from aws_lambda_powertools.utilities.parser.models import SqsRecordModel + +# +# type specifics +# +has_pydantic = "pydantic" in sys.modules + +# For IntelliSense and Mypy to work, we need to account for possible SQS subclasses +# We need them as subclasses as we must access their message ID or sequence number metadata via dot notation +if has_pydantic: + BatchTypeModels = Optional[Type[SqsRecordModel]] + + +class SQSFifoCircuitBreakerError(Exception): + """ + Signals a record not processed due to the SQS FIFO processing being interrupted + """ + + pass + + +class SQSFifoPartialProcessor(BatchProcessor): + """Specialized BatchProcessor subclass that handles FIFO SQS batch records. + + As soon as the processing of the first record fails, the remaining records + are marked as failed without processing, and returned as native partial responses. + + Example + _______ + + ## Process batch triggered by a FIFO SQS + + ```python + import json + + from aws_lambda_powertools import Logger, Tracer + from aws_lambda_powertools.utilities.batch import SQSFifoPartialProcessor, EventType, batch_processor + from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord + from aws_lambda_powertools.utilities.typing import LambdaContext + + + processor = SQSFifoPartialProcessor() + tracer = Tracer() + logger = Logger() + + + @tracer.capture_method + def record_handler(record: SQSRecord): + payload: str = record.body + if payload: + item: dict = json.loads(payload) + ... 
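+        # note: raising any exception from this handler marks this record and
+        # every record after it in the batch as failed (FIFO circuit breaker)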
+ + @logger.inject_lambda_context + @tracer.capture_lambda_handler + @batch_processor(record_handler=record_handler, processor=processor) + def lambda_handler(event, context: LambdaContext): + return processor.response() + ``` + """ + + circuitBreakerError = SQSFifoCircuitBreakerError("A previous record failed processing.") + + def __init__(self, model: Optional["BatchTypeModels"] = None): + super().__init__(EventType.SQS, model) + + def process(self) -> List[Tuple]: + result: List[Tuple] = [] + + for i, record in enumerate(self.records): + """ + If we have failed messages, it means that the last message failed. + We then short circuit the process, failing the remaining messages + """ + if self.fail_messages: + return self._short_circuit_processing(i, result) + + """ + Otherwise, process the message normally + """ + result.append(self._process_record(record)) + + return result + + def _short_circuit_processing(self, first_failure_index: int, result: List[Tuple]) -> List[Tuple]: + remaining_records = self.records[first_failure_index:] + for remaining_record in remaining_records: + data = self._to_batch_type(record=remaining_record, event_type=self.event_type, model=self.model) + result.append( + self.failure_handler( + record=data, exception=(type(self.circuitBreakerError), self.circuitBreakerError, None) + ) + ) + return result + + async def _async_process_record(self, record: dict): + raise NotImplementedError() diff --git a/docs/utilities/batch.md b/docs/utilities/batch.md index 4a53e053f44..3f1e255aaa2 100644 --- a/docs/utilities/batch.md +++ b/docs/utilities/batch.md @@ -347,6 +347,24 @@ Processing batches from SQS works in four stages: } ``` +#### FIFO queues + +If you're using this feature with a FIFO queue, you should use the `SQSFifoPartialProcessor` class instead. We will +stop processing messages after the first failure, and return all failed and unprocessed messages in `batchItemFailures`. +This helps preserve the ordering of messages in your queue. + +=== "As a decorator" + + ```python hl_lines="5 11" + --8<-- "examples/batch_processing/src/sqs_fifo_batch_processor.py" + ``` + +=== "As a context manager" + + ```python hl_lines="4 8" + --8<-- "examples/batch_processing/src/sqs_fifo_batch_processor_context_manager.py" + ``` + ### Processing messages from Kinesis Processing batches from Kinesis works in four stages: diff --git a/examples/batch_processing/src/sqs_fifo_batch_processor.py b/examples/batch_processing/src/sqs_fifo_batch_processor.py new file mode 100644 index 00000000000..d1d2c587568 --- /dev/null +++ b/examples/batch_processing/src/sqs_fifo_batch_processor.py @@ -0,0 +1,23 @@ +from aws_lambda_powertools import Logger, Tracer +from aws_lambda_powertools.utilities.batch import ( + SQSFifoPartialProcessor, + batch_processor, +) +from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord +from aws_lambda_powertools.utilities.typing import LambdaContext + +processor = SQSFifoPartialProcessor() +tracer = Tracer() +logger = Logger() + + +@tracer.capture_method +def record_handler(record: SQSRecord): + ... 
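+    # sketch of a possible handler body (illustrative, not part of the snippet):
+    # reading record.body is typical, and any exception raised here fails this
+    # record and all records after it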
+ + +@logger.inject_lambda_context +@tracer.capture_lambda_handler +@batch_processor(record_handler=record_handler, processor=processor) +def lambda_handler(event, context: LambdaContext): + return processor.response() diff --git a/examples/batch_processing/src/sqs_fifo_batch_processor_context_manager.py b/examples/batch_processing/src/sqs_fifo_batch_processor_context_manager.py new file mode 100644 index 00000000000..80b45737cb9 --- /dev/null +++ b/examples/batch_processing/src/sqs_fifo_batch_processor_context_manager.py @@ -0,0 +1,23 @@ +from aws_lambda_powertools import Logger, Tracer +from aws_lambda_powertools.utilities.batch import SQSFifoPartialProcessor +from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord +from aws_lambda_powertools.utilities.typing import LambdaContext + +processor = SQSFifoPartialProcessor() +tracer = Tracer() +logger = Logger() + + +@tracer.capture_method +def record_handler(record: SQSRecord): + ... + + +@logger.inject_lambda_context +@tracer.capture_lambda_handler +def lambda_handler(event, context: LambdaContext): + batch = event["Records"] + with processor(records=batch, handler=record_handler): + processor.process() # kick off processing, return List[Tuple] + + return processor.response() diff --git a/tests/functional/test_utilities_batch.py b/tests/functional/test_utilities_batch.py index 6dcfc3d179d..1ea5339ffc3 100644 --- a/tests/functional/test_utilities_batch.py +++ b/tests/functional/test_utilities_batch.py @@ -1,4 +1,5 @@ import json +import uuid from random import randint from typing import Any, Awaitable, Callable, Dict, Optional @@ -9,6 +10,7 @@ AsyncBatchProcessor, BatchProcessor, EventType, + SQSFifoPartialProcessor, async_batch_processor, batch_processor, ) @@ -40,7 +42,7 @@ def sqs_event_factory() -> Callable: def factory(body: str): return { - "messageId": "059f36b4-87a3-44ab-83d2-661975830a7d", + "messageId": str(uuid.uuid4()), "receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a", "body": body, "attributes": { @@ -117,6 +119,17 @@ def handler(record): return handler +@pytest.fixture(scope="module") +def sqs_fifo_record_handler() -> Callable: + def handler(record): + body = record["body"] + if "fail" in body: + raise Exception("Failed to process record.") + return body + + return handler + + @pytest.fixture(scope="module") def async_record_handler() -> Callable[..., Awaitable[Any]]: async def handler(record): @@ -654,6 +667,48 @@ def lambda_handler(event, context): assert "All records failed processing. 
" in str(e.value) +def test_sqs_fifo_batch_processor_middleware_success_only(sqs_event_factory, sqs_fifo_record_handler): + # GIVEN + first_record = SQSRecord(sqs_event_factory("success")) + second_record = SQSRecord(sqs_event_factory("success")) + event = {"Records": [first_record.raw_event, second_record.raw_event]} + + processor = SQSFifoPartialProcessor() + + @batch_processor(record_handler=sqs_fifo_record_handler, processor=processor) + def lambda_handler(event, context): + return processor.response() + + # WHEN + result = lambda_handler(event, {}) + + # THEN + assert result["batchItemFailures"] == [] + + +def test_sqs_fifo_batch_processor_middleware_with_failure(sqs_event_factory, sqs_fifo_record_handler): + # GIVEN + first_record = SQSRecord(sqs_event_factory("success")) + second_record = SQSRecord(sqs_event_factory("fail")) + # this would normally suceed, but since it's a FIFO queue, it will be marked as failure + third_record = SQSRecord(sqs_event_factory("success")) + event = {"Records": [first_record.raw_event, second_record.raw_event, third_record.raw_event]} + + processor = SQSFifoPartialProcessor() + + @batch_processor(record_handler=sqs_fifo_record_handler, processor=processor) + def lambda_handler(event, context): + return processor.response() + + # WHEN + result = lambda_handler(event, {}) + + # THEN + assert len(result["batchItemFailures"]) == 2 + assert result["batchItemFailures"][0]["itemIdentifier"] == second_record.message_id + assert result["batchItemFailures"][1]["itemIdentifier"] == third_record.message_id + + def test_async_batch_processor_middleware_success_only(sqs_event_factory, async_record_handler): # GIVEN first_record = SQSRecord(sqs_event_factory("success")) From aad65f01968b6eb1b91baef1e913dc0a8b651800 Mon Sep 17 00:00:00 2001 From: Ruben Fonseca Date: Fri, 17 Feb 2023 15:18:20 +0100 Subject: [PATCH 2/6] fix: addressed review comments --- .../utilities/batch/__init__.py | 2 + aws_lambda_powertools/utilities/batch/base.py | 20 +------- .../batch/sqs_fifo_partial_processor.py | 51 ++++++++----------- .../utilities/batch/types.py | 24 +++++++++ 4 files changed, 48 insertions(+), 49 deletions(-) create mode 100644 aws_lambda_powertools/utilities/batch/types.py diff --git a/aws_lambda_powertools/utilities/batch/__init__.py b/aws_lambda_powertools/utilities/batch/__init__.py index 9ad4ca43d38..c7e8f0581d1 100644 --- a/aws_lambda_powertools/utilities/batch/__init__.py +++ b/aws_lambda_powertools/utilities/batch/__init__.py @@ -19,12 +19,14 @@ from aws_lambda_powertools.utilities.batch.sqs_fifo_partial_processor import ( SQSFifoPartialProcessor, ) +from aws_lambda_powertools.utilities.batch.types import BatchTypeModels __all__ = ( "BatchProcessor", "AsyncBatchProcessor", "BasePartialProcessor", "BasePartialBatchProcessor", + "BatchTypeModels", "ExceptionInfo", "EventType", "FailureResponse", diff --git a/aws_lambda_powertools/utilities/batch/base.py b/aws_lambda_powertools/utilities/batch/base.py index 171858c6d11..3aea2b70fa4 100644 --- a/aws_lambda_powertools/utilities/batch/base.py +++ b/aws_lambda_powertools/utilities/batch/base.py @@ -19,7 +19,6 @@ List, Optional, Tuple, - Type, Union, overload, ) @@ -30,6 +29,7 @@ BatchProcessingError, ExceptionInfo, ) +from aws_lambda_powertools.utilities.batch.types import BatchTypeModels from aws_lambda_powertools.utilities.data_classes.dynamo_db_stream_event import ( DynamoDBRecord, ) @@ -48,24 +48,6 @@ class EventType(Enum): DynamoDBStreams = "DynamoDBStreams" -# -# type specifics -# -has_pydantic = "pydantic" in 
sys.modules - -# For IntelliSense and Mypy to work, we need to account for possible SQS, Kinesis and DynamoDB subclasses -# We need them as subclasses as we must access their message ID or sequence number metadata via dot notation -if has_pydantic: - from aws_lambda_powertools.utilities.parser.models import DynamoDBStreamRecordModel - from aws_lambda_powertools.utilities.parser.models import ( - KinesisDataStreamRecord as KinesisDataStreamRecordModel, - ) - from aws_lambda_powertools.utilities.parser.models import SqsRecordModel - - BatchTypeModels = Optional[ - Union[Type[SqsRecordModel], Type[DynamoDBStreamRecordModel], Type[KinesisDataStreamRecordModel]] - ] - # When using processor with default arguments, records will carry EventSourceDataClassTypes # and depending on what EventType it's passed it'll correctly map to the right record # When using Pydantic Models, it'll accept any subclass from SQS, DynamoDB and Kinesis diff --git a/aws_lambda_powertools/utilities/batch/sqs_fifo_partial_processor.py b/aws_lambda_powertools/utilities/batch/sqs_fifo_partial_processor.py index 8e6f293da34..31827829857 100644 --- a/aws_lambda_powertools/utilities/batch/sqs_fifo_partial_processor.py +++ b/aws_lambda_powertools/utilities/batch/sqs_fifo_partial_processor.py @@ -1,18 +1,7 @@ -import sys -from typing import List, Optional, Tuple, Type +from typing import List, Optional, Tuple from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType -from aws_lambda_powertools.utilities.parser.models import SqsRecordModel - -# -# type specifics -# -has_pydantic = "pydantic" in sys.modules - -# For IntelliSense and Mypy to work, we need to account for possible SQS subclasses -# We need them as subclasses as we must access their message ID or sequence number metadata via dot notation -if has_pydantic: - BatchTypeModels = Optional[Type[SqsRecordModel]] +from aws_lambda_powertools.utilities.batch.types import BatchSqsTypeModel class SQSFifoCircuitBreakerError(Exception): @@ -24,10 +13,9 @@ class SQSFifoCircuitBreakerError(Exception): class SQSFifoPartialProcessor(BatchProcessor): - """Specialized BatchProcessor subclass that handles FIFO SQS batch records. + """Process native partial responses from SQS FIFO queues. - As soon as the processing of the first record fails, the remaining records - are marked as failed without processing, and returned as native partial responses. + Stops processing records when the first record fails. The remaining records are reported as failed items. Example _______ @@ -63,38 +51,41 @@ def lambda_handler(event, context: LambdaContext): ``` """ - circuitBreakerError = SQSFifoCircuitBreakerError("A previous record failed processing.") + circuit_breaker_exc = ( + SQSFifoCircuitBreakerError, + SQSFifoCircuitBreakerError("A previous record failed processing"), + None, + ) - def __init__(self, model: Optional["BatchTypeModels"] = None): + def __init__(self, model: Optional["BatchSqsTypeModel"] = None): super().__init__(EventType.SQS, model) def process(self) -> List[Tuple]: + """ + Call instance's handler for each record. When the first failed message is detected, + the process is short-circuited, and the remaining messages are reported as failed items. + """ result: List[Tuple] = [] for i, record in enumerate(self.records): - """ - If we have failed messages, it means that the last message failed. - We then short circuit the process, failing the remaining messages - """ + # If we have failed messages, it means that the last message failed. 
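+            # (processing records past the first failure would break FIFO ordering)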
+ # We then short circuit the process, failing the remaining messages if self.fail_messages: return self._short_circuit_processing(i, result) - """ - Otherwise, process the message normally - """ + # Otherwise, process the message normally result.append(self._process_record(record)) return result def _short_circuit_processing(self, first_failure_index: int, result: List[Tuple]) -> List[Tuple]: + """ + Starting from the first failure index, fail all the remaining messages, and append them to the result list. + """ remaining_records = self.records[first_failure_index:] for remaining_record in remaining_records: data = self._to_batch_type(record=remaining_record, event_type=self.event_type, model=self.model) - result.append( - self.failure_handler( - record=data, exception=(type(self.circuitBreakerError), self.circuitBreakerError, None) - ) - ) + result.append(self.failure_handler(record=data, exception=self.circuit_breaker_exc)) return result async def _async_process_record(self, record: dict): diff --git a/aws_lambda_powertools/utilities/batch/types.py b/aws_lambda_powertools/utilities/batch/types.py new file mode 100644 index 00000000000..d50a16f2a8f --- /dev/null +++ b/aws_lambda_powertools/utilities/batch/types.py @@ -0,0 +1,24 @@ +# +# type specifics +# +import sys +from typing import Optional, Type, Union + +has_pydantic = "pydantic" in sys.modules + +# For IntelliSense and Mypy to work, we need to account for possible SQS subclasses +# We need them as subclasses as we must access their message ID or sequence number metadata via dot notation +if False and has_pydantic: + from aws_lambda_powertools.utilities.parser.models import DynamoDBStreamRecordModel + from aws_lambda_powertools.utilities.parser.models import ( + KinesisDataStreamRecord as KinesisDataStreamRecordModel, + ) + from aws_lambda_powertools.utilities.parser.models import SqsRecordModel + + BatchTypeModels = Optional[ + Union[Type[SqsRecordModel], Type[DynamoDBStreamRecordModel], Type[KinesisDataStreamRecordModel]] + ] + BatchSqsTypeModel = Optional[Type[SqsRecordModel]] +else: + BatchTypeModels = Optional # type: ignore + BatchSqsTypeModel = Optional # type: ignore From a1367e72e797813c88676b741180ef33fc6e9334 Mon Sep 17 00:00:00 2001 From: Ruben Fonseca Date: Fri, 17 Feb 2023 15:19:15 +0100 Subject: [PATCH 3/6] fix: typo --- aws_lambda_powertools/utilities/batch/types.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aws_lambda_powertools/utilities/batch/types.py b/aws_lambda_powertools/utilities/batch/types.py index d50a16f2a8f..39c93ca224a 100644 --- a/aws_lambda_powertools/utilities/batch/types.py +++ b/aws_lambda_powertools/utilities/batch/types.py @@ -8,7 +8,7 @@ # For IntelliSense and Mypy to work, we need to account for possible SQS subclasses # We need them as subclasses as we must access their message ID or sequence number metadata via dot notation -if False and has_pydantic: +if has_pydantic: from aws_lambda_powertools.utilities.parser.models import DynamoDBStreamRecordModel from aws_lambda_powertools.utilities.parser.models import ( KinesisDataStreamRecord as KinesisDataStreamRecordModel, From d7be0a46de6e2a9fc1e96d25adc07a7f4c35b9d6 Mon Sep 17 00:00:00 2001 From: Ruben Fonseca Date: Mon, 20 Feb 2023 11:31:49 +0100 Subject: [PATCH 4/6] fix: address review --- .../utilities/batch/__init__.py | 4 +-- .../batch/sqs_fifo_partial_processor.py | 6 ++--- .../utilities/batch/types.py | 4 +-- docs/utilities/batch.md | 4 +-- .../src/sqs_fifo_batch_processor.py | 4 +-- 
...qs_fifo_batch_processor_context_manager.py | 4 +-- tests/functional/test_utilities_batch.py | 27 ++++++------------- 7 files changed, 21 insertions(+), 32 deletions(-) diff --git a/aws_lambda_powertools/utilities/batch/__init__.py b/aws_lambda_powertools/utilities/batch/__init__.py index c7e8f0581d1..0e2637cc358 100644 --- a/aws_lambda_powertools/utilities/batch/__init__.py +++ b/aws_lambda_powertools/utilities/batch/__init__.py @@ -17,7 +17,7 @@ ) from aws_lambda_powertools.utilities.batch.exceptions import ExceptionInfo from aws_lambda_powertools.utilities.batch.sqs_fifo_partial_processor import ( - SQSFifoPartialProcessor, + SqsFifoPartialProcessor, ) from aws_lambda_powertools.utilities.batch.types import BatchTypeModels @@ -31,7 +31,7 @@ "EventType", "FailureResponse", "SuccessResponse", - "SQSFifoPartialProcessor", + "SqsFifoPartialProcessor", "batch_processor", "async_batch_processor", ) diff --git a/aws_lambda_powertools/utilities/batch/sqs_fifo_partial_processor.py b/aws_lambda_powertools/utilities/batch/sqs_fifo_partial_processor.py index 31827829857..d48749a137e 100644 --- a/aws_lambda_powertools/utilities/batch/sqs_fifo_partial_processor.py +++ b/aws_lambda_powertools/utilities/batch/sqs_fifo_partial_processor.py @@ -12,7 +12,7 @@ class SQSFifoCircuitBreakerError(Exception): pass -class SQSFifoPartialProcessor(BatchProcessor): +class SqsFifoPartialProcessor(BatchProcessor): """Process native partial responses from SQS FIFO queues. Stops processing records when the first record fails. The remaining records are reported as failed items. @@ -26,12 +26,12 @@ class SQSFifoPartialProcessor(BatchProcessor): import json from aws_lambda_powertools import Logger, Tracer - from aws_lambda_powertools.utilities.batch import SQSFifoPartialProcessor, EventType, batch_processor + from aws_lambda_powertools.utilities.batch import SqsFifoPartialProcessor, EventType, batch_processor from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord from aws_lambda_powertools.utilities.typing import LambdaContext - processor = SQSFifoPartialProcessor() + processor = SqsFifoPartialProcessor() tracer = Tracer() logger = Logger() diff --git a/aws_lambda_powertools/utilities/batch/types.py b/aws_lambda_powertools/utilities/batch/types.py index 39c93ca224a..1fc5aba4fc4 100644 --- a/aws_lambda_powertools/utilities/batch/types.py +++ b/aws_lambda_powertools/utilities/batch/types.py @@ -20,5 +20,5 @@ ] BatchSqsTypeModel = Optional[Type[SqsRecordModel]] else: - BatchTypeModels = Optional # type: ignore - BatchSqsTypeModel = Optional # type: ignore + BatchTypeModels = "BatchTypeModels" # type: ignore + BatchSqsTypeModel = "BatchSqsTypeModel" # type: ignore diff --git a/docs/utilities/batch.md b/docs/utilities/batch.md index 3f1e255aaa2..b9368e741b7 100644 --- a/docs/utilities/batch.md +++ b/docs/utilities/batch.md @@ -349,8 +349,8 @@ Processing batches from SQS works in four stages: #### FIFO queues -If you're using this feature with a FIFO queue, you should use the `SQSFifoPartialProcessor` class instead. We will -stop processing messages after the first failure, and return all failed and unprocessed messages in `batchItemFailures`. +When using [SQS FIFO queues](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/FIFO-queues.html){target="_blank"}, +We will stop processing messages after the first failure, and return all failed and unprocessed messages in `batchItemFailures`. This helps preserve the ordering of messages in your queue. 
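For example, if the second record in a three-record batch raises an error, the first record is processed normally and the remaining two are reported back. A sketch of the resulting response, with hypothetical message IDs:

```python
# Illustrative shape only: itemIdentifier values are hypothetical message IDs
response = {
    "batchItemFailures": [
        {"itemIdentifier": "msg-2"},  # the record whose handler raised
        {"itemIdentifier": "msg-3"},  # never attempted; failed by the circuit breaker
    ]
}
```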
=== "As a decorator" diff --git a/examples/batch_processing/src/sqs_fifo_batch_processor.py b/examples/batch_processing/src/sqs_fifo_batch_processor.py index d1d2c587568..a5fe9f23235 100644 --- a/examples/batch_processing/src/sqs_fifo_batch_processor.py +++ b/examples/batch_processing/src/sqs_fifo_batch_processor.py @@ -1,12 +1,12 @@ from aws_lambda_powertools import Logger, Tracer from aws_lambda_powertools.utilities.batch import ( - SQSFifoPartialProcessor, + SqsFifoPartialProcessor, batch_processor, ) from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord from aws_lambda_powertools.utilities.typing import LambdaContext -processor = SQSFifoPartialProcessor() +processor = SqsFifoPartialProcessor() tracer = Tracer() logger = Logger() diff --git a/examples/batch_processing/src/sqs_fifo_batch_processor_context_manager.py b/examples/batch_processing/src/sqs_fifo_batch_processor_context_manager.py index 80b45737cb9..45759b2a585 100644 --- a/examples/batch_processing/src/sqs_fifo_batch_processor_context_manager.py +++ b/examples/batch_processing/src/sqs_fifo_batch_processor_context_manager.py @@ -1,9 +1,9 @@ from aws_lambda_powertools import Logger, Tracer -from aws_lambda_powertools.utilities.batch import SQSFifoPartialProcessor +from aws_lambda_powertools.utilities.batch import SqsFifoPartialProcessor from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord from aws_lambda_powertools.utilities.typing import LambdaContext -processor = SQSFifoPartialProcessor() +processor = SqsFifoPartialProcessor() tracer = Tracer() logger = Logger() diff --git a/tests/functional/test_utilities_batch.py b/tests/functional/test_utilities_batch.py index 1ea5339ffc3..f93ecf855a6 100644 --- a/tests/functional/test_utilities_batch.py +++ b/tests/functional/test_utilities_batch.py @@ -10,7 +10,7 @@ AsyncBatchProcessor, BatchProcessor, EventType, - SQSFifoPartialProcessor, + SqsFifoPartialProcessor, async_batch_processor, batch_processor, ) @@ -119,17 +119,6 @@ def handler(record): return handler -@pytest.fixture(scope="module") -def sqs_fifo_record_handler() -> Callable: - def handler(record): - body = record["body"] - if "fail" in body: - raise Exception("Failed to process record.") - return body - - return handler - - @pytest.fixture(scope="module") def async_record_handler() -> Callable[..., Awaitable[Any]]: async def handler(record): @@ -667,15 +656,15 @@ def lambda_handler(event, context): assert "All records failed processing. 
" in str(e.value) -def test_sqs_fifo_batch_processor_middleware_success_only(sqs_event_factory, sqs_fifo_record_handler): +def test_sqs_fifo_batch_processor_middleware_success_only(sqs_event_factory, record_handler): # GIVEN first_record = SQSRecord(sqs_event_factory("success")) second_record = SQSRecord(sqs_event_factory("success")) event = {"Records": [first_record.raw_event, second_record.raw_event]} - processor = SQSFifoPartialProcessor() + processor = SqsFifoPartialProcessor() - @batch_processor(record_handler=sqs_fifo_record_handler, processor=processor) + @batch_processor(record_handler=record_handler, processor=processor) def lambda_handler(event, context): return processor.response() @@ -686,17 +675,17 @@ def lambda_handler(event, context): assert result["batchItemFailures"] == [] -def test_sqs_fifo_batch_processor_middleware_with_failure(sqs_event_factory, sqs_fifo_record_handler): +def test_sqs_fifo_batch_processor_middleware_with_failure(sqs_event_factory, record_handler): # GIVEN first_record = SQSRecord(sqs_event_factory("success")) second_record = SQSRecord(sqs_event_factory("fail")) - # this would normally suceed, but since it's a FIFO queue, it will be marked as failure + # this would normally succeed, but since it's a FIFO queue, it will be marked as failure third_record = SQSRecord(sqs_event_factory("success")) event = {"Records": [first_record.raw_event, second_record.raw_event, third_record.raw_event]} - processor = SQSFifoPartialProcessor() + processor = SqsFifoPartialProcessor() - @batch_processor(record_handler=sqs_fifo_record_handler, processor=processor) + @batch_processor(record_handler=record_handler, processor=processor) def lambda_handler(event, context): return processor.response() From 0070643c5efd6d323e2ab1b57c27f2a94c26b5cd Mon Sep 17 00:00:00 2001 From: Ruben Fonseca Date: Mon, 20 Feb 2023 11:32:59 +0100 Subject: [PATCH 5/6] Update tests/functional/test_utilities_batch.py Co-authored-by: Heitor Lessa Signed-off-by: Ruben Fonseca --- tests/functional/test_utilities_batch.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/functional/test_utilities_batch.py b/tests/functional/test_utilities_batch.py index f93ecf855a6..c98d59a7042 100644 --- a/tests/functional/test_utilities_batch.py +++ b/tests/functional/test_utilities_batch.py @@ -42,7 +42,7 @@ def sqs_event_factory() -> Callable: def factory(body: str): return { - "messageId": str(uuid.uuid4()), + "messageId": f"{uuid.uuid4()}", "receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a", "body": body, "attributes": { From 1f8e621913c4c50c4360c55490fba1521141255d Mon Sep 17 00:00:00 2001 From: Heitor Lessa Date: Mon, 20 Feb 2023 11:51:41 +0100 Subject: [PATCH 6/6] docs: fix typo Signed-off-by: Heitor Lessa --- docs/utilities/batch.md | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/docs/utilities/batch.md b/docs/utilities/batch.md index b9368e741b7..0f899673c2e 100644 --- a/docs/utilities/batch.md +++ b/docs/utilities/batch.md @@ -349,8 +349,7 @@ Processing batches from SQS works in four stages: #### FIFO queues -When using [SQS FIFO queues](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/FIFO-queues.html){target="_blank"}, -We will stop processing messages after the first failure, and return all failed and unprocessed messages in `batchItemFailures`. 
+When using [SQS FIFO queues](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/FIFO-queues.html){target="_blank"}, we will stop processing messages after the first failure, and return all failed and unprocessed messages in `batchItemFailures`. This helps preserve the ordering of messages in your queue. === "As a decorator"
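For reference, a minimal sketch of the decorator example as it stands at the end of this series, after the rename to `SqsFifoPartialProcessor`; the handler body is illustrative, since the upstream snippet leaves it as `...`:

```python
from aws_lambda_powertools import Logger, Tracer
from aws_lambda_powertools.utilities.batch import (
    SqsFifoPartialProcessor,
    batch_processor,
)
from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord
from aws_lambda_powertools.utilities.typing import LambdaContext

processor = SqsFifoPartialProcessor()
tracer = Tracer()
logger = Logger()


@tracer.capture_method
def record_handler(record: SQSRecord):
    # illustrative body: any exception raised here fails this record
    # and every record after it, preserving FIFO ordering
    ...


@logger.inject_lambda_context
@tracer.capture_lambda_handler
@batch_processor(record_handler=record_handler, processor=processor)
def lambda_handler(event, context: LambdaContext):
    return processor.response()
```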