Add dynamic fan-out/fan-in with run templates #3826
Documentation Link Check Results: ✅ Absolute links check passed
print(f"Waiting for {len(run_ids)} chunk processing runs to complete...")
while True:
    completed_runs = 0
    for run_id in run_ids:
IMO this should cache runs that have already succeeded and not fetch them again.
Also, we need one of the following two:
- Either this step fails as soon as one template run fails
- Or the aggregate step somehow handles failed runs, which might not have the artifacts it's trying to load. But this should be logged or otherwise communicated to the user, otherwise the aggregation will simply be wrong.
I think the second one is better.
@schustmi agreed, done!
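For illustration, here is a minimal sketch of the second option: poll until every run is terminal, never re-fetch a run that already finished, and report failed runs so the aggregate step can skip them knowingly. It is not the code from this PR. `wait_for_runs`, `fetch_status`, the status strings, and `max_polls` are hypothetical stand-ins; in the real step the status would presumably come from `client.get_pipeline_run(run_id)`.

```python
import logging
import time
from typing import Callable, Iterable, Set, Tuple

logger = logging.getLogger(__name__)

# Assumed terminal status names (hypothetical, not taken from the PR).
SUCCESS_STATES = {"completed", "cached"}
FAILURE_STATES = {"failed"}


def wait_for_runs(
    run_ids: Iterable[str],
    fetch_status: Callable[[str], str],
    poll_interval: float = 0.0,
    max_polls: int = 100,
) -> Tuple[Set[str], Set[str]]:
    """Poll runs until all are terminal; return (succeeded, failed) run IDs."""
    pending = set(run_ids)
    succeeded: Set[str] = set()
    failed: Set[str] = set()

    for _ in range(max_polls):
        if not pending:
            break
        # Only runs that are still pending are fetched; finished ones are cached.
        for run_id in sorted(pending):
            status = fetch_status(run_id)
            if status in SUCCESS_STATES:
                succeeded.add(run_id)
            elif status in FAILURE_STATES:
                failed.add(run_id)
                logger.warning("Run %s failed; its artifacts will be skipped.", run_id)
        pending -= succeeded | failed
        if pending:
            time.sleep(poll_interval)

    if pending:
        raise TimeoutError(f"Runs still pending after {max_polls} polls: {sorted(pending)}")
    return succeeded, failed
```

The aggregate step could then load artifacts only for the `succeeded` IDs and mention the `failed` ones in its output, so a partial aggregation is never mistaken for a complete one.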
@pipeline
def master_pipeline(template_id: Optional[UUID] = None):
Why not `fan_out_fan_in_pipeline` to keep it consistent?
@schustmi agreed, done!
run = client.get_pipeline_run(run_id)

# Check if run succeeded
if run.status.is_failed:
I don't think this `is_failed` exists? You should run this at least once to see if the code works.
I assumed it existed because `is_successful` exists... I'll try to run it.
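As a rough sketch of the property this thread is about, the enum below pairs `is_successful` with an `is_failed` counterpart. It is a self-contained stand-in, not the actual `src/zenml/enums.py`: only `COMPLETED` and `CACHED` appear in the diff, so the other member names, the string values, and the body of `is_failed` are assumptions.

```python
from enum import Enum


class ExecutionStatus(str, Enum):
    """Stand-in enum; members other than COMPLETED and CACHED are assumed."""

    INITIALIZING = "initializing"
    RUNNING = "running"
    FAILED = "failed"
    COMPLETED = "completed"
    CACHED = "cached"

    @property
    def is_successful(self) -> bool:
        """Whether the execution status refers to a successful execution."""
        return self in {ExecutionStatus.COMPLETED, ExecutionStatus.CACHED}

    @property
    def is_failed(self) -> bool:
        """Whether the execution status refers to a failed execution."""
        # Assumption: FAILED is the only status that counts as a failure.
        return self is ExecutionStatus.FAILED
```

Note that a run that is still `RUNNING` is neither successful nor failed, so callers polling for completion need to check both properties rather than negating one.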
src/zenml/enums.py
Outdated
        """Whether the execution status refers to a successful execution.

        Returns:
            Whether the execution status refers to a successful execution.
Suggested change:
- Whether the execution status refers to a successful execution.
+ Whether the execution status refers to a failed execution.
Wait, I haven't requested a review yet!!
src/zenml/enums.py
Outdated
@@ -107,6 +107,15 @@ def is_successful(self) -> bool:
        """
        return self in {ExecutionStatus.COMPLETED, ExecutionStatus.CACHED}

    @property
    def is_failed(self) -> bool:
        """Whether the execution status refers to a successful execution.
Suggested change:
- """Whether the execution status refers to a successful execution.
+ """Whether the execution status refers to a failed execution.
"""Trigger multiple pipeline runs for each chunk and wait for completion."""
client = Client()

# Use template ID if provided, otherwise use pipeline name
Do you think it's worth explaining what happens here? Right now you're saying "use pipeline name", but that doesn't really mean anything to anyone. What this does is fetch the latest template for the pipeline with that name
Fixed
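To make the behaviour described in this thread concrete, here is a small sketch of the lookup: use the template ID when one is given, otherwise pick the most recently created template for the pipeline with that name. It is a stand-in and does not call the ZenML client. `TemplateRecord`, `resolve_template`, and the `created` field are hypothetical names used only for illustration.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Optional, Sequence
from uuid import UUID, uuid4


@dataclass(frozen=True)
class TemplateRecord:
    """Hypothetical stand-in for a stored run template."""

    id: UUID
    pipeline_name: str
    created: datetime


def resolve_template(
    templates: Sequence[TemplateRecord],
    pipeline_name: str,
    template_id: Optional[UUID] = None,
) -> TemplateRecord:
    """Return the template with the given ID, else the latest one for the pipeline."""
    if template_id is not None:
        for template in templates:
            if template.id == template_id:
                return template
        raise LookupError(f"No template with ID {template_id}")

    # No ID given: fall back to the newest template of the named pipeline.
    candidates = [t for t in templates if t.pipeline_name == pipeline_name]
    if not candidates:
        raise LookupError(f"No template found for pipeline '{pipeline_name}'")
    return max(candidates, key=lambda t: t.created)
```

A code comment along the lines of "fetch the latest template for the pipeline with this name" would describe the fallback branch more accurately than "use pipeline name".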
# First, make sure you run the chunk_processing_pipeline once
# on a remote orchestrator:
# chunk_processing_pipeline()
Instead of doing this and later calling `Client().create_run_template`, you can simply call `chunk_processing_pipeline.create_run_template(name=...)`
Didn't know that! That's great!
# Make sure a remote stack is set before running this
template = chunk_processing_pipeline.create_run_template(
    name="chunk_processing_template",
    deployment_id=run.deployment_id,
The `deployment_id` argument doesn't exist anymore and isn't needed in this call.
* Add dynamic fan-out/fan-in with run templates
* Refactor code for advanced features in pipelines
* Refactor chunk processing and results aggregation
* Add `is_failed` property to `ExecutionStatus` enum
* Update check for failed runs to use run status value
* Remove unnecessary import in advanced_features.md
* Update is_failed property return statement to refer to failed execution
* Update advanced features with improved process_chunk logic
* Update advanced features documentation and usage example
* Update docs/book/how-to/steps-pipelines/advanced_features.md

(cherry picked from commit 07beafb)
Describe changes
I implemented/fixed _ to achieve _.
Pre-requisites
Please ensure you have done the following:
- I have based my new branch on `develop` and the open PR is targeting `develop`. If your branch wasn't based on develop, read the Contribution guide on rebasing a branch to develop.
Types of changes