Skip to content

Commit 0738816

Browse files
committed
Add param with no default to wf template
1 parent ffc807a commit 0738816

File tree

3 files changed

+49
-4
lines changed

3 files changed

+49
-4
lines changed

metaflow/plugins/aip/aip.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -300,6 +300,16 @@ def _create_workflow_yaml(
300300

301301
# Service account is added through webhooks.
302302
workflow["spec"].pop("serviceAccountName", None)
303+
304+
# Parameters with no defaults need to be added to WorkflowTemplates.
305+
# This allows the parameters to be supplied by Workflow that reference the WorkflowTemplate.
306+
workflow["spec"]["arguments"]["parameters"].extend(
307+
[
308+
dict(name=param)
309+
for param in self.flow._get_parameters()
310+
if param not in flow_parameters
311+
]
312+
)
303313
else:
304314
raise NotImplementedError(f"Unsupported output format {kind}.")
305315

metaflow/plugins/aip/tests/test_argo_support.py

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,20 @@
1+
import importlib.util
2+
import inspect
13
import os
4+
import sys
25
from tempfile import TemporaryDirectory
6+
import yaml
37

48
import pytest
59
import subprocess_tee
610

711
from . import _python, obtain_flow_file_paths
8-
12+
from metaflow import FlowSpec
913

1014
disabled_test_flows = [
1115
"aip_flow.py", # kfp_preceding_component feature has been deprecated.
1216
]
17+
sys.path.append("flows")
1318

1419

1520
@pytest.mark.parametrize(
@@ -52,6 +57,11 @@ def test_argo_flows(pytestconfig, flow_file_path: str) -> None:
5257
validation_cmd = f"argo lint {output_path}"
5358
else:
5459
validation_cmd = f"argo template lint {output_path}"
60+
assert_workflow_template_contains_all_parameters(
61+
flow_base_name=flow_base_name,
62+
flow_path=full_path,
63+
workflow_template_path=output_path,
64+
)
5565

5666
assert (
5767
subprocess_tee.run(
@@ -61,3 +71,31 @@ def test_argo_flows(pytestconfig, flow_file_path: str) -> None:
6171
).returncode
6272
== 0
6373
)
74+
75+
76+
def assert_workflow_template_contains_all_parameters(
77+
flow_base_name: str, flow_path: str, workflow_template_path: str
78+
) -> None:
79+
spec = importlib.util.spec_from_file_location(flow_base_name, flow_path)
80+
module = importlib.util.module_from_spec(spec)
81+
flow = None
82+
for name in dir(module):
83+
var = getattr(module, name)
84+
if inspect.isclass(var) and issubclass(var, FlowSpec) and var != FlowSpec:
85+
flow = var # Found valid flow class
86+
assert flow is not None
87+
flow_parameters = [
88+
param
89+
for param in dir(flow)
90+
if not param.startswith("_") and not callable(getattr(flow, param))
91+
]
92+
if not flow_parameters:
93+
# No parameters found. Skipping.
94+
return
95+
96+
with open(workflow_template_path) as workflow_template_file:
97+
workflow_output = yaml.safe_load(workflow_template_file)
98+
output_params = workflow_output["spec"]["arguments"].get("parameters")
99+
100+
for param in flow_parameters:
101+
assert param in output_params

metaflow/plugins/aip/tests/test_kfp_s3_sensor.py

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,9 @@
22
import marshal
33
import os
44
import tempfile
5-
from unittest import mock
6-
from unittest.mock import Mock, PropertyMock, call, patch
75

86
import boto3
97
import pytest
10-
from botocore.exceptions import ClientError
118
from moto import mock_s3
129

1310
from metaflow.plugins.aip.aip_s3_sensor import wait_for_s3_path

0 commit comments

Comments
 (0)