Difference between prefect.engine and prefect flow-run start
#20281
Hey there, I'm currently trying to implement a custom runner, but I'm finding different ways in which the runner can actually execute the flow run. Could someone describe the difference between them, the pros/cons of each approach, and whether one is recommended for certain use cases?
TL;DR: `prefect.engine`'s `__main__` is the low-level "execute this flow run in this process" entrypoint, while `prefect flow-run execute` is a runner-mediated orchestration command that manages process lifecycle, state transitions, cancellation/rescheduling, storage pulls, and environment setup, often by launching the engine in a subprocess.

For example, running a flow on a Kubernetes worker/runner results in a new Pod being created with the container arguments set to `prefect flow-run execute` and the `PREFECT__FLOW_RUN_ID` environment variable set to the GUID of the flow run.

1) Execution model: in-process vs runner-managed subprocess

`prefect.engine`'s `__main__` executes the flow directly in-process:
with handle_engine_signals(flow_run_id):
    from prefect.flow_engine import (
        flow_run_logger,
        load_flow,
        load_flow_run,
        run_flow,
    )

    flow_run: "FlowRun" = load_flow_run(flow_run_id=flow_run_id)
    run_logger: "LoggingAdapter" = flow_run_logger(flow_run=flow_run)

    try:
        flow: "Flow[..., Any]" = load_flow(flow_run)
    except Exception:
        run_logger.error(
            "Unexpected exception encountered when trying to load flow",
            exc_info=True,
        )
        raise

    # run the flow
    if flow.isasync:
        run_coro_as_sync(run_flow(flow, flow_run=flow_run, error_logger=run_logger))
    else:
        run_flow(flow, flow_run=flow_run, error_logger=run_logger)

By contrast, the runner path goes through the Runner:

runner = Runner()
...
await runner.execute_flow_run(id)

The runner path sets up execution as a `python -m prefect.engine` subprocess:

# Otherwise, we'll need to run a `python -m prefect.engine` command to load and run the flow
if command is None:
    runner_command = [get_sys_executable(), "-m", "prefect.engine"]
...
env.update(
    {
        **{
            "PREFECT__FLOW_RUN_ID": str(flow_run.id),
            "PREFECT__STORAGE_BASE_PATH": str(self._tmp_dir),
            "PREFECT__ENABLE_CANCELLATION_AND_CRASHED_HOOKS": "false",
        },
        **({"PREFECT__FLOW_ENTRYPOINT": entrypoint} if entrypoint else {}),
    }
)
...
process = await run_process(command=runner_command, ...)

So the CLI/runner path ultimately launches `python -m prefect.engine` in a fresh subprocess rather than running the flow in the current process.

2) State orchestration & cancellation handling
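Before looking at the real excerpt below: the pattern here is a context manager that converts orchestration exceptions into process exit codes. A minimal self-contained sketch of that idea (the `Abort`/`Pause` classes and `signals_to_exit_code` are stand-ins invented for illustration, not Prefect's real names, and the sketch records the code in a dict instead of calling `exit()` so it can run as a test):

```python
from contextlib import contextmanager
from typing import Iterator

# Hypothetical stand-ins for the orchestration signal exceptions.
class Abort(Exception): ...
class Pause(Exception): ...

@contextmanager
def signals_to_exit_code(result: dict) -> Iterator[None]:
    """Map orchestration signals raised inside the block to an exit code."""
    try:
        yield
        result["code"] = 0
    except (Abort, Pause):
        # An orchestrator-initiated stop is treated as a clean exit.
        result["code"] = 0
    except Exception:
        # Any unexpected failure maps to a non-zero exit code.
        result["code"] = 1

result: dict = {}
with signals_to_exit_code(result):
    raise Abort("orchestrator requested abort")
print(result["code"])  # clean exit despite the exception
```

Prefect's actual `handle_engine_signals` (quoted next) follows the same shape, mapping each exception class to an `exit()` call.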
@contextmanager
def handle_engine_signals(flow_run_id: UUID | None = None):
    try:
        yield
    except Abort:
        ...
        exit(0)
    except Pause:
        ...
        exit(0)
    except Exception:
        ...
        exit(1)
    except BaseException:
        ...
        raise

The runner path coordinates with the API and monitors cancellation and state transitions for you:
flow_run = await self._client.read_flow_run(flow_run_id)
if flow_run.state and flow_run.state.is_cancelling():
    await self._mark_flow_run_as_cancelled(...)
    ...
    return
if flow_run.state and flow_run.state.is_cancelled():
    ...
    return

process = await self._runs_task_group.start(
    partial(self._submit_run_and_capture_errors, ...)
)
...
if self.heartbeat_seconds is not None:
    await self._emit_flow_run_heartbeat(flow_run)

And when the process exits, it logs the exit status and proposes a crashed state if needed:

if exit_code:
    flow_run_logger.log(...)
...
if exit_code != 0 and not self._rescheduling:
    await self._propose_crashed_state(
        flow_run,
        f"Flow run process exited with non-zero status code {exit_code}.",
    )

None of this extra orchestration happens in the direct `python -m prefect.engine` path.

3) Signal handling differences (SIGTERM behavior)
def _handle_reschedule_sigterm(_signal: int, _frame: FrameType | None):
    logger.info("SIGTERM received, initiating graceful shutdown...")
    runner.reschedule_current_flow_runs()
    exit_with_success("Flow run successfully rescheduled.")
...
if ... and on_sigterm == "reschedule":
    signal.signal(signal.SIGTERM, _handle_reschedule_sigterm)

The reschedule logic is runner-based and updates the flow run's state to AwaitingRetry:

def reschedule_current_flow_runs(self) -> None:
    self._rescheduling = True
    ...
    propose_state_sync(client, AwaitingRetry(), flow_run_id=flow_run.id)
    os.kill(process_info["pid"], signal.SIGTERM)

4) Storage & deployment-specific behavior

Runner execution has deployment-aware behavior:
if flow_run.deployment_id is not None:
    flow = self._deployment_flow_map.get(flow_run.deployment_id)
    if flow:
        process = run_flow_in_subprocess(flow, flow_run=flow_run)
        ...
        return process.exitcode
...
storage = self._deployment_storage_map.get(flow_run.deployment_id) ...
if storage and storage.pull_interval:
    ...
    await storage.pull_code()

None of this deployment-specific handling happens in the direct engine path.

5) Environment and config propagation

The runner builds the subprocess environment by combining settings, run-specific variables, and the current OS environment:

env.update(get_current_settings().to_environment_variables(exclude_unset=True))
env.update({
    "PREFECT__FLOW_RUN_ID": str(flow_run.id),
    "PREFECT__STORAGE_BASE_PATH": str(self._tmp_dir),
    "PREFECT__ENABLE_CANCELLATION_AND_CRASHED_HOOKS": "false",
    ...
})
env.update(**os.environ)
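One consequence of the update order in the snippet above: because `os.environ` is merged last, variables already set in the parent process override the computed defaults. A tiny standalone illustration of `dict.update` precedence (the values here are made up):

```python
# Later updates win: the layer merged last takes precedence.
env: dict[str, str] = {}
env.update({"PREFECT_API_URL": "http://from-settings"})       # settings layer
env.update({"PREFECT__FLOW_RUN_ID": "abc123"})                # run-specific layer
env.update({"PREFECT_API_URL": "http://from-parent-process"}) # OS environment layer
print(env["PREFECT_API_URL"])
```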
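Putting the two halves together, here is a generic parent/child sketch (not Prefect's implementation): the "runner" spawns a child process with the flow run ID in its environment, then inspects the exit code the way the crashed-state logic quoted earlier does. The trivial child commands stand in for `python -m prefect.engine`, and `run_engine_child` is a name invented for this illustration.

```python
import os
import subprocess
import sys

def run_engine_child(flow_run_id: str, child_code: str) -> int:
    """Spawn a child with the flow run ID in its environment; return its exit code."""
    env = {**os.environ, "PREFECT__FLOW_RUN_ID": flow_run_id}
    proc = subprocess.run([sys.executable, "-c", child_code], env=env)
    if proc.returncode != 0:
        # A real runner would propose a Crashed state to the API here.
        print(f"Flow run process exited with non-zero status code {proc.returncode}")
    return proc.returncode

# Child sees the env var the parent injected, so it exits cleanly.
ok = run_engine_child(
    "run-123",
    "import os; assert os.environ['PREFECT__FLOW_RUN_ID'] == 'run-123'",
)
# A failing child surfaces its non-zero exit code to the parent.
crashed = run_engine_child("run-123", "raise SystemExit(17)")
```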