Skip to content

Commit d4d5310

Browse files
desertaxle and claude authored
Fix heartbeat starvation under CPU-bound workloads (#21276)
Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent f76299f commit d4d5310

File tree

2 files changed

+152
-125
lines changed

2 files changed

+152
-125
lines changed

src/prefect/flow_engine.py

Lines changed: 75 additions & 83 deletions
Original file line number | Diff line number | Diff line change
@@ -1,6 +1,7 @@
11
from __future__ import annotations
22

33
import asyncio
4+
import datetime
45
import logging
56
import multiprocessing
67
import multiprocessing.context
@@ -38,6 +39,7 @@
3839
from typing_extensions import ParamSpec
3940

4041
from prefect import Task, __version__
42+
from prefect._internal.compatibility.deprecated import deprecated_callable
4143
from prefect.client.orchestration import PrefectClient, SyncPrefectClient, get_client
4244
from prefect.client.schemas import FlowRun, TaskRun
4345
from prefect.client.schemas.filters import FlowRunFilter
@@ -167,17 +169,19 @@ def load_flow_and_flow_run(flow_run_id: UUID) -> tuple[FlowRun, Flow[..., Any]]:
167169

168170

169171
@contextmanager
170-
def send_heartbeats_sync(
171-
engine: "FlowRunEngine[Any, Any]",
172+
def _send_heartbeats(
173+
engine: "BaseFlowRunEngine[Any, Any]",
174+
join_on_exit: bool = True,
172175
) -> Generator[None, None, None]:
173-
"""Context manager that maintains heartbeats for a sync flow run.
176+
"""Context manager that maintains heartbeats for a flow run using a daemon thread.
174177
175-
Heartbeats are emitted at regular intervals while the flow is running.
176-
The loop checks the flow run state before each heartbeat and stops
177-
if the run reaches a terminal state.
178+
Uses a background OS thread instead of an asyncio task so that heartbeats
179+
fire even when the event loop is blocked by CPU-bound work.
178180
179181
Args:
180-
engine: The FlowRunEngine instance to emit heartbeats for.
182+
engine: The flow run engine instance to emit heartbeats for.
183+
join_on_exit: Whether to join the heartbeat thread on exit. Set to
184+
`False` in async engines to avoid blocking the event loop.
181185
182186
Yields:
183187
None
@@ -188,6 +192,9 @@ def send_heartbeats_sync(
188192
return
189193
heartbeat_seconds = max(heartbeat_seconds, MINIMUM_HEARTBEAT_INTERVAL)
190194

195+
# Pre-compute the event template once to minimize per-heartbeat GIL hold time
196+
resource, related = engine._build_heartbeat_event_template()
197+
191198
stop_event = threading.Event()
192199

193200
def heartbeat_loop() -> None:
@@ -202,7 +209,7 @@ def heartbeat_loop() -> None:
202209
return
203210

204211
try:
205-
engine._emit_flow_run_heartbeat()
212+
engine._emit_flow_run_heartbeat(resource, related)
206213
except Exception:
207214
engine.logger.debug("Failed to emit heartbeat", exc_info=True)
208215

@@ -220,75 +227,33 @@ def heartbeat_loop() -> None:
220227
yield
221228
finally:
222229
stop_event.set()
223-
thread.join(timeout=2)
230+
if join_on_exit:
231+
thread.join(timeout=2)
224232
engine.logger.debug("Stopped flow run heartbeat context")
225233

226234

235+
@deprecated_callable(
236+
start_date=datetime.datetime(2026, 3, 1),
237+
help="Use `_send_heartbeats` instead.",
238+
)
239+
@contextmanager
240+
def send_heartbeats_sync(
241+
engine: "FlowRunEngine[Any, Any]",
242+
) -> Generator[None, None, None]:
243+
with _send_heartbeats(engine, join_on_exit=True):
244+
yield
245+
246+
247+
@deprecated_callable(
248+
start_date=datetime.datetime(2026, 3, 1),
249+
help="Use `_send_heartbeats` instead.",
250+
)
227251
@asynccontextmanager
228252
async def send_heartbeats_async(
229253
engine: "AsyncFlowRunEngine[Any, Any]",
230254
) -> AsyncGenerator[None, None]:
231-
"""Async context manager that maintains heartbeats for an async flow run.
232-
233-
Heartbeats are emitted at regular intervals while the flow is running.
234-
The loop checks the flow run state before each heartbeat and stops
235-
if the run reaches a terminal state.
236-
237-
Args:
238-
engine: The AsyncFlowRunEngine instance to emit heartbeats for.
239-
240-
Yields:
241-
None
242-
"""
243-
heartbeat_seconds = engine.heartbeat_seconds
244-
if heartbeat_seconds is None:
255+
with _send_heartbeats(engine, join_on_exit=False):
245256
yield
246-
return
247-
heartbeat_seconds = max(heartbeat_seconds, MINIMUM_HEARTBEAT_INTERVAL)
248-
249-
stop_flag = False
250-
251-
async def heartbeat_loop() -> None:
252-
nonlocal stop_flag
253-
try:
254-
while not stop_flag:
255-
# Check state before emitting - don't emit if final
256-
if (
257-
engine.flow_run
258-
and engine.flow_run.state
259-
and engine.flow_run.state.is_final()
260-
):
261-
engine.logger.debug(
262-
"Flow run in terminal state, stopping heartbeat"
263-
)
264-
return
265-
266-
try:
267-
engine._emit_flow_run_heartbeat()
268-
except Exception:
269-
engine.logger.debug("Failed to emit heartbeat", exc_info=True)
270-
271-
# Sleep in increments to allow quick shutdown (parity with sync version)
272-
for _ in range(heartbeat_seconds):
273-
if stop_flag:
274-
return
275-
await asyncio.sleep(1)
276-
except asyncio.CancelledError:
277-
engine.logger.debug("Heartbeat loop cancelled")
278-
279-
task = asyncio.create_task(heartbeat_loop())
280-
engine.logger.debug("Started flow run heartbeat context")
281-
282-
try:
283-
yield
284-
finally:
285-
stop_flag = True
286-
task.cancel()
287-
try:
288-
await task
289-
except asyncio.CancelledError:
290-
pass
291-
engine.logger.debug("Stopped flow run heartbeat context")
292257

293258

294259
@dataclass
@@ -342,15 +307,28 @@ def cancel_all_tasks(self) -> None:
342307
if hasattr(self.flow.task_runner, "cancel_all"):
343308
self.flow.task_runner.cancel_all() # type: ignore
344309

345-
def _emit_flow_run_heartbeat(self) -> None:
346-
"""Emit a heartbeat event for the current flow run."""
347-
if not self.flow_run:
348-
return
310+
def _build_heartbeat_event_template(
311+
self,
312+
) -> tuple[dict[str, str], list[RelatedResource]]:
313+
"""Pre-compute the heartbeat event resource and related list.
349314
315+
Called once before starting the heartbeat thread to avoid repeated
316+
Pydantic validation (RelatedResource.model_validate) on every tick.
317+
"""
318+
resource: dict[str, str] = {}
350319
related: list[RelatedResource] = []
320+
321+
if not self.flow_run:
322+
return resource, related
323+
324+
resource = {
325+
"prefect.resource.id": f"prefect.flow-run.{self.flow_run.id}",
326+
"prefect.resource.name": self.flow_run.name or "",
327+
"prefect.version": __version__,
328+
}
329+
351330
tags: list[str] = list(self.flow_run.tags or [])
352331

353-
# Add flow as related resource using flow_id for consistency with other events
354332
if self.flow_run.flow_id:
355333
related.append(
356334
RelatedResource.model_validate(
@@ -362,8 +340,6 @@ def _emit_flow_run_heartbeat(self) -> None:
362340
)
363341
)
364342

365-
# Add deployment as related resource if available
366-
# Note: deployment name is not available on flow_run without an API call
367343
if self.flow_run.deployment_id:
368344
related.append(
369345
RelatedResource.model_validate(
@@ -376,13 +352,29 @@ def _emit_flow_run_heartbeat(self) -> None:
376352

377353
related += tags_as_related_resources(set(tags))
378354

355+
return resource, related
356+
357+
def _emit_flow_run_heartbeat(
358+
self,
359+
resource: dict[str, str] | None = None,
360+
related: list[RelatedResource] | None = None,
361+
) -> None:
362+
"""Emit a heartbeat event for the current flow run.
363+
364+
Args:
365+
resource: Pre-computed resource dict from _build_heartbeat_event_template.
366+
related: Pre-computed related list from _build_heartbeat_event_template.
367+
If not provided, builds the template on the fly (backward compat).
368+
"""
369+
if not self.flow_run:
370+
return
371+
372+
if resource is None or related is None:
373+
resource, related = self._build_heartbeat_event_template()
374+
379375
emit_event(
380376
event="prefect.flow-run.heartbeat",
381-
resource={
382-
"prefect.resource.id": f"prefect.flow-run.{self.flow_run.id}",
383-
"prefect.resource.name": self.flow_run.name or "",
384-
"prefect.version": __version__,
385-
},
377+
resource=resource,
386378
related=related,
387379
)
388380

@@ -989,7 +981,7 @@ def run_context(self):
989981
seconds=self.flow.timeout_seconds,
990982
timeout_exc_type=FlowRunTimeoutError,
991983
):
992-
with send_heartbeats_sync(self):
984+
with _send_heartbeats(self):
993985
self.logger.debug(
994986
f"Executing flow {self.flow.name!r} for flow run {self.flow_run.name!r}..."
995987
)
@@ -1596,7 +1588,7 @@ async def run_context(self):
15961588
seconds=self.flow.timeout_seconds,
15971589
timeout_exc_type=FlowRunTimeoutError,
15981590
):
1599-
async with send_heartbeats_async(self):
1591+
with _send_heartbeats(self, join_on_exit=False):
16001592
self.logger.debug(
16011593
f"Executing flow {self.flow.name!r} for flow run {self.flow_run.name!r}..."
16021594
)

0 commit comments

Comments
 (0)