|
| 1 | +import base64 |
| 2 | +import json |
| 3 | + |
1 | 4 | from yarl import URL |
2 | 5 |
|
3 | 6 | from cratedb_toolkit.io.kinesis.adapter import KinesisStreamAdapter |
4 | | -from tests.io.test_awslambda import DYNAMODB_CDC_INSERT_NESTED, DYNAMODB_CDC_MODIFY_NESTED, wrap_kinesis |
5 | 7 |
|
6 | 8 |
|
7 | 9 | def main(): |
8 | | - ka = KinesisStreamAdapter(URL("kinesis://LSIAQAAAAAAVNCBMPNSG:dummy@localhost:4566/cdc-stream?region=eu-central-1")) |
9 | | - ka.produce(wrap_kinesis(DYNAMODB_CDC_INSERT_NESTED)) |
10 | | - ka.produce(wrap_kinesis(DYNAMODB_CDC_MODIFY_NESTED)) |
| 10 | + |
| 11 | + # Address of the Kinesis stream. |
| 12 | + kinesis_url = URL("kinesis://LSIAQAAAAAAVNCBMPNSG:dummy@localhost:4566/cdc-stream?region=eu-central-1") |
| 13 | + |
| 14 | + # DynamoDB CDC data payload example. |
| 15 | + data = { |
| 16 | + "awsRegion": "us-east-1", |
| 17 | + "eventID": "b581c2dc-9d97-44ed-94f7-cb77e4fdb740", |
| 18 | + "eventName": "INSERT", |
| 19 | + "userIdentity": None, |
| 20 | + "recordFormat": "application/json", |
| 21 | + "tableName": "testdrive", |
| 22 | + "dynamodb": { |
| 23 | + "ApproximateCreationDateTime": 1720800199717446, |
| 24 | + "Keys": {"id": {"S": "5F9E-Fsadd41C-4C92-A8C1-70BF3FFB9266"}}, |
| 25 | + "NewImage": { |
| 26 | + "id": {"S": "5F9E-Fsadd41C-4C92-A8C1-70BF3FFB9266"}, |
| 27 | + "data": {"M": {"temperature": {"N": "42.42"}, "humidity": {"N": "84.84"}}}, |
| 28 | + "meta": {"M": {"timestamp": {"S": "2024-07-12T01:17:42"}, "device": {"S": "foo"}}}, |
| 29 | + }, |
| 30 | + "SizeBytes": 156, |
| 31 | + "ApproximateCreationDateTimePrecision": "MICROSECOND", |
| 32 | + }, |
| 33 | + "eventSource": "aws:dynamodb", |
| 34 | + } |
| 35 | + |
| 36 | + # Actual payload in the form how DynamoDB CDC events are published to Kinesis streams. |
| 37 | + payload = { |
| 38 | + "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898", |
| 39 | + "kinesis": { |
| 40 | + "sequenceNumber": "49590338271490256608559692538361571095921575989136588898", |
| 41 | + "data": base64.b64encode(json.dumps(data).encode("utf-8")).decode(), |
| 42 | + }, |
| 43 | + } |
| 44 | + |
| 45 | + kinesis = KinesisStreamAdapter(kinesis_url) |
| 46 | + kinesis.produce(payload) |
11 | 47 |
|
12 | 48 |
|
13 | 49 | if __name__ == "__main__": |
|
0 commit comments