From 3c196f062be8d384f30aa1ba7104caa2de5fb768 Mon Sep 17 00:00:00 2001 From: Leandro Damascena Date: Fri, 19 May 2023 01:49:48 +0100 Subject: [PATCH 1/4] feat(kinesis-sqs): adding support to sqs wrapped in kinesis --- .../utilities/parser/models/__init__.py | 3 ++ .../parser/models/kinesis_firehose_sqs.py | 29 +++++++++++++++++++ tests/events/kinesisFirehoseSQSEvent.json | 12 ++++++++ .../parser/test_kinesis_firehose.py | 24 +++++++++++++++ 4 files changed, 68 insertions(+) create mode 100644 aws_lambda_powertools/utilities/parser/models/kinesis_firehose_sqs.py create mode 100644 tests/events/kinesisFirehoseSQSEvent.json diff --git a/aws_lambda_powertools/utilities/parser/models/__init__.py b/aws_lambda_powertools/utilities/parser/models/__init__.py index 5f7a8a6b550..aae577e43c4 100644 --- a/aws_lambda_powertools/utilities/parser/models/__init__.py +++ b/aws_lambda_powertools/utilities/parser/models/__init__.py @@ -42,6 +42,7 @@ KinesisFirehoseRecord, KinesisFirehoseRecordMetadata, ) +from .kinesis_firehose_sqs import KinesisFirehoseSQSModel, KinesisFirehoseSQSRecord from .lambda_function_url import LambdaFunctionUrlModel from .s3 import ( S3EventNotificationEventBridgeDetailModel, @@ -144,4 +145,6 @@ "KafkaRecordModel", "KafkaMskEventModel", "KafkaBaseEventModel", + "KinesisFirehoseSQSModel", + "KinesisFirehoseSQSRecord", ] diff --git a/aws_lambda_powertools/utilities/parser/models/kinesis_firehose_sqs.py b/aws_lambda_powertools/utilities/parser/models/kinesis_firehose_sqs.py new file mode 100644 index 00000000000..57fc5610866 --- /dev/null +++ b/aws_lambda_powertools/utilities/parser/models/kinesis_firehose_sqs.py @@ -0,0 +1,29 @@ +import json +from typing import List, Optional + +from pydantic import BaseModel, PositiveInt, validator + +from aws_lambda_powertools.shared.functions import base64_decode +from aws_lambda_powertools.utilities.parser.models import KinesisFirehoseRecordMetadata + +from .sqs import SqsRecordModel + + +class KinesisFirehoseSQSRecord(BaseModel): + data: SqsRecordModel + recordId: str + approximateArrivalTimestamp: PositiveInt + kinesisRecordMetadata: Optional[KinesisFirehoseRecordMetadata] + + @validator("data", pre=True, allow_reuse=True) + def data_base64_decode(cls, value): + # Firehose payload is encoded twice + return json.loads(base64_decode(base64_decode(value))) + + +class KinesisFirehoseSQSModel(BaseModel): + invocationId: str + deliveryStreamArn: str + region: str + sourceKinesisStreamArn: Optional[str] + records: List[KinesisFirehoseSQSRecord] diff --git a/tests/events/kinesisFirehoseSQSEvent.json b/tests/events/kinesisFirehoseSQSEvent.json new file mode 100644 index 00000000000..07965164791 --- /dev/null +++ b/tests/events/kinesisFirehoseSQSEvent.json @@ -0,0 +1,12 @@ +{ + "invocationId": "556b67a3-48fc-4385-af49-e133aade9cb9", + "deliveryStreamArn": "arn:aws:firehose:us-east-1:123456789012:deliverystream/PUT-S3-tdyyE", + "region": "us-east-1", + "records": [ + { + "recordId": "49640890555757999697367295315052682717311106012636250114000000", + "approximateArrivalTimestamp": 1684454259637, + "data": "ZXdvZ0lDQWdJQ0FpYldWemMyRm5aVWxrSWpvZ0lqQTFPV1l6Tm1JMExUZzNZVE10TkRSaFlpMDRNMlF5TFRZMk1UazNOVGd6TUdFM1pDSXNDaUFnSUNBZ0lDSnlaV05sYVhCMFNHRnVaR3hsSWpvZ0lrRlJSVUozU201TGVYSklhV2RWVFZwcU5uSlphV2REWjNoc1lWTXpVMHg1TUdFdUxpNGlMQW9nSUNBZ0lDQWlZbTlrZVNJNklDSlVaWE4wSUcxbGMzTmhaMlV1SWl3S0lDQWdJQ0FnSW1GMGRISnBZblYwWlhNaU9pQjdDaUFnSUNBZ0lDQWdJa0Z3Y0hKdmVHbHRZWFJsVW1WalpXbDJaVU52ZFc1MElqb2dJakVpTEFvZ0lDQWdJQ0FnSUNKVFpXNTBWR2x0WlhOMFlXMXdJam9nSWpFMU5EVXdPREkyTkRreE9ETWlMQW9nSUNBZ0lDQWdJQ0pUWlc1a1pYSkpaQ0k2SUNKQlNVUkJTVVZPVVZwS1QweFBNak5aVmtvMFZrOGlMQW9nSUNBZ0lDQWdJQ0pCY0hCeWIzaHBiV0YwWlVacGNuTjBVbVZqWldsMlpWUnBiV1Z6ZEdGdGNDSTZJQ0l4TlRRMU1EZ3lOalE1TVRnMUlnb2dJQ0FnSUNCOUxBb2dJQ0FnSUNBaWJXVnpjMkZuWlVGMGRISnBZblYwWlhNaU9pQjdDaUFnSUNBZ0lDQWdJblJsYzNSQmRIUnlJam9nZXdvZ0lDQWdJQ0FnSUNBZ0luTjBjbWx1WjFaaGJIVmxJam9nSWpFd01DSXNDaUFnSUNBZ0lDQWdJQ0FpWW1sdVlYSjVWbUZzZFdVaU9pQWlZbUZ6WlRZMFUzUnlJaXdLSUNBZ0lDQWdJQ0FnSUNKa1lYUmhWSGx3WlNJNklDSk9kVzFpWlhJaUNpQWdJQ0FnSUNBZ2ZRb2dJQ0FnSUNCOUxBb2dJQ0FnSUNBaWJXUTFUMlpDYjJSNUlqb2dJbVUwWlRZNFptSTNZbVF3WlRZNU4yRXdZV1U0WmpGaVlqTTBNamcwTm1Jeklpd0tJQ0FnSUNBZ0ltVjJaVzUwVTI5MWNtTmxJam9nSW1GM2N6cHpjWE1pTEFvZ0lDQWdJQ0FpWlhabGJuUlRiM1Z5WTJWQlVrNGlPaUFpWVhKdU9tRjNjenB6Y1hNNmRYTXRaV0Z6ZEMweU9qRXlNelExTmpjNE9UQXhNanB0ZVMxeGRXVjFaU0lzQ2lBZ0lDQWdJQ0poZDNOU1pXZHBiMjRpT2lBaWRYTXRaV0Z6ZEMweUlnb2dJQ0FnZlFvPQ==" + } + ] +} diff --git a/tests/functional/parser/test_kinesis_firehose.py b/tests/functional/parser/test_kinesis_firehose.py index 59bbd2f4e18..87297114b18 100644 --- a/tests/functional/parser/test_kinesis_firehose.py +++ b/tests/functional/parser/test_kinesis_firehose.py @@ -11,6 +11,8 @@ KinesisFirehoseModel, KinesisFirehoseRecord, KinesisFirehoseRecordMetadata, + KinesisFirehoseSQSModel, + KinesisFirehoseSQSRecord, ) from aws_lambda_powertools.utilities.typing import LambdaContext from tests.functional.parser.schemas import MyKinesisFirehoseBusiness @@ -77,6 +79,28 @@ def handle_firehose_no_envelope_put(event: KinesisFirehoseModel, _: LambdaContex assert record_02.data == b'{"Hello": "World"}' +@event_parser(model=KinesisFirehoseSQSModel) +def handle_firehose_sqs_wrapped_message(event: KinesisFirehoseSQSModel, _: LambdaContext): + assert event.region == "us-east-1" + assert event.invocationId == "556b67a3-48fc-4385-af49-e133aade9cb9" + assert event.deliveryStreamArn == "arn:aws:firehose:us-east-1:123456789012:deliverystream/PUT-S3-tdyyE" + + records = list(event.records) + assert len(records) == 1 + + record_01: KinesisFirehoseSQSRecord = records[0] + assert record_01.data.messageId == "059f36b4-87a3-44ab-83d2-661975830a7d" + assert record_01.data.receiptHandle == "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a..." + assert record_01.data.body == "Test message." + assert record_01.data.attributes.ApproximateReceiveCount == "1" + assert record_01.data.attributes.SenderId == "AIDAIENQZJOLO23YVJ4VO" + + +def test_firehose_sqs_wrapped_message_event(): + event_dict = load_event("kinesisFirehoseSQSEvent.json") + handle_firehose_sqs_wrapped_message(event_dict, LambdaContext()) + + def test_firehose_trigger_event(): event_dict = load_event("kinesisFirehoseKinesisEvent.json") event_dict["records"].pop(0) # remove first item since the payload is bytes and we want to test payload json class From f14c81cd09124b658d0493ccea1efd1d04a61ef2 Mon Sep 17 00:00:00 2001 From: Leandro Damascena Date: Fri, 19 May 2023 01:58:17 +0100 Subject: [PATCH 2/4] feat(kinesis-sqs): renaming to standard names --- .../utilities/parser/models/__init__.py | 6 +++--- .../utilities/parser/models/kinesis_firehose_sqs.py | 6 +++--- docs/utilities/parser.md | 1 + tests/functional/parser/test_kinesis_firehose.py | 10 +++++----- 4 files changed, 12 insertions(+), 11 deletions(-) diff --git a/aws_lambda_powertools/utilities/parser/models/__init__.py b/aws_lambda_powertools/utilities/parser/models/__init__.py index aae577e43c4..c2385b7bf14 100644 --- a/aws_lambda_powertools/utilities/parser/models/__init__.py +++ b/aws_lambda_powertools/utilities/parser/models/__init__.py @@ -42,7 +42,7 @@ KinesisFirehoseRecord, KinesisFirehoseRecordMetadata, ) -from .kinesis_firehose_sqs import KinesisFirehoseSQSModel, KinesisFirehoseSQSRecord +from .kinesis_firehose_sqs import KinesisFirehoseSqsModel, KinesisFirehoseSqsRecord from .lambda_function_url import LambdaFunctionUrlModel from .s3 import ( S3EventNotificationEventBridgeDetailModel, @@ -145,6 +145,6 @@ "KafkaRecordModel", "KafkaMskEventModel", "KafkaBaseEventModel", - "KinesisFirehoseSQSModel", - "KinesisFirehoseSQSRecord", + "KinesisFirehoseSqsModel", + "KinesisFirehoseSqsRecord", ] diff --git a/aws_lambda_powertools/utilities/parser/models/kinesis_firehose_sqs.py b/aws_lambda_powertools/utilities/parser/models/kinesis_firehose_sqs.py index 57fc5610866..3133e5d5c28 100644 --- a/aws_lambda_powertools/utilities/parser/models/kinesis_firehose_sqs.py +++ b/aws_lambda_powertools/utilities/parser/models/kinesis_firehose_sqs.py @@ -9,7 +9,7 @@ from .sqs import SqsRecordModel -class KinesisFirehoseSQSRecord(BaseModel): +class KinesisFirehoseSqsRecord(BaseModel): data: SqsRecordModel recordId: str approximateArrivalTimestamp: PositiveInt @@ -21,9 +21,9 @@ def data_base64_decode(cls, value): return json.loads(base64_decode(base64_decode(value))) -class KinesisFirehoseSQSModel(BaseModel): +class KinesisFirehoseSqsModel(BaseModel): invocationId: str deliveryStreamArn: str region: str sourceKinesisStreamArn: Optional[str] - records: List[KinesisFirehoseSQSRecord] + records: List[KinesisFirehoseSqsRecord] diff --git a/docs/utilities/parser.md b/docs/utilities/parser.md index 38e12c0792d..6607e7b07b0 100644 --- a/docs/utilities/parser.md +++ b/docs/utilities/parser.md @@ -168,6 +168,7 @@ Parser comes with the following built-in models: | **KafkaSelfManagedEventModel** | Lambda Event Source payload for self managed Kafka payload | | **KinesisDataStreamModel** | Lambda Event Source payload for Amazon Kinesis Data Streams | | **KinesisFirehoseModel** | Lambda Event Source payload for Amazon Kinesis Firehose | +| **KinesisFirehoseSqsModel** | Lambda Event Source payload for SQS messages wrapped in Kinesis Firehose records | | **LambdaFunctionUrlModel** | Lambda Event Source payload for Lambda Function URL payload | | **S3EventNotificationEventBridgeModel** | Lambda Event Source payload for Amazon S3 Event Notification to EventBridge. | | **S3Model** | Lambda Event Source payload for Amazon S3 | diff --git a/tests/functional/parser/test_kinesis_firehose.py b/tests/functional/parser/test_kinesis_firehose.py index 87297114b18..48e68fca5e8 100644 --- a/tests/functional/parser/test_kinesis_firehose.py +++ b/tests/functional/parser/test_kinesis_firehose.py @@ -11,8 +11,8 @@ KinesisFirehoseModel, KinesisFirehoseRecord, KinesisFirehoseRecordMetadata, - KinesisFirehoseSQSModel, - KinesisFirehoseSQSRecord, + KinesisFirehoseSqsModel, + KinesisFirehoseSqsRecord, ) from aws_lambda_powertools.utilities.typing import LambdaContext from tests.functional.parser.schemas import MyKinesisFirehoseBusiness @@ -79,8 +79,8 @@ def handle_firehose_no_envelope_put(event: KinesisFirehoseModel, _: LambdaContex assert record_02.data == b'{"Hello": "World"}' -@event_parser(model=KinesisFirehoseSQSModel) -def handle_firehose_sqs_wrapped_message(event: KinesisFirehoseSQSModel, _: LambdaContext): +@event_parser(model=KinesisFirehoseSqsModel) +def handle_firehose_sqs_wrapped_message(event: KinesisFirehoseSqsModel, _: LambdaContext): assert event.region == "us-east-1" assert event.invocationId == "556b67a3-48fc-4385-af49-e133aade9cb9" assert event.deliveryStreamArn == "arn:aws:firehose:us-east-1:123456789012:deliverystream/PUT-S3-tdyyE" @@ -88,7 +88,7 @@ def handle_firehose_sqs_wrapped_message(event: KinesisFirehoseSQSModel, _: Lambd records = list(event.records) assert len(records) == 1 - record_01: KinesisFirehoseSQSRecord = records[0] + record_01: KinesisFirehoseSqsRecord = records[0] assert record_01.data.messageId == "059f36b4-87a3-44ab-83d2-661975830a7d" assert record_01.data.receiptHandle == "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a..." assert record_01.data.body == "Test message." From 12b795e6d109fb1ee32c0224a4052f17c09378e3 Mon Sep 17 00:00:00 2001 From: Leandro Damascena Date: Tue, 23 May 2023 19:09:01 +0100 Subject: [PATCH 3/4] feat(kinesis-sqs): fix decode string --- .../utilities/parser/models/kinesis_firehose_sqs.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/aws_lambda_powertools/utilities/parser/models/kinesis_firehose_sqs.py b/aws_lambda_powertools/utilities/parser/models/kinesis_firehose_sqs.py index 3133e5d5c28..b649828853b 100644 --- a/aws_lambda_powertools/utilities/parser/models/kinesis_firehose_sqs.py +++ b/aws_lambda_powertools/utilities/parser/models/kinesis_firehose_sqs.py @@ -17,8 +17,8 @@ class KinesisFirehoseSqsRecord(BaseModel): @validator("data", pre=True, allow_reuse=True) def data_base64_decode(cls, value): - # Firehose payload is encoded twice - return json.loads(base64_decode(base64_decode(value))) + # Firehose payload is encoded + return json.loads(base64_decode(value)) class KinesisFirehoseSqsModel(BaseModel): From a9cd9ba983936d53ebcd7bfdd243a9438b056f49 Mon Sep 17 00:00:00 2001 From: Leandro Damascena Date: Tue, 23 May 2023 19:16:33 +0100 Subject: [PATCH 4/4] feat(kinesis-sqs): fix decode string --- tests/events/kinesisFirehoseSQSEvent.json | 10 +++++----- tests/functional/parser/test_kinesis_firehose.py | 2 +- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/tests/events/kinesisFirehoseSQSEvent.json b/tests/events/kinesisFirehoseSQSEvent.json index 07965164791..bea267c4206 100644 --- a/tests/events/kinesisFirehoseSQSEvent.json +++ b/tests/events/kinesisFirehoseSQSEvent.json @@ -3,10 +3,10 @@ "deliveryStreamArn": "arn:aws:firehose:us-east-1:123456789012:deliverystream/PUT-S3-tdyyE", "region": "us-east-1", "records": [ - { - "recordId": "49640890555757999697367295315052682717311106012636250114000000", - "approximateArrivalTimestamp": 1684454259637, - "data": "ZXdvZ0lDQWdJQ0FpYldWemMyRm5aVWxrSWpvZ0lqQTFPV1l6Tm1JMExUZzNZVE10TkRSaFlpMDRNMlF5TFRZMk1UazNOVGd6TUdFM1pDSXNDaUFnSUNBZ0lDSnlaV05sYVhCMFNHRnVaR3hsSWpvZ0lrRlJSVUozU201TGVYSklhV2RWVFZwcU5uSlphV2REWjNoc1lWTXpVMHg1TUdFdUxpNGlMQW9nSUNBZ0lDQWlZbTlrZVNJNklDSlVaWE4wSUcxbGMzTmhaMlV1SWl3S0lDQWdJQ0FnSW1GMGRISnBZblYwWlhNaU9pQjdDaUFnSUNBZ0lDQWdJa0Z3Y0hKdmVHbHRZWFJsVW1WalpXbDJaVU52ZFc1MElqb2dJakVpTEFvZ0lDQWdJQ0FnSUNKVFpXNTBWR2x0WlhOMFlXMXdJam9nSWpFMU5EVXdPREkyTkRreE9ETWlMQW9nSUNBZ0lDQWdJQ0pUWlc1a1pYSkpaQ0k2SUNKQlNVUkJTVVZPVVZwS1QweFBNak5aVmtvMFZrOGlMQW9nSUNBZ0lDQWdJQ0pCY0hCeWIzaHBiV0YwWlVacGNuTjBVbVZqWldsMlpWUnBiV1Z6ZEdGdGNDSTZJQ0l4TlRRMU1EZ3lOalE1TVRnMUlnb2dJQ0FnSUNCOUxBb2dJQ0FnSUNBaWJXVnpjMkZuWlVGMGRISnBZblYwWlhNaU9pQjdDaUFnSUNBZ0lDQWdJblJsYzNSQmRIUnlJam9nZXdvZ0lDQWdJQ0FnSUNBZ0luTjBjbWx1WjFaaGJIVmxJam9nSWpFd01DSXNDaUFnSUNBZ0lDQWdJQ0FpWW1sdVlYSjVWbUZzZFdVaU9pQWlZbUZ6WlRZMFUzUnlJaXdLSUNBZ0lDQWdJQ0FnSUNKa1lYUmhWSGx3WlNJNklDSk9kVzFpWlhJaUNpQWdJQ0FnSUNBZ2ZRb2dJQ0FnSUNCOUxBb2dJQ0FnSUNBaWJXUTFUMlpDYjJSNUlqb2dJbVUwWlRZNFptSTNZbVF3WlRZNU4yRXdZV1U0WmpGaVlqTTBNamcwTm1Jeklpd0tJQ0FnSUNBZ0ltVjJaVzUwVTI5MWNtTmxJam9nSW1GM2N6cHpjWE1pTEFvZ0lDQWdJQ0FpWlhabGJuUlRiM1Z5WTJWQlVrNGlPaUFpWVhKdU9tRjNjenB6Y1hNNmRYTXRaV0Z6ZEMweU9qRXlNelExTmpjNE9UQXhNanB0ZVMxeGRXVjFaU0lzQ2lBZ0lDQWdJQ0poZDNOU1pXZHBiMjRpT2lBaWRYTXRaV0Z6ZEMweUlnb2dJQ0FnZlFvPQ==" - } + { + "recordId": "49640912821178817833517986466168945147170627572855734274000000", + "approximateArrivalTimestamp": 1684864917398, + "data": "eyJtZXNzYWdlSWQiOiI1YWI4MDdkNC01NjQ0LTRjNTUtOTdhMy00NzM5NjYzNWFjNzQiLCJyZWNlaXB0SGFuZGxlIjoiQVFFQndKbkt5ckhpZ1VNWmo2cllpZ0NneGxhUzNTTHkwYS4uLiIsImJvZHkiOiJUZXN0IG1lc3NhZ2UuIiwiYXR0cmlidXRlcyI6eyJBcHByb3hpbWF0ZVJlY2VpdmVDb3VudCI6IjEiLCJTZW50VGltZXN0YW1wIjoiMTY4NDg2NDg1MjQ5MSIsIlNlbmRlcklkIjoiQUlEQUlFTlFaSk9MTzIzWVZKNFZPIiwiQXBwcm94aW1hdGVGaXJzdFJlY2VpdmVUaW1lc3RhbXAiOiIxNjg0ODY0ODcyNDkxIn0sIm1lc3NhZ2VBdHRyaWJ1dGVzIjp7fSwibWQ1T2ZNZXNzYWdlQXR0cmlidXRlcyI6bnVsbCwibWQ1T2ZCb2R5IjoiYzhiNmJjNjBjOGI4YjNhOTA0ZTQ1YzFmYWJkZjUyM2QiLCJldmVudFNvdXJjZSI6ImF3czpzcXMiLCJldmVudFNvdXJjZUFSTiI6ImFybjphd3M6c3FzOnVzLWVhc3QtMToyMDA5ODQxMTIzODY6U05TIiwiYXdzUmVnaW9uIjoidXMtZWFzdC0xIn0K" + } ] } diff --git a/tests/functional/parser/test_kinesis_firehose.py b/tests/functional/parser/test_kinesis_firehose.py index 48e68fca5e8..c0b71f80540 100644 --- a/tests/functional/parser/test_kinesis_firehose.py +++ b/tests/functional/parser/test_kinesis_firehose.py @@ -89,7 +89,7 @@ def handle_firehose_sqs_wrapped_message(event: KinesisFirehoseSqsModel, _: Lambd assert len(records) == 1 record_01: KinesisFirehoseSqsRecord = records[0] - assert record_01.data.messageId == "059f36b4-87a3-44ab-83d2-661975830a7d" + assert record_01.data.messageId == "5ab807d4-5644-4c55-97a3-47396635ac74" assert record_01.data.receiptHandle == "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a..." assert record_01.data.body == "Test message." assert record_01.data.attributes.ApproximateReceiveCount == "1"