Skip to content

Commit 20ae532

Browse files
author
Vaib Suri
committed
Add service integration for Amazon EMR
1 parent e1e974d commit 20ae532

File tree

3 files changed

+531
-0
lines changed

3 files changed

+531
-0
lines changed

src/stepfunctions/steps/__init__.py

+2
Original file line numberDiff line numberDiff line change
@@ -20,3 +20,5 @@
2020
from stepfunctions.steps.compute import LambdaStep, BatchSubmitJobStep, GlueStartJobRunStep, EcsRunTaskStep
2121
from stepfunctions.steps.service import DynamoDBGetItemStep, DynamoDBPutItemStep, DynamoDBUpdateItemStep, DynamoDBDeleteItemStep
2222
from stepfunctions.steps.service import SnsPublishStep, SqsSendMessageStep
23+
from stepfunctions.steps.service import EmrCreateClusterStep, EmrTerminateClusterStep, EmrAddStepStep, EmrCancelStepStep, EmrSetClusterTerminationProtectionStep, EmrModifyInstanceFleetByNameStep, EmrModifyInstanceGroupByNameStep
24+

src/stepfunctions/steps/service.py

+153
Original file line numberDiff line numberDiff line change
@@ -147,3 +147,156 @@ def __init__(self, state_id, wait_for_callback=False, **kwargs):
147147
kwargs[Field.Resource.value] = 'arn:aws:states:::sqs:sendMessage'
148148

149149
super(SqsSendMessageStep, self).__init__(state_id, **kwargs)
150+
151+
152+
class EmrCreateClusterStep(Task):

    """
    Task state that creates a cluster (job flow) and starts running it. Refer to `Call Amazon EMR with Step Functions <https://docs.aws.amazon.com/step-functions/latest/dg/connect-emr.html>`_ for further information.
    """

    def __init__(self, state_id, wait_for_completion=True, **kwargs):
        """
        Args:
            state_id (str): Name of the state. It **must not** exceed 128 unicode characters and **must be** unique across the entire state machine.
            comment (str, optional): Free-form, human-readable description of the state. (default: None)
            input_path (str, optional): Path that selects which portion of the state's raw input is used by the state. (default: '$')
            parameters (dict, optional): Value that becomes the effective input of the state.
            result_path (str, optional): Path that determines how the state's result is combined with, or replaces, the raw input. (default: '$')
            output_path (str, optional): Path applied to the state's output once `result_path` has been applied; the selection is the effective output handed to the next state as its raw input. (default: '$')
            wait_for_completion (bool, optional): When `True`, the Task state waits to complete before the workflow proceeds to the next step. (default: True)
        """
        # Pick the `.sync` resource when the caller asked to wait for completion.
        kwargs[Field.Resource.value] = (
            'arn:aws:states:::elasticmapreduce:createCluster.sync'
            if wait_for_completion
            else 'arn:aws:states:::elasticmapreduce:createCluster'
        )

        super(EmrCreateClusterStep, self).__init__(state_id, **kwargs)
174+
175+
176+
class EmrTerminateClusterStep(Task):

    """
    Task state that shuts down a cluster (job flow). Refer to `Call Amazon EMR with Step Functions <https://docs.aws.amazon.com/step-functions/latest/dg/connect-emr.html>`_ for further information.
    """

    def __init__(self, state_id, wait_for_completion=True, **kwargs):
        """
        Args:
            state_id (str): Name of the state. It **must not** exceed 128 unicode characters and **must be** unique across the entire state machine.
            comment (str, optional): Free-form, human-readable description of the state. (default: None)
            input_path (str, optional): Path that selects which portion of the state's raw input is used by the state. (default: '$')
            parameters (dict, optional): Value that becomes the effective input of the state.
            result_path (str, optional): Path that determines how the state's result is combined with, or replaces, the raw input. (default: '$')
            output_path (str, optional): Path applied to the state's output once `result_path` has been applied; the selection is the effective output handed to the next state as its raw input. (default: '$')
            wait_for_completion (bool, optional): When `True`, the Task state waits to complete before the workflow proceeds to the next step. (default: True)
        """
        # Pick the `.sync` resource when the caller asked to wait for completion.
        kwargs[Field.Resource.value] = (
            'arn:aws:states:::elasticmapreduce:terminateCluster.sync'
            if wait_for_completion
            else 'arn:aws:states:::elasticmapreduce:terminateCluster'
        )

        super(EmrTerminateClusterStep, self).__init__(state_id, **kwargs)
198+
199+
200+
class EmrAddStepStep(Task):

    """
    Task state that adds a new step to a running cluster. Refer to `Call Amazon EMR with Step Functions <https://docs.aws.amazon.com/step-functions/latest/dg/connect-emr.html>`_ for further information.
    """

    def __init__(self, state_id, wait_for_completion=True, **kwargs):
        """
        Args:
            state_id (str): Name of the state. It **must not** exceed 128 unicode characters and **must be** unique across the entire state machine.
            comment (str, optional): Free-form, human-readable description of the state. (default: None)
            input_path (str, optional): Path that selects which portion of the state's raw input is used by the state. (default: '$')
            parameters (dict, optional): Value that becomes the effective input of the state.
            result_path (str, optional): Path that determines how the state's result is combined with, or replaces, the raw input. (default: '$')
            output_path (str, optional): Path applied to the state's output once `result_path` has been applied; the selection is the effective output handed to the next state as its raw input. (default: '$')
            wait_for_completion (bool, optional): When `True`, the Task state waits to complete before the workflow proceeds to the next step. (default: True)
        """
        # Pick the `.sync` resource when the caller asked to wait for completion.
        kwargs[Field.Resource.value] = (
            'arn:aws:states:::elasticmapreduce:addStep.sync'
            if wait_for_completion
            else 'arn:aws:states:::elasticmapreduce:addStep'
        )

        super(EmrAddStepStep, self).__init__(state_id, **kwargs)
222+
223+
224+
class EmrCancelStepStep(Task):

    """
    Task state that cancels a pending step in a running cluster. Refer to `Call Amazon EMR with Step Functions <https://docs.aws.amazon.com/step-functions/latest/dg/connect-emr.html>`_ for further information.
    """

    def __init__(self, state_id, **kwargs):
        """
        Args:
            state_id (str): Name of the state. It **must not** exceed 128 unicode characters and **must be** unique across the entire state machine.
            comment (str, optional): Free-form, human-readable description of the state. (default: None)
            input_path (str, optional): Path that selects which portion of the state's raw input is used by the state. (default: '$')
            parameters (dict, optional): Value that becomes the effective input of the state.
            result_path (str, optional): Path that determines how the state's result is combined with, or replaces, the raw input. (default: '$')
            output_path (str, optional): Path applied to the state's output once `result_path` has been applied; the selection is the effective output handed to the next state as its raw input. (default: '$')
        """
        # The resource is fixed for this step; any caller-supplied value under the same key is overwritten.
        resource_arn = 'arn:aws:states:::elasticmapreduce:cancelStep'
        kwargs[Field.Resource.value] = resource_arn

        super(EmrCancelStepStep, self).__init__(state_id, **kwargs)
242+
243+
244+
class EmrSetClusterTerminationProtectionStep(Task):

    """
    Task state that locks a cluster (job flow) so that its EC2 instances cannot be terminated by user intervention, an API call, or a job-flow error. Refer to `Call Amazon EMR with Step Functions <https://docs.aws.amazon.com/step-functions/latest/dg/connect-emr.html>`_ for further information.
    """

    def __init__(self, state_id, **kwargs):
        """
        Args:
            state_id (str): Name of the state. It **must not** exceed 128 unicode characters and **must be** unique across the entire state machine.
            comment (str, optional): Free-form, human-readable description of the state. (default: None)
            input_path (str, optional): Path that selects which portion of the state's raw input is used by the state. (default: '$')
            parameters (dict, optional): Value that becomes the effective input of the state.
            result_path (str, optional): Path that determines how the state's result is combined with, or replaces, the raw input. (default: '$')
            output_path (str, optional): Path applied to the state's output once `result_path` has been applied; the selection is the effective output handed to the next state as its raw input. (default: '$')
        """
        # The resource is fixed for this step; any caller-supplied value under the same key is overwritten.
        resource_arn = 'arn:aws:states:::elasticmapreduce:setClusterTerminationProtection'
        kwargs[Field.Resource.value] = resource_arn

        super(EmrSetClusterTerminationProtectionStep, self).__init__(state_id, **kwargs)
262+
263+
264+
class EmrModifyInstanceFleetByNameStep(Task):

    """
    Task state that modifies the target On-Demand and target Spot capacities of an instance fleet. Refer to `Call Amazon EMR with Step Functions <https://docs.aws.amazon.com/step-functions/latest/dg/connect-emr.html>`_ for further information.
    """

    def __init__(self, state_id, **kwargs):
        """
        Args:
            state_id (str): Name of the state. It **must not** exceed 128 unicode characters and **must be** unique across the entire state machine.
            comment (str, optional): Free-form, human-readable description of the state. (default: None)
            input_path (str, optional): Path that selects which portion of the state's raw input is used by the state. (default: '$')
            parameters (dict, optional): Value that becomes the effective input of the state.
            result_path (str, optional): Path that determines how the state's result is combined with, or replaces, the raw input. (default: '$')
            output_path (str, optional): Path applied to the state's output once `result_path` has been applied; the selection is the effective output handed to the next state as its raw input. (default: '$')
        """
        # The resource is fixed for this step; any caller-supplied value under the same key is overwritten.
        resource_arn = 'arn:aws:states:::elasticmapreduce:modifyInstanceFleetByName'
        kwargs[Field.Resource.value] = resource_arn

        super(EmrModifyInstanceFleetByNameStep, self).__init__(state_id, **kwargs)
282+
283+
284+
class EmrModifyInstanceGroupByNameStep(Task):

    """
    Task state that modifies the number of nodes and the configuration settings of an instance group. Refer to `Call Amazon EMR with Step Functions <https://docs.aws.amazon.com/step-functions/latest/dg/connect-emr.html>`_ for further information.
    """

    def __init__(self, state_id, **kwargs):
        """
        Args:
            state_id (str): Name of the state. It **must not** exceed 128 unicode characters and **must be** unique across the entire state machine.
            comment (str, optional): Free-form, human-readable description of the state. (default: None)
            input_path (str, optional): Path that selects which portion of the state's raw input is used by the state. (default: '$')
            parameters (dict, optional): Value that becomes the effective input of the state.
            result_path (str, optional): Path that determines how the state's result is combined with, or replaces, the raw input. (default: '$')
            output_path (str, optional): Path applied to the state's output once `result_path` has been applied; the selection is the effective output handed to the next state as its raw input. (default: '$')
        """
        # The resource is fixed for this step; any caller-supplied value under the same key is overwritten.
        resource_arn = 'arn:aws:states:::elasticmapreduce:modifyInstanceGroupByName'
        kwargs[Field.Resource.value] = resource_arn

        super(EmrModifyInstanceGroupByNameStep, self).__init__(state_id, **kwargs)
302+

0 commit comments

Comments
 (0)