Skip to content

feat: Adds support for Placeholders in TrainingStep to set S3 location for InputDataConfig and OutputDataConfig #142

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 14 commits into from
Jun 19, 2021
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 27 additions & 6 deletions src/stepfunctions/steps/sagemaker.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from __future__ import absolute_import

from enum import Enum
from stepfunctions.inputs import ExecutionInput, StepInput
from stepfunctions.inputs import Placeholder
from stepfunctions.steps.states import Task
from stepfunctions.steps.fields import Field
from stepfunctions.steps.utils import tags_dict_to_kv_list
Expand Down Expand Up @@ -43,15 +43,15 @@ class TrainingStep(Task):
Creates a Task State to execute a `SageMaker Training Job <https://docs.aws.amazon.com/sagemaker/latest/dg/API_CreateTrainingJob.html>`_. The TrainingStep will also create a model by default, and the model shares the same name as the training job.
"""

def __init__(self, state_id, estimator, job_name, data=None, hyperparameters=None, mini_batch_size=None, experiment_config=None, wait_for_completion=True, tags=None, **kwargs):
def __init__(self, state_id, estimator, job_name, data=None, hyperparameters=None, mini_batch_size=None, experiment_config=None, wait_for_completion=True, tags=None, output_path=None, **kwargs):
"""
Args:
state_id (str): State name whose length **must be** less than or equal to 128 unicode characters. State names **must be** unique within the scope of the whole state machine.
estimator (sagemaker.estimator.EstimatorBase): The estimator for the training step. Can be a `BYO estimator, Framework estimator <https://docs.aws.amazon.com/sagemaker/latest/dg/your-algorithms.html>`_ or `Amazon built-in algorithm estimator <https://docs.aws.amazon.com/sagemaker/latest/dg/algos.html>`_.
job_name (str or Placeholder): Specify a training job name; this is required for the training job to run. We recommend using the :py:class:`~stepfunctions.inputs.ExecutionInput` placeholder collection to pass the value dynamically in each execution.
data: Information about the training data. Please refer to the ``fit()`` method of the associated estimator, as this can take any of the following forms:

* (str) - The S3 location where training data is saved.
* (str or Placeholder) - The S3 location where training data is saved.
* (dict[str, str] or dict[str, sagemaker.inputs.TrainingInput]) - If using multiple
channels for training data, you can specify a dict mapping channel names to
strings or :func:`~sagemaker.inputs.TrainingInput` objects.
Expand All @@ -69,6 +69,13 @@ def __init__(self, state_id, estimator, job_name, data=None, hyperparameters=Non
experiment_config (dict, optional): Specify the experiment config for the training. (Default: None)
wait_for_completion (bool, optional): Boolean value set to `True` if the Task state should wait for the training job to complete before proceeding to the next step in the workflow. Set to `False` if the Task state should submit the training job and proceed to the next step. (default: True)
tags (list[dict], optional): `List of tags <https://docs.aws.amazon.com/sagemaker/latest/dg/API_Tag.html>`_ to associate with the resource.
output_path (str or Placeholder, optional): S3 location for saving the training result (model
artifacts and output files) to propagate to estimator. If not specified, results are
stored to a default bucket. If the bucket with the specific name
does not exist, the estimator creates the bucket during the
:meth:`~sagemaker.estimator.EstimatorBase.fit` method execution.
file:// urls are used for local mode. For example: 'file://model/'
will save to the model folder in the current directory.
"""
self.estimator = estimator
self.job_name = job_name
Expand All @@ -88,6 +95,11 @@ def __init__(self, state_id, estimator, job_name, data=None, hyperparameters=Non

kwargs[Field.Resource.value] = get_service_integration_arn(SAGEMAKER_SERVICE_NAME,
SageMakerApi.CreateTrainingJob)
# sagemaker.workflow.airflow.training_config does not accept Placeholder as input. Transform data placeholder
# to JSONpath to generate parameters.
is_data_placeholder = isinstance(data, Placeholder)
if is_data_placeholder:
data = data.to_jsonpath()

if isinstance(job_name, str):
parameters = training_config(estimator=estimator, inputs=data, job_name=job_name, mini_batch_size=mini_batch_size)
Expand All @@ -100,9 +112,18 @@ def __init__(self, state_id, estimator, job_name, data=None, hyperparameters=Non
if estimator.rules != None:
parameters['DebugRuleConfigurations'] = [rule.to_debugger_rule_config_dict() for rule in estimator.rules]

if isinstance(job_name, (ExecutionInput, StepInput)):
if isinstance(job_name, Placeholder):
parameters['TrainingJobName'] = job_name

if output_path is not None:
parameters['OutputDataConfig']['S3OutputPath'] = output_path

if data is not None and is_data_placeholder:
# Replace the 'S3Uri' key with one that supports JSONpath value.
# Support for uri str only: The list will only contain 1 element
temp_data_uri = parameters['InputDataConfig'][0]['DataSource']['S3DataSource'].pop('S3Uri', None)
parameters['InputDataConfig'][0]['DataSource']['S3DataSource']['S3Uri.$'] = temp_data_uri

if hyperparameters is not None:
parameters['HyperParameters'] = hyperparameters

Expand Down Expand Up @@ -209,7 +230,7 @@ def __init__(self, state_id, transformer, job_name, model_name, data, data_type=
join_source=join_source
)

if isinstance(job_name, (ExecutionInput, StepInput)):
if isinstance(job_name, Placeholder):
parameters['TransformJobName'] = job_name

parameters['ModelName'] = model_name
Expand Down Expand Up @@ -478,7 +499,7 @@ def __init__(self, state_id, processor, job_name, inputs=None, outputs=None, exp
else:
parameters = processing_config(processor=processor, inputs=inputs, outputs=outputs, container_arguments=container_arguments, container_entrypoint=container_entrypoint, kms_key_id=kms_key_id)

if isinstance(job_name, (ExecutionInput, StepInput)):
if isinstance(job_name, Placeholder):
parameters['ProcessingJobName'] = job_name

if experiment_config is not None:
Expand Down
4 changes: 2 additions & 2 deletions src/stepfunctions/steps/states.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

from stepfunctions.exceptions import DuplicateStatesInChain
from stepfunctions.steps.fields import Field
from stepfunctions.inputs import ExecutionInput, StepInput
from stepfunctions.inputs import Placeholder, StepInput


logger = logging.getLogger('stepfunctions.states')
Expand Down Expand Up @@ -53,7 +53,7 @@ def _replace_placeholders(self, params):
return params
modified_parameters = {}
for k, v in params.items():
if isinstance(v, (ExecutionInput, StepInput)):
if isinstance(v, Placeholder):
modified_key = "{key}.$".format(key=k)
modified_parameters[modified_key] = v.to_jsonpath()
elif isinstance(v, dict):
Expand Down
53 changes: 53 additions & 0 deletions tests/integ/test_sagemaker_steps.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
from sagemaker.tuner import HyperparameterTuner
from sagemaker.processing import ProcessingInput, ProcessingOutput

from stepfunctions.inputs import ExecutionInput
from stepfunctions.steps import Chain
from stepfunctions.steps.sagemaker import TrainingStep, TransformStep, ModelStep, EndpointStep, EndpointConfigStep, TuningStep, ProcessingStep
from stepfunctions.workflow import Workflow
Expand Down Expand Up @@ -104,6 +105,58 @@ def test_training_step(pca_estimator_fixture, record_set_fixture, sfn_client, sf
state_machine_delete_wait(sfn_client, workflow.state_machine_arn)
# End of Cleanup


# TODO: Add integ test with StepInput
def test_training_step_with_placeholders(pca_estimator_fixture,
                                         record_set_fixture,
                                         sfn_client,
                                         sfn_role_arn,
                                         sagemaker_session):
    """Run a TrainingStep whose job name and output path are ExecutionInput placeholders.

    The workflow is created, executed with concrete values for every
    placeholder declared in the schema, and the execution output is checked
    for a ``Completed`` training job status. The state machine is deleted
    afterwards even when the execution or the assertion fails.
    """
    # Build workflow definition.
    # Only declare placeholders that the step actually references: the
    # training data is passed directly via ``record_set_fixture``.
    execution_input = ExecutionInput(schema={
        'JobName': str,
        'OutputPath': str
    })

    job_name_placeholder = execution_input['JobName']
    output_path_placeholder = execution_input['OutputPath']
    training_step_name = 'TrainingStep'

    training_step = TrainingStep(training_step_name,
                                 estimator=pca_estimator_fixture,
                                 job_name=job_name_placeholder,
                                 data=record_set_fixture,
                                 mini_batch_size=200,
                                 output_path=output_path_placeholder)
    workflow_graph = Chain([training_step])

    with timeout(minutes=DEFAULT_TIMEOUT_MINUTES):
        # Create workflow and check definition
        workflow = create_workflow_and_check_definition(
            workflow_graph=workflow_graph,
            workflow_name=unique_name_from_base("integ-test-training-step-workflow"),
            sfn_client=sfn_client,
            sfn_role_arn=sfn_role_arn
        )

        try:
            # Execute workflow. A separate name is used for the concrete
            # values so the ExecutionInput placeholder collection above is
            # not shadowed.
            job_id = generate_job_name()
            execution_values = {
                'OutputPath': f's3://{sagemaker_session.default_bucket()}/',
                'JobName': f'TrainingJob-{job_id}'
            }

            # Check workflow output
            execution = workflow.execute(inputs=execution_values)
            execution_output = execution.get_output(wait=True)
            assert execution_output.get("TrainingJobStatus") == "Completed"
        finally:
            # Cleanup: always remove the state machine so a failed run does
            # not leak it.
            state_machine_delete_wait(sfn_client, workflow.state_machine_arn)
            # End of Cleanup


def test_model_step(trained_estimator, sfn_client, sagemaker_session, sfn_role_arn):
# Build workflow definition
model_name = generate_job_name()
Expand Down
76 changes: 75 additions & 1 deletion tests/unit/test_sagemaker_steps.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from sagemaker.processing import ProcessingInput, ProcessingOutput

from unittest.mock import MagicMock, patch
from stepfunctions.inputs import ExecutionInput
from stepfunctions.steps.sagemaker import TrainingStep, TransformStep, ModelStep, EndpointStep, EndpointConfigStep, ProcessingStep
from stepfunctions.steps.sagemaker import tuning_config

Expand Down Expand Up @@ -224,6 +225,7 @@ def test_training_step_creation(pca_estimator):
'TrialName': 'pca_trial',
'TrialComponentDisplayName': 'Training'
},
output_path='s3://sagemaker-us-east-1-111111111111',
tags=DEFAULT_TAGS,
)
assert step.to_dict() == {
Expand All @@ -234,7 +236,7 @@ def test_training_step_creation(pca_estimator):
'TrainingInputMode': 'File'
},
'OutputDataConfig': {
'S3OutputPath': 's3://sagemaker/models'
'S3OutputPath': 's3://sagemaker-us-east-1-111111111111'
},
'StoppingCondition': {
'MaxRuntimeInSeconds': 86400
Expand Down Expand Up @@ -265,6 +267,78 @@ def test_training_step_creation(pca_estimator):
}


@patch('botocore.client.BaseClient._make_api_call', new=mock_boto_api_call)
@patch.object(boto3.session.Session, 'region_name', 'us-east-1')
def test_training_step_creation_with_placeholders(pca_estimator):
    """Check the state definition produced when job name, data and output path are placeholders.

    Each placeholder is expected to appear in the rendered parameters under a
    key suffixed with ``.$`` whose value is the execution-input JSONPath.
    """
    placeholders = ExecutionInput(schema={
        'Data': str,
        'JobName': str,
        'OutputPath': str,
    })
    experiment_settings = {
        'ExperimentName': 'pca_experiment',
        'TrialName': 'pca_trial',
        'TrialComponentDisplayName': 'Training'
    }

    step = TrainingStep(
        'Training',
        estimator=pca_estimator,
        job_name=placeholders['JobName'],
        data=placeholders['Data'],
        output_path=placeholders['OutputPath'],
        experiment_config=experiment_settings,
        tags=DEFAULT_TAGS,
    )

    # Expected training channel: the S3 URI is rendered as a JSONPath field.
    expected_channel = {
        'ChannelName': 'training',
        'DataSource': {
            'S3DataSource': {
                'S3DataDistributionType': 'FullyReplicated',
                'S3DataType': 'S3Prefix',
                'S3Uri.$': "$$.Execution.Input['Data']"
            }
        }
    }

    expected_parameters = {
        'AlgorithmSpecification': {
            'TrainingImage': PCA_IMAGE,
            'TrainingInputMode': 'File'
        },
        'OutputDataConfig': {
            'S3OutputPath.$': "$$.Execution.Input['OutputPath']"
        },
        'StoppingCondition': {
            'MaxRuntimeInSeconds': 86400
        },
        'ResourceConfig': {
            'InstanceCount': 1,
            'InstanceType': 'ml.c4.xlarge',
            'VolumeSizeInGB': 30
        },
        'RoleArn': EXECUTION_ROLE,
        'HyperParameters': {
            'feature_dim': '50000',
            'num_components': '10',
            'subtract_mean': 'True',
            'algorithm_mode': 'randomized',
            'mini_batch_size': '200'
        },
        'InputDataConfig': [expected_channel],
        'ExperimentConfig': {
            'ExperimentName': 'pca_experiment',
            'TrialName': 'pca_trial',
            'TrialComponentDisplayName': 'Training'
        },
        'TrainingJobName.$': "$$.Execution.Input['JobName']",
        'Tags': DEFAULT_TAGS_LIST
    }

    expected_definition = {
        'Type': 'Task',
        'Parameters': expected_parameters,
        'Resource': 'arn:aws:states:::sagemaker:createTrainingJob.sync',
        'End': True
    }
    assert step.to_dict() == expected_definition


@patch('botocore.client.BaseClient._make_api_call', new=mock_boto_api_call)
@patch.object(boto3.session.Session, 'region_name', 'us-east-1')
def test_training_step_creation_with_debug_hook(pca_estimator_with_debug_hook):
Expand Down