Skip to content

Commit 9c16c58

Browse files
committed
Handle shutdown crashes in executor and worker submission paths
1 parent 91a55cf commit 9c16c58

File tree

5 files changed: +46 additions, -135 deletions

src/prefect/runner/_flow_run_executor.py

Lines changed: 9 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -123,6 +123,14 @@ async def submit(
123123
# after this await runs only once the process is done.
124124
handle = await self._start_process(task_status)
125125

126+
except anyio.get_cancelled_exc_class():
127+
if self._process_manager.get(self._flow_run.id) is not None:
128+
with anyio.CancelScope(shield=True):
129+
await self._state_proposer.propose_crashed(
130+
self._flow_run,
131+
message="Flow run process exited due to worker shutdown.",
132+
)
133+
raise
126134
except Exception as exc:
127135
self._logger.exception(
128136
"Flow run '%s' could not start: %s",
@@ -136,7 +144,7 @@ async def submit(
136144
return
137145
finally:
138146
# Step 6: remove handle from process_manager
139-
if handle is not None:
147+
if handle is not None or self._process_manager.get(self._flow_run.id):
140148
await self._process_manager.remove(self._flow_run.id)
141149

142150
# Step 7: interpret exit code and propose terminal state

src/prefect/runner/runner.py

Lines changed: 12 additions & 65 deletions
Original file line number | Diff line number | Diff line change
@@ -114,9 +114,7 @@ def fast_flow():
114114
PREFECT_RUNNER_SERVER_ENABLE,
115115
get_current_settings,
116116
)
117-
from prefect.states import (
118-
AwaitingRetry,
119-
)
117+
from prefect.states import AwaitingRetry
120118
from prefect.types.entrypoint import EntrypointType
121119
from prefect.utilities._engine import get_hook_name
122120
from prefect.utilities._infrastructure_exit_codes import get_infrastructure_exit_info
@@ -256,7 +254,6 @@ def goodbye_flow(name):
256254
# --- Facade-owned mutable state (kept until methods are fully delegated) ---
257255
self._submitting_flow_run_ids: set[UUID] = set()
258256
self._rescheduling: bool = False
259-
self._rescheduled_flow_run_ids: set[UUID] = set()
260257

261258
# --- Facade-owned mutable containers (exposed via @property) ---
262259
self.__flow_run_process_map_internal: dict[UUID, ProcessMapEntry] = {}
@@ -794,39 +791,14 @@ async def execute_bundle(
794791
self._flow_run_bundle_map[flow_run.id] = bundle
795792

796793
flow_run_logger = self._get_flow_run_logger(flow_run)
797-
exit_code: int | None = None
798794
try:
799-
await anyio.to_thread.run_sync(
800-
process.join,
801-
abandon_on_cancel=True,
802-
)
803-
exit_code = process.exitcode
804-
except anyio.get_cancelled_exc_class():
805-
with anyio.CancelScope(shield=True):
806-
flow_run_logger.info(
807-
"Runner shutdown interrupted flow run %r; terminating subprocess.",
808-
flow_run.name,
809-
)
810-
exit_code = await self._terminate_bundle_process(process)
811-
812-
if (
813-
exit_code != 0
814-
and flow_run.id not in self._rescheduled_flow_run_ids
815-
):
816-
terminal_state = await self._propose_crashed_state(
817-
flow_run,
818-
"Flow run process exited due to worker shutdown.",
819-
)
820-
if terminal_state:
821-
await self._run_on_crashed_hooks(
822-
flow_run=flow_run, state=terminal_state
823-
)
824-
return
795+
await anyio.to_thread.run_sync(process.join)
825796
finally:
826797
with anyio.CancelScope(shield=True):
827798
await self._remove_flow_run_process_map_entry(flow_run.id)
828799
self._release_limit_slot(flow_run.id)
829800

801+
exit_code = process.exitcode
830802
if exit_code is None:
831803
raise RuntimeError("Process has no exit code")
832804

@@ -851,27 +823,6 @@ async def execute_bundle(
851823
f"Process for flow run {flow_run.name!r} exited cleanly."
852824
)
853825

854-
async def _terminate_bundle_process(
855-
self,
856-
process: multiprocessing.context.SpawnProcess,
857-
grace_seconds: int = 30,
858-
) -> int | None:
859-
if process.exitcode is not None:
860-
return process.exitcode
861-
862-
if process.is_alive():
863-
await anyio.to_thread.run_sync(process.terminate)
864-
865-
with anyio.move_on_after(grace_seconds):
866-
while process.is_alive():
867-
await anyio.sleep(0.1)
868-
869-
if process.is_alive():
870-
await anyio.to_thread.run_sync(process.kill)
871-
872-
await anyio.to_thread.run_sync(process.join)
873-
return process.exitcode
874-
875826
def _get_flow_run_logger(self, flow_run: "FlowRun") -> PrefectLogAdapter:
876827
return flow_run_logger(flow_run=flow_run).getChild(
877828
"runner",
@@ -1071,7 +1022,6 @@ def reschedule_current_flow_runs(
10711022
)
10721023
try:
10731024
propose_state_sync(client, AwaitingRetry(), flow_run_id=flow_run.id)
1074-
self._rescheduled_flow_run_ids.add(flow_run.id)
10751025
os.kill(process_info["pid"], signal.SIGTERM)
10761026
run_logger.info("Rescheduled flow run for resubmission")
10771027
except ProcessLookupError:
@@ -1259,17 +1209,6 @@ async def _submit_run_and_capture_errors(
12591209
flow_run_logger.info(
12601210
f"Process for flow run {flow_run.name!r} exited cleanly."
12611211
)
1262-
except anyio.get_cancelled_exc_class():
1263-
if (
1264-
task_status._future.done()
1265-
and flow_run.id not in self._rescheduled_flow_run_ids
1266-
): # type: ignore[attr-defined]
1267-
with anyio.CancelScope(shield=True):
1268-
await self._propose_crashed_state(
1269-
flow_run,
1270-
"Flow run process exited due to worker shutdown.",
1271-
)
1272-
raise
12731212
except Exception as exc:
12741213
if not task_status._future.done(): # type: ignore
12751214
# This flow run was being submitted and did not start successfully
@@ -1287,12 +1226,20 @@ async def _submit_run_and_capture_errors(
12871226
"occurred."
12881227
)
12891228
return exc
1229+
except anyio.get_cancelled_exc_class():
1230+
if self.stopping and task_status._future.done(): # type: ignore[attr-defined]
1231+
with anyio.CancelScope(shield=True):
1232+
await self._propose_crashed_state(
1233+
flow_run,
1234+
"Flow run process exited due to worker shutdown.",
1235+
)
1236+
raise
12901237
finally:
12911238
self._release_limit_slot(flow_run.id)
12921239

12931240
await self._remove_flow_run_process_map_entry(flow_run.id)
12941241

1295-
if exit_code != 0 and flow_run.id not in self._rescheduled_flow_run_ids:
1242+
if exit_code != 0:
12961243
await self._propose_crashed_state(
12971244
flow_run,
12981245
f"Flow run process exited with non-zero status code {exit_code}.",

src/prefect/workers/base.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1472,6 +1472,14 @@ async def _submit_run_and_capture_errors(
14721472
task_status=task_status,
14731473
configuration=configuration,
14741474
)
1475+
except anyio.get_cancelled_exc_class():
1476+
if task_status and getattr(task_status, "_future").done():
1477+
with anyio.CancelScope(shield=True):
1478+
await self._propose_crashed_state(
1479+
flow_run,
1480+
"Flow run process exited due to worker shutdown.",
1481+
)
1482+
raise
14751483
except Exception as exc:
14761484
if task_status and not getattr(task_status, "_future").done():
14771485
# This flow run was being submitted and did not start successfully

tests/runner/test__flow_run_executor.py

Lines changed: 17 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -197,6 +197,23 @@ async def test_submit_proposes_crashed_on_nonzero_exit(self):
197197
assert m["flow_run"] == call_args[0][0]
198198
assert "1" in call_args[1]["message"]
199199

200+
async def test_submit_proposes_crashed_when_cancelled_after_start(self):
201+
"""Cancellation after the process has started should mark the run as crashed."""
202+
executor, m = _make_executor()
203+
cancelled_error = anyio.get_cancelled_exc_class()()
204+
executor._start_process = AsyncMock(side_effect=cancelled_error)
205+
m["process_manager"].get = MagicMock(return_value=m["handle"])
206+
207+
with pytest.raises(anyio.get_cancelled_exc_class()):
208+
await executor.submit()
209+
210+
m["process_manager"].remove.assert_awaited_once_with(m["flow_run"].id)
211+
m["state_proposer"].propose_crashed.assert_awaited_once_with(
212+
m["flow_run"],
213+
message="Flow run process exited due to worker shutdown.",
214+
)
215+
m["hook_runner"].run_crashed_hooks.assert_not_awaited()
216+
200217
async def test_submit_crashed_message_includes_registry_explanation(self):
201218
"""The crashed state message should include the explanation from
202219
the centralized exit code registry."""

tests/runner/test_runner.py

Lines changed: 0 additions & 69 deletions
Original file line number | Diff line number | Diff line change
@@ -1414,33 +1414,6 @@ async def test_runner_logs_exit_code_help_message(
14141414
else:
14151415
assert record.levelname == "ERROR"
14161416

1417-
async def test_runner_does_not_mark_rescheduled_flow_run_crashed(
1418-
self,
1419-
monkeypatch: pytest.MonkeyPatch,
1420-
):
1421-
runner = Runner()
1422-
runner._release_limit_slot = MagicMock()
1423-
runner._remove_flow_run_process_map_entry = AsyncMock()
1424-
runner._propose_crashed_state = AsyncMock()
1425-
1426-
cancelled_error = anyio.get_cancelled_exc_class()()
1427-
mock_run_process = AsyncMock(side_effect=cancelled_error)
1428-
monkeypatch.setattr(runner, "_run_process", mock_run_process)
1429-
1430-
task_status = MagicMock()
1431-
task_status._future = asyncio.get_running_loop().create_future()
1432-
task_status._future.set_result(None)
1433-
1434-
flow_run = MagicMock()
1435-
flow_run.id = uuid.uuid4()
1436-
flow_run.name = "test-flow-run"
1437-
runner._rescheduled_flow_run_ids.add(flow_run.id)
1438-
1439-
with pytest.raises(anyio.get_cancelled_exc_class()):
1440-
await runner._submit_run_and_capture_errors(flow_run, task_status)
1441-
1442-
runner._propose_crashed_state.assert_not_awaited()
1443-
14441417
@pytest.mark.skipif(
14451418
sys.platform != "win32",
14461419
reason="subprocess.CREATE_NEW_PROCESS_GROUP is only defined in Windows",
@@ -1829,48 +1802,6 @@ def da_hook(
18291802

18301803
assert "This flow was cancelled!" in caplog.text
18311804

1832-
async def test_cancelled_bundle_execution_is_marked_crashed(
1833-
self, prefect_client: PrefectClient
1834-
):
1835-
runner = Runner()
1836-
1837-
@flow
1838-
def slow_flow():
1839-
sleep(30)
1840-
1841-
flow_run = await prefect_client.create_flow_run(slow_flow)
1842-
1843-
result = create_bundle_for_flow_run(slow_flow, flow_run)
1844-
bundle = result["bundle"]
1845-
1846-
async def execute_bundle() -> None:
1847-
await runner.execute_bundle(bundle)
1848-
1849-
async with anyio.create_task_group() as tg:
1850-
tg.start_soon(execute_bundle)
1851-
1852-
with anyio.fail_after(30):
1853-
while True:
1854-
flow_run = await prefect_client.read_flow_run(
1855-
flow_run_id=flow_run.id
1856-
)
1857-
assert flow_run.state is not None
1858-
if flow_run.state.is_running():
1859-
break
1860-
await anyio.sleep(0.5)
1861-
1862-
tg.cancel_scope.cancel()
1863-
1864-
with anyio.fail_after(30):
1865-
while True:
1866-
flow_run = await prefect_client.read_flow_run(
1867-
flow_run_id=flow_run.id
1868-
)
1869-
assert flow_run.state is not None
1870-
if flow_run.state.is_crashed():
1871-
break
1872-
await anyio.sleep(0.5)
1873-
18741805
async def test_crashed_bundle_execution(
18751806
self, prefect_client: PrefectClient, caplog: pytest.LogCaptureFixture
18761807
):

0 commit comments

Comments
 (0)