Skip to content

Commit ee7dfab

Browse files
Daniel Yoolialln
Daniel Yoo
authored andcommitted
Add support for dynamic timeout and heartbeat in Task state
1 parent 5da8f76 commit ee7dfab

File tree

4 files changed

+35
-11
lines changed

4 files changed

+35
-11
lines changed

src/stepfunctions/steps/compute.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,9 @@ def __init__(self, state_id, wait_for_callback=False, **kwargs):
2828
state_id (str): State name whose length **must be** less than or equal to 128 unicode characters. State names **must be** unique within the scope of the whole state machine.
2929
wait_for_callback(bool, optional): Boolean value set to `True` if the Task state should wait for callback to resume the operation. (default: False)
3030
timeout_seconds (int, optional): Positive integer specifying timeout for the state in seconds. If the state runs longer than the specified timeout, then the interpreter fails the state with a `States.Timeout` Error Name. (default: 60)
31+
timeout_seconds_path (str, optional): Path applied to the state's input to select the integer to be used for timeout seconds duration.
3132
heartbeat_seconds (int, optional): Positive integer specifying heartbeat timeout for the state in seconds. This value should be lower than the one specified for `timeout_seconds`. If more time than the specified heartbeat elapses between heartbeats from the task, then the interpreter fails the state with a `States.Timeout` Error Name.
33+
heartbeat_seconds_path (str, optional): Path applied to the state's input to select the integer to be used for heartbeat seconds duration.
3234
comment (str, optional): Human-readable comment or description. (default: None)
3335
input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$')
3436
parameters (dict, optional): The value of this field becomes the effective input for the state.
@@ -55,7 +57,9 @@ def __init__(self, state_id, wait_for_completion=True, **kwargs):
5557
state_id (str): State name whose length **must be** less than or equal to 128 unicode characters. State names **must be** unique within the scope of the whole state machine.
5658
wait_for_completion(bool, optional): Boolean value set to `True` if the Task state should wait for the glue job to complete before proceeding to the next step in the workflow. Set to `False` if the Task state should submit the glue job and proceed to the next step. (default: True)
5759
timeout_seconds (int, optional): Positive integer specifying timeout for the state in seconds. If the state runs longer than the specified timeout, then the interpreter fails the state with a `States.Timeout` Error Name. (default: 60)
60+
timeout_seconds_path (str, optional): Path applied to the state's input to select the integer to be used for timeout seconds duration.
5861
heartbeat_seconds (int, optional): Positive integer specifying heartbeat timeout for the state in seconds. This value should be lower than the one specified for `timeout_seconds`. If more time than the specified heartbeat elapses between heartbeats from the task, then the interpreter fails the state with a `States.Timeout` Error Name.
62+
heartbeat_seconds_path (str, optional): Path applied to the state's input to select the integer to be used for heartbeat seconds duration.
5963
comment (str, optional): Human-readable comment or description. (default: None)
6064
input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$')
6165
parameters (dict, optional): The value of this field becomes the effective input for the state.
@@ -82,7 +86,9 @@ def __init__(self, state_id, wait_for_completion=True, **kwargs):
8286
state_id (str): State name whose length **must be** less than or equal to 128 unicode characters. State names **must be** unique within the scope of the whole state machine.
8387
wait_for_completion(bool, optional): Boolean value set to `True` if the Task state should wait for the batch job to complete before proceeding to the next step in the workflow. Set to `False` if the Task state should submit the batch job and proceed to the next step. (default: True)
8488
timeout_seconds (int, optional): Positive integer specifying timeout for the state in seconds. If the state runs longer than the specified timeout, then the interpreter fails the state with a `States.Timeout` Error Name. (default: 60)
89+
timeout_seconds_path (str, optional): Path applied to the state's input to select the integer to be used for timeout seconds duration.
8590
heartbeat_seconds (int, optional): Positive integer specifying heartbeat timeout for the state in seconds. This value should be lower than the one specified for `timeout_seconds`. If more time than the specified heartbeat elapses between heartbeats from the task, then the interpreter fails the state with a `States.Timeout` Error Name.
91+
heartbeat_seconds_path (str, optional): Path applied to the state's input to select the integer to be used for heartbeat seconds duration.
8692
comment (str, optional): Human-readable comment or description. (default: None)
8793
input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$')
8894
parameters (dict, optional): The value of this field becomes the effective input for the state.
@@ -109,7 +115,9 @@ def __init__(self, state_id, wait_for_completion=True, **kwargs):
109115
state_id (str): State name whose length **must be** less than or equal to 128 unicode characters. State names **must be** unique within the scope of the whole state machine.
110116
wait_for_completion(bool, optional): Boolean value set to `True` if the Task state should wait for the ecs job to complete before proceeding to the next step in the workflow. Set to `False` if the Task state should submit the ecs job and proceed to the next step. (default: True)
111117
timeout_seconds (int, optional): Positive integer specifying timeout for the state in seconds. If the state runs longer than the specified timeout, then the interpreter fails the state with a `States.Timeout` Error Name. (default: 60)
118+
timeout_seconds_path (str, optional): Path applied to the state's input to select the integer to be used for timeout seconds duration.
112119
heartbeat_seconds (int, optional): Positive integer specifying heartbeat timeout for the state in seconds. This value should be lower than the one specified for `timeout_seconds`. If more time than the specified heartbeat elapses between heartbeats from the task, then the interpreter fails the state with a `States.Timeout` Error Name.
120+
heartbeat_seconds_path (str, optional): Path applied to the state's input to select the integer to be used for heartbeat seconds duration.
113121
comment (str, optional): Human-readable comment or description. (default: None)
114122
input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$')
115123
parameters (dict, optional): The value of this field becomes the effective input for the state.

src/stepfunctions/steps/fields.py

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,9 @@
66
#
77
# http://www.apache.org/licenses/LICENSE-2.0
88
#
9-
# or in the "license" file accompanying this file. This file is distributed
10-
# on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
11-
# express or implied. See the License for the specific language governing
9+
# or in the "license" file accompanying this file. This file is distributed
10+
# on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
11+
# express or implied. See the License for the specific language governing
1212
# permissions and limitations under the License.
1313
from __future__ import absolute_import
1414

@@ -53,9 +53,12 @@ class Field(Enum):
5353
MaxConcurrency = 'max_concurrency'
5454

5555
# Task state fields
56-
Resource = 'resource'
57-
TimeoutSeconds = 'timeout_seconds'
58-
HeartbeatSeconds = 'heartbeat_seconds'
56+
Resource = 'resource'
57+
TimeoutSeconds = 'timeout_seconds'
58+
TimeoutSecondsPath = 'timeout_seconds_path'
59+
HeartbeatSeconds = 'heartbeat_seconds'
60+
HeartbeatSecondsPath = 'heartbeat_seconds_path'
61+
5962

6063
# Retry and catch fields
6164
ErrorEquals = 'error_equals'

src/stepfunctions/steps/service.py

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,9 @@
66
#
77
# http://www.apache.org/licenses/LICENSE-2.0
88
#
9-
# or in the "license" file accompanying this file. This file is distributed
10-
# on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
11-
# express or implied. See the License for the specific language governing
9+
# or in the "license" file accompanying this file. This file is distributed
10+
# on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
11+
# express or implied. See the License for the specific language governing
1212
# permissions and limitations under the License.
1313
from __future__ import absolute_import
1414

@@ -107,7 +107,9 @@ def __init__(self, state_id, wait_for_callback=False, **kwargs):
107107
state_id (str): State name whose length **must be** less than or equal to 128 unicode characters. State names **must be** unique within the scope of the whole state machine.
108108
wait_for_callback(bool, optional): Boolean value set to `True` if the Task state should wait for callback to resume the operation. (default: False)
109109
timeout_seconds (int, optional): Positive integer specifying timeout for the state in seconds. If the state runs longer than the specified timeout, then the interpreter fails the state with a `States.Timeout` Error Name. (default: 60)
110+
timeout_seconds_path (str, optional): Path applied to the state's input to select the integer to be used for timeout seconds duration.
110111
heartbeat_seconds (int, optional): Positive integer specifying heartbeat timeout for the state in seconds. This value should be lower than the one specified for `timeout_seconds`. If more time than the specified heartbeat elapses between heartbeats from the task, then the interpreter fails the state with a `States.Timeout` Error Name.
112+
heartbeat_seconds_path (str, optional): Path applied to the state's input to select the integer to be used for heartbeat seconds duration.
111113
comment (str, optional): Human-readable comment or description. (default: None)
112114
input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$')
113115
parameters (dict, optional): The value of this field becomes the effective input for the state.
@@ -118,7 +120,7 @@ def __init__(self, state_id, wait_for_callback=False, **kwargs):
118120
kwargs[Field.Resource.value] = 'arn:aws:states:::sns:publish.waitForTaskToken'
119121
else:
120122
kwargs[Field.Resource.value] = 'arn:aws:states:::sns:publish'
121-
123+
122124
super(SnsPublishStep, self).__init__(state_id, **kwargs)
123125

124126

@@ -134,7 +136,9 @@ def __init__(self, state_id, wait_for_callback=False, **kwargs):
134136
state_id (str): State name whose length **must be** less than or equal to 128 unicode characters. State names **must be** unique within the scope of the whole state machine.
135137
wait_for_callback(bool, optional): Boolean value set to `True` if the Task state should wait for callback to resume the operation. (default: False)
136138
timeout_seconds (int, optional): Positive integer specifying timeout for the state in seconds. If the state runs longer than the specified timeout, then the interpreter fails the state with a `States.Timeout` Error Name. (default: 60)
139+
timeout_seconds_path (str, optional): Path applied to the state's input to select the integer to be used for timeout seconds duration.
137140
heartbeat_seconds (int, optional): Positive integer specifying heartbeat timeout for the state in seconds. This value should be lower than the one specified for `timeout_seconds`. If more time than the specified heartbeat elapses between heartbeats from the task, then the interpreter fails the state with a `States.Timeout` Error Name.
141+
heartbeat_seconds_path (str, optional): Path applied to the state's input to select the integer to be used for heartbeat seconds duration.
138142
comment (str, optional): Human-readable comment or description. (default: None)
139143
input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$')
140144
parameters (dict, optional): The value of this field becomes the effective input for the state.
@@ -145,7 +149,7 @@ def __init__(self, state_id, wait_for_callback=False, **kwargs):
145149
kwargs[Field.Resource.value] = 'arn:aws:states:::sqs:sendMessage.waitForTaskToken'
146150
else:
147151
kwargs[Field.Resource.value] = 'arn:aws:states:::sqs:sendMessage'
148-
152+
149153
super(SqsSendMessageStep, self).__init__(state_id, **kwargs)
150154

151155

src/stepfunctions/steps/states.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -583,14 +583,21 @@ def __init__(self, state_id, **kwargs):
583583
state_id (str): State name whose length **must be** less than or equal to 128 unicode characters. State names **must be** unique within the scope of the whole state machine.
584584
resource (str): A URI that uniquely identifies the specific task to execute. The States language does not constrain the URI scheme nor any other part of the URI.
585585
timeout_seconds (int, optional): Positive integer specifying timeout for the state in seconds. If the state runs longer than the specified timeout, then the interpreter fails the state with a `States.Timeout` Error Name. (default: 60)
586+
timeout_seconds_path (str, optional): Path applied to the state's input to select the integer to be used for timeout seconds duration.
586587
heartbeat_seconds (int, optional): Positive integer specifying heartbeat timeout for the state in seconds. This value should be lower than the one specified for `timeout_seconds`. If more time than the specified heartbeat elapses between heartbeats from the task, then the interpreter fails the state with a `States.Timeout` Error Name.
588+
heartbeat_seconds_path (str, optional): Path applied to the state's input to select the integer to be used for heartbeat seconds duration.
587589
comment (str, optional): Human-readable comment or description. (default: None)
588590
input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$')
589591
parameters (dict, optional): The value of this field becomes the effective input for the state.
590592
result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$')
591593
output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$')
592594
"""
593595
super(Task, self).__init__(state_id, 'Task', **kwargs)
596+
if self.timeout_seconds is not None and self.timeout_seconds_path is not None:
597+
raise ValueError("The Task state can contain at most one of 'timeout_seconds' or 'timeout_seconds_path'.")
598+
599+
if self.heartbeat_seconds is not None and self.heartbeat_seconds_path is not None:
600+
raise ValueError("The Task state can contain at most one of 'heartbeat_seconds' or 'heartbeat_seconds_path'.")
594601

595602
def allowed_fields(self):
596603
return [
@@ -600,7 +607,9 @@ def allowed_fields(self):
600607
Field.Parameters,
601608
Field.ResultPath,
602609
Field.TimeoutSeconds,
610+
Field.TimeoutSecondsPath,
603611
Field.HeartbeatSeconds,
612+
Field.HeartbeatSecondsPath,
604613
Field.Resource,
605614
Field.Retry,
606615
Field.Catch

0 commit comments

Comments
 (0)