-
Notifications
You must be signed in to change notification settings - Fork 1.8k
fix(backend): randomizing output uri path to avoid overwriting. Fixes #10186 #11243
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix(backend): randomizing output uri path to avoid overwriting. Fixes #10186 #11243
Conversation
Signed-off-by: b4sus <[email protected]>
Hi @b4sus. Thanks for your PR. I'm waiting for a kubeflow member to verify that this patch is reasonable to test. If it is, they should reply with Once the patch is verified, the new status will be reflected by the I understand the commands that are listed here. Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes/test-infra repository. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
/lgtm
/ok-to-test
Hey @b4sus , thanks for the contribution! Can you provide a sample pipeline that illustrates the issue this pr is aiming to resolve? At least in the case of a component being re-used, I believe the taskname will have a cc @gmfrasca |
@HumairAK - This appears to only impact output artifacts, and only changes the driver behavior when in CONTAINER driver mode, so I don't believe this should have any effect on #10798 in terms of sub-DAG naming schemes, etc. With that said, I did see that ParallelFor outputs are storing artifacts in the same URI, which is a problem that this PR addresses by adding UUID salts. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Tested this out using a ParallelFor task and confirmed each iteration's output artifacts are given unique URIs which are referenced properly in KFP UI.
/lgtm
Hey @HumairAK , We noticed the problem when, from one pipeline, we started other pipelines (pipeline as component) using ParallelFor. This is roughly the code: @dsl.pipeline
def inner_pipeline(date_to_process: str):
comp1_task = component1(date_to_process = date_to_process)
comp2_task = component2(comp1_task.outputs["output_df"])
@dsl.pipeline
def main_pipeline(from_date: str, to_date: str):
prepare_dates_task = prepare_dates_component(from_date = from_date, to_date = to_date)
with dsl.ParallelFor(items = prepare_dates_task.output, parallelism=4) as date_to_process:
inner_ppln_task = inner_pipeline(date_to_process = date_to_process) In this case, many inner pipelines were started (more then 4 as parallelism is not yet supported) and problem was that output of component1 was/is written to the same minio location, so overwriting each other. And subsequently couple of component2 tasks get the same input, regardless of the argument (date_to_process), producing the same final output (not visible here in code as it is store directly in component). |
Perfect, thanks guys tested and works as well with the following pipeline: pipeline.pyfrom typing import List
from kfp import dsl, compiler
from kfp.dsl import Dataset
from kfp.dsl import Output, InputPath
@dsl.component(base_image="quay.io/opendatahub/ds-pipelines-ci-executor-image:v1.0")
def component1(date_to_process: str, output_df: Output[Dataset]):
with open(output_df.path, 'w') as f:
f.write(date_to_process)
@dsl.component(base_image="quay.io/opendatahub/ds-pipelines-ci-executor-image:v1.0")
def component2(dataset_in: InputPath('Dataset')):
with open(dataset_in, 'r') as input_file:
dataset_one_contents = input_file.read()
print(dataset_one_contents)
@dsl.component(base_image="quay.io/opendatahub/ds-pipelines-ci-executor-image:v1.0")
def prepare_dates_component() -> List[str]:
return ["1", "2", "3", "4", "5", "6"]
@dsl.pipeline
def inner_pipeline(date_to_process: str):
comp1_task = component1(date_to_process = date_to_process).set_caching_options(enable_caching=False)
comp2_task = component2(dataset_in = comp1_task.outputs["output_df"]).set_caching_options(enable_caching=False)
@dsl.pipeline
def main_pipeline():
prepare_dates_task = prepare_dates_component().set_caching_options(enable_caching=False)
with dsl.ParallelFor(items = prepare_dates_task.output, parallelism=4) as date_to_process:
inner_ppln_task = inner_pipeline(date_to_process = date_to_process)
if __name__ == '__main__':
compiler.Compiler().compile(main_pipeline, __file__ + '.yaml') /lgtm |
[APPROVALNOTIFIER] This PR is APPROVED This pull-request has been approved by: HumairAK The full list of commands accepted by this bot can be found here. The pull request process is described here
Needs approval from an approver in each of these files:
Approvers can indicate their approval by writing |
Should we use the exact same base image? I ran it without that image, and it failed. Additionally, I'm unable to read the artifacts in a loop—any idea why? I used the exact same code but still couldn't read the artifact in a loop. Component 2 doesn't even start, and this issue keeps occurring. Moreover, when not using that base image, it fails to generate a random UUID for my paths and always overwrites the file. Any Help Please ? It's blocking our team, thanks ! |
@wassimbensalem I think you might me encountering a couple of different issues, can you reach out in the cncf kfp platform slack with the errors you are encountering, for more context around this base image you can find the dockerfile here this sounds related to this isssue which was recently resolved in master branch, let us know in slack if that is indeed the case, if not I suggest creating a new issue with a reproducible pipeline and information around the platform you are using as well as kfp/ kfp sdk / k8s version. |
In driver, random string is added when uri paths for output artifacts are generated. This should ensure that when component of certain name is executed in parallel (either with ParallelFor or just simply calling it multiple times in @pipeline), its outputs are always stored to different paths.