Skip to content

Commit 4d545aa

Browse files
authored
perf: event-driven aggregator loop + inlined drain hot path (#75)
1 parent 27021db commit 4d545aa

2 files changed

Lines changed: 68 additions & 24 deletions

File tree

src/traceml/aggregator/trace_aggregator.py

Lines changed: 43 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -237,20 +237,27 @@ def _drain_tcp(self) -> None:
237237
Each message is expected to be a telemetry row or batch compatible with
238238
``SQLiteWriterSimple.ingest()``. A legacy subset is also mirrored into
239239
``RemoteDBStore`` for renderers that still depend on the in-memory path.
240+
241+
Design note
242+
-----------
243+
Errors in each sink are isolated: a failure in one does not prevent the
244+
other from receiving the message. Direct try/except is used instead of
245+
``_safe(lambda ...)`` to avoid allocating two closure objects per message
246+
in the hot path.
240247
"""
241248
for msg in self._tcp_server.poll():
242249
remote_msg = self._filter_remote_store_message(msg)
243250
if remote_msg is not None:
244-
_safe(
245-
self._logger,
246-
"RemoteDBStore.ingest failed",
247-
lambda m=remote_msg: self._store.ingest(m),
248-
)
249-
_safe(
250-
self._logger,
251-
"SQLiteWriter.ingest failed",
252-
lambda m=msg: self._sqlite_writer.ingest(m),
253-
)
251+
try:
252+
self._store.ingest(remote_msg)
253+
except Exception:
254+
self._logger.exception(
255+
"[TraceML] RemoteDBStore.ingest failed"
256+
)
257+
try:
258+
self._sqlite_writer.ingest(msg)
259+
except Exception:
260+
self._logger.exception("[TraceML] SQLiteWriter.ingest failed")
254261

255262
def _message_sampler_name(self, msg: Any) -> Optional[str]:
256263
"""
@@ -300,26 +307,38 @@ def _filter_remote_store_message(self, msg: Any) -> Any:
300307

301308
def _loop(self) -> None:
302309
"""
303-
Run the periodic drain and display tick loop.
310+
Run the event-driven drain and display tick loop.
311+
312+
The loop blocks on ``TCPServer.wait_for_data()`` rather than sleeping
313+
for a fixed interval. This means the aggregator drains new messages
314+
as soon as they arrive over TCP — reducing end-to-end ingestion latency
315+
from up to ``render_interval_sec`` down to near-zero.
304316
305-
The aggregator does not render directly. Rendering is delegated to the
306-
configured display driver.
317+
The display driver tick is still rate-limited to at most once per
318+
``render_interval_sec`` so the UI cadence is unchanged.
307319
"""
308320
interval_sec = float(self._settings.render_interval_sec)
321+
last_tick_ts = 0.0
309322

310323
while not self._stop_event.is_set():
324+
# Wake immediately when data arrives, or after interval_sec at most.
325+
self._tcp_server.wait_for_data(timeout=interval_sec)
311326
self._drain_tcp()
312-
_safe(
313-
self._logger,
314-
"Final summary service poll failed",
315-
self._summary_service.poll,
316-
)
317-
_safe(
318-
self._logger,
319-
"Display driver tick failed",
320-
self._display_driver.tick,
321-
)
322-
self._stop_event.wait(interval_sec)
327+
328+
# Rate-limit the UI tick to interval_sec cadence.
329+
now = time.monotonic()
330+
if now - last_tick_ts >= interval_sec:
331+
_safe(
332+
self._logger,
333+
"Final summary service poll failed",
334+
self._summary_service.poll,
335+
)
336+
_safe(
337+
self._logger,
338+
"Display driver tick failed",
339+
self._display_driver.tick,
340+
)
341+
last_tick_ts = now
323342

324343
# Final drain and final display tick on shutdown.
325344
self._drain_tcp()

src/traceml/transport/tcp_transport.py

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,9 @@ def __init__(self, cfg: TCPConfig):
3434
self._thread: Optional[threading.Thread] = None
3535
self._stop_event = threading.Event()
3636
self._queue: "queue.Queue[Dict]" = queue.Queue()
37+
self._data_ready = (
38+
threading.Event()
39+
) # set by _handle_client on every new message
3740
self.logger = get_error_logger("TraceML-TCPServer")
3841

3942
def start(self) -> None:
@@ -70,6 +73,27 @@ def poll(self) -> Iterator[Dict]:
7073
except queue.Empty:
7174
break
7275

76+
def wait_for_data(self, timeout: float) -> bool:
77+
"""
78+
Block until at least one message is available or ``timeout`` expires.
79+
80+
Returns
81+
-------
82+
bool
83+
True if a message arrived before the timeout; False if the timeout
84+
expired with no data. Either way, the caller should then call ``poll()`` to
85+
drain whatever is available.
86+
87+
Notes
88+
-----
89+
- The internal event is cleared before this method returns so that the
90+
next call blocks again; a message racing with the clear can still cause one harmless early wakeup.
91+
- This method is safe to call from any thread.
92+
"""
93+
signalled = self._data_ready.wait(timeout=timeout)
94+
self._data_ready.clear()
95+
return signalled
96+
7397
def _run(self) -> None:
7498
while not self._stop_event.is_set():
7599
try:
@@ -135,6 +159,7 @@ def _handle_client(self, conn: socket.socket) -> None:
135159
try:
136160
msg = decoder.decode(payload)
137161
self._queue.put_nowait(msg)
162+
self._data_ready.set() # wake the aggregator loop
138163
except queue.Full:
139164
pass # drop on overflow
140165
except Exception:

0 commit comments

Comments
 (0)