diff --git a/doc/services.rst b/doc/services.rst index 1ac3738..405ccf4 100644 --- a/doc/services.rst +++ b/doc/services.rst @@ -8,12 +8,14 @@ This module provides classes to build steps that integrate with Amazon DynamoDB, - `Amazon DynamoDB <#amazon-dynamodb>`__ +- `Amazon EMR <#amazon-emr>`__ + +- `Amazon EventBridge <#amazon-eventbridge>`__ + - `Amazon SNS <#amazon-sns>`__ - `Amazon SQS <#amazon-sqs>`__ -- `Amazon EMR <#amazon-emr>`__ - Amazon DynamoDB ---------------- @@ -25,14 +27,6 @@ Amazon DynamoDB .. autoclass:: stepfunctions.steps.service.DynamoDBUpdateItemStep -Amazon SNS ------------ -.. autoclass:: stepfunctions.steps.service.SnsPublishStep - -Amazon SQS ------------ -.. autoclass:: stepfunctions.steps.service.SqsSendMessageStep - Amazon EMR ----------- .. autoclass:: stepfunctions.steps.service.EmrCreateClusterStep @@ -48,3 +42,15 @@ Amazon EMR .. autoclass:: stepfunctions.steps.service.EmrModifyInstanceFleetByNameStep .. autoclass:: stepfunctions.steps.service.EmrModifyInstanceGroupByNameStep + +Amazon EventBridge +----------- +.. autoclass:: stepfunctions.steps.service.EventBridgePutEventsStep + +Amazon SNS +----------- +.. autoclass:: stepfunctions.steps.service.SnsPublishStep + +Amazon SQS +----------- +.. autoclass:: stepfunctions.steps.service.SqsSendMessageStep diff --git a/src/stepfunctions/steps/__init__.py b/src/stepfunctions/steps/__init__.py index 92598f1..b27d411 100644 --- a/src/stepfunctions/steps/__init__.py +++ b/src/stepfunctions/steps/__init__.py @@ -19,6 +19,6 @@ from stepfunctions.steps.sagemaker import TrainingStep, TransformStep, ModelStep, EndpointConfigStep, EndpointStep, TuningStep, ProcessingStep from stepfunctions.steps.compute import LambdaStep, BatchSubmitJobStep, GlueStartJobRunStep, EcsRunTaskStep from stepfunctions.steps.service import DynamoDBGetItemStep, DynamoDBPutItemStep, DynamoDBUpdateItemStep, DynamoDBDeleteItemStep -from stepfunctions.steps.service import SnsPublishStep, SqsSendMessageStep from stepfunctions.steps.service import EmrCreateClusterStep, EmrTerminateClusterStep, EmrAddStepStep, EmrCancelStepStep, EmrSetClusterTerminationProtectionStep, EmrModifyInstanceFleetByNameStep, EmrModifyInstanceGroupByNameStep - +from stepfunctions.steps.service import EventBridgePutEventsStep +from stepfunctions.steps.service import SnsPublishStep, SqsSendMessageStep diff --git a/src/stepfunctions/steps/service.py b/src/stepfunctions/steps/service.py index 6bf155f..5c32a88 100644 --- a/src/stepfunctions/steps/service.py +++ b/src/stepfunctions/steps/service.py @@ -18,9 +18,11 @@ from stepfunctions.steps.integration_resources import IntegrationPattern, get_service_integration_arn DYNAMODB_SERVICE_NAME = "dynamodb" +ELASTICMAPREDUCE_SERVICE_NAME = "elasticmapreduce" +EVENTBRIDGE_SERVICE_NAME = "events" SNS_SERVICE_NAME = "sns" SQS_SERVICE_NAME = "sqs" -ELASTICMAPREDUCE_SERVICE_NAME = "elasticmapreduce" + class DynamoDBApi(Enum): @@ -48,6 +50,10 @@ class ElasticMapReduceApi(Enum): ModifyInstanceGroupByName = "modifyInstanceGroupByName" +class EventBridgeApi(Enum): + PutEvents = "putEvents" + + class DynamoDBGetItemStep(Task): """ Creates a Task state to get an item from DynamoDB. See `Call DynamoDB APIs with Step Functions `_ for more details. @@ -77,6 +83,46 @@ def __init__(self, state_id, **kwargs): super(DynamoDBGetItemStep, self).__init__(state_id, **kwargs) +class EventBridgePutEventsStep(Task): + + """ + Creates a Task to send custom events to Amazon EventBridge. See`Call EventBridge with Step Functions `_ for more details. + """ + + def __init__(self, state_id, wait_for_callback=False, **kwargs): + """ + Args: + state_id (str): State name whose length **must be** less than or equal to 128 unicode characters. State names **must be** unique within the scope of the whole state machine. + comment (str, optional): Human-readable comment or description. (default: None) + timeout_seconds (int, optional): Positive integer specifying timeout for the state in seconds. If the state runs longer than the specified timeout, then the interpreter fails the state with a `States.Timeout` Error Name. (default: 60) + timeout_seconds_path (str, optional): Path specifying the state's timeout value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. + heartbeat_seconds (int, optional): Positive integer specifying heartbeat timeout for the state in seconds. This value should be lower than the one specified for `timeout_seconds`. If more time than the specified heartbeat elapses between heartbeats from the task, then the interpreter fails the state with a `States.Timeout` Error Name. + heartbeat_seconds_path (str, optional): Path specifying the state's heartbeat value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. + input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') + parameters (dict, optional): The value of this field becomes the effective input for the state. + result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$') + output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') + """ + + if wait_for_callback: + """ + Example resource arn: arn:aws:states:::events:putEvents.waitForTaskToken + """ + + kwargs[Field.Resource.value] = get_service_integration_arn(EVENTBRIDGE_SERVICE_NAME, + EventBridgeApi.PutEvents, + IntegrationPattern.WaitForTaskToken) + else: + """ + Example resource arn: arn:aws:states:::events:putEvents + """ + + kwargs[Field.Resource.value] = get_service_integration_arn(EVENTBRIDGE_SERVICE_NAME, + EventBridgeApi.PutEvents) + + super(EventBridgePutEventsStep, self).__init__(state_id, **kwargs) + + class DynamoDBPutItemStep(Task): """ diff --git a/tests/unit/test_service_steps.py b/tests/unit/test_service_steps.py index 6576aaf..53039c7 100644 --- a/tests/unit/test_service_steps.py +++ b/tests/unit/test_service_steps.py @@ -19,6 +19,7 @@ from stepfunctions.steps.service import DynamoDBGetItemStep, DynamoDBPutItemStep, DynamoDBUpdateItemStep, DynamoDBDeleteItemStep from stepfunctions.steps.service import SnsPublishStep, SqsSendMessageStep from stepfunctions.steps.service import EmrCreateClusterStep, EmrTerminateClusterStep, EmrAddStepStep, EmrCancelStepStep, EmrSetClusterTerminationProtectionStep, EmrModifyInstanceFleetByNameStep, EmrModifyInstanceGroupByNameStep +from stepfunctions.steps.service import EventBridgePutEventsStep @patch.object(boto3.session.Session, 'region_name', 'us-east-1') @@ -98,6 +99,70 @@ def test_sqs_send_message_step_creation(): 'End': True } +@patch.object(boto3.session.Session, 'region_name', 'us-east-1') +def test_eventbridge_put_events_step_creation(): + step = EventBridgePutEventsStep('Send to EventBridge', parameters={ + "Entries": [ + { + "Detail": { + "Message": "MyMessage" + }, + "DetailType": "MyDetailType", + "EventBusName": "MyEventBus", + "Source": "my.source" + } + ] + }) + + assert step.to_dict() == { + "Type": "Task", + "Resource": 'arn:aws:states:::events:putEvents', + "Parameters": { + "Entries": [ + { + "Detail": { + "Message": "MyMessage" + }, + "DetailType": "MyDetailType", + "EventBusName": "MyEventBus", + "Source": "my.source" + } + ] + }, + "End": True + } + + step = EventBridgePutEventsStep('Send to EventBridge', wait_for_callback=True, parameters={ + "Entries": [ + { + "Detail": { + "Message.$": "$.MyMessage" + }, + "DetailType": "MyDetailType", + "EventBusName": "MyEventBus", + "Source": "my.source" + } + ] + }) + + assert step.to_dict() == { + "Type": "Task", + "Resource": "arn:aws:states:::events:putEvents.waitForTaskToken", + "Parameters": { + "Entries": [ + { + "Detail": { + "Message.$": "$.MyMessage" + }, + "DetailType": "MyDetailType", + "EventBusName": "MyEventBus", + "Source": "my.source" + } + ] + }, + "End": True + } + @patch.object(boto3.session.Session, 'region_name', 'us-east-1') def test_dynamodb_get_item_step_creation():