Skip to content

Commit dfc1f09

Browse files
fix(livekit): prevent memory leak when video_in_enabled is False
1 parent 5fc46cc commit dfc1f09

2 files changed

Lines changed: 141 additions & 5 deletions

File tree

src/pipecat/transports/livekit/transport.py

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -539,11 +539,14 @@ async def _async_on_track_subscribed(
539539
elif track.kind == rtc.TrackKind.KIND_VIDEO:
540540
logger.info(f"Video track subscribed: {track.sid} from participant {participant.sid}")
541541
self._video_tracks[participant.sid] = track
542-
video_stream = rtc.VideoStream(track)
543-
self._task_manager.create_task(
544-
self._process_video_stream(video_stream, participant.sid),
545-
f"{self}::_process_video_stream",
546-
)
542+
# Only process video stream if video input is enabled to prevent
543+
# unbounded queue growth when there is no consumer for video frames.
544+
if self._params.video_in_enabled:
545+
video_stream = rtc.VideoStream(track)
546+
self._task_manager.create_task(
547+
self._process_video_stream(video_stream, participant.sid),
548+
f"{self}::_process_video_stream",
549+
)
547550
await self._callbacks.on_video_track_subscribed(participant.sid)
548551

549552
async def _async_on_track_unsubscribed(

tests/test_livekit_transport.py

Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
#
2+
# Copyright (c) 2024-2026, Daily
3+
#
4+
# SPDX-License-Identifier: BSD 2-Clause License
5+
#
6+
7+
"""Tests for LiveKit transport video stream handling.
8+
9+
Regression tests for issue #3116: Memory leak when video_in_enabled=False
10+
but video tracks are subscribed.
11+
"""
12+
13+
import asyncio
14+
import unittest
15+
from unittest.mock import AsyncMock, MagicMock
16+
17+
from livekit import rtc
18+
19+
from pipecat.transports.livekit.transport import (
20+
LiveKitCallbacks,
21+
LiveKitParams,
22+
LiveKitTransportClient,
23+
)
24+
25+
26+
class TestLiveKitTransportClient(unittest.IsolatedAsyncioTestCase):
27+
"""Tests for LiveKitTransportClient video stream handling."""
28+
29+
def _create_client(self, video_in_enabled: bool) -> LiveKitTransportClient:
30+
"""Create a LiveKitTransportClient with the specified video_in_enabled setting."""
31+
params = LiveKitParams(video_in_enabled=video_in_enabled)
32+
33+
callbacks = LiveKitCallbacks(
34+
on_connected=AsyncMock(),
35+
on_disconnected=AsyncMock(),
36+
on_before_disconnect=AsyncMock(),
37+
on_participant_connected=AsyncMock(),
38+
on_participant_disconnected=AsyncMock(),
39+
on_audio_track_subscribed=AsyncMock(),
40+
on_audio_track_unsubscribed=AsyncMock(),
41+
on_video_track_subscribed=AsyncMock(),
42+
on_video_track_unsubscribed=AsyncMock(),
43+
on_data_received=AsyncMock(),
44+
on_first_participant_joined=AsyncMock(),
45+
)
46+
47+
client = LiveKitTransportClient(
48+
url="wss://test.livekit.cloud",
49+
token="test-token",
50+
room_name="test-room",
51+
params=params,
52+
callbacks=callbacks,
53+
transport_name="test-transport",
54+
)
55+
56+
# Mock the task manager
57+
client._task_manager = MagicMock()
58+
client._task_manager.create_task = MagicMock()
59+
60+
return client
61+
62+
def _create_mock_video_track(self) -> tuple:
63+
"""Create mock video track, publication, and participant."""
64+
mock_track = MagicMock()
65+
mock_track.kind = rtc.TrackKind.KIND_VIDEO
66+
mock_track.sid = "test-track-sid"
67+
68+
mock_publication = MagicMock()
69+
70+
mock_participant = MagicMock()
71+
mock_participant.sid = "test-participant-sid"
72+
73+
return mock_track, mock_publication, mock_participant
74+
75+
def _was_video_stream_task_created(self, client: LiveKitTransportClient) -> bool:
76+
"""Check if _process_video_stream task was created."""
77+
for call in client._task_manager.create_task.call_args_list:
78+
task_name = call[0][1] if len(call[0]) > 1 else call[1].get("name", "")
79+
if "_process_video_stream" in task_name:
80+
return True
81+
return False
82+
83+
async def test_video_stream_not_started_when_video_in_disabled(self):
84+
"""Test that _process_video_stream is NOT started when video_in_enabled=False.
85+
86+
This prevents unbounded queue growth when there is no consumer for video frames.
87+
Regression test for issue #3116.
88+
"""
89+
client = self._create_client(video_in_enabled=False)
90+
mock_track, mock_publication, mock_participant = self._create_mock_video_track()
91+
92+
# Call the track subscribed handler
93+
await client._async_on_track_subscribed(mock_track, mock_publication, mock_participant)
94+
95+
# Verify that create_task was NOT called for video stream processing
96+
self.assertFalse(
97+
self._was_video_stream_task_created(client),
98+
"Video stream processing should NOT be started when video_in_enabled=False",
99+
)
100+
101+
# Verify that the callback was still called
102+
client._callbacks.on_video_track_subscribed.assert_called_once_with(mock_participant.sid)
103+
104+
# Verify that the track was still added to _video_tracks
105+
self.assertIn(mock_participant.sid, client._video_tracks)
106+
107+
async def test_video_stream_started_when_video_in_enabled(self):
108+
"""Test that _process_video_stream IS started when video_in_enabled=True."""
109+
from unittest.mock import patch
110+
111+
client = self._create_client(video_in_enabled=True)
112+
mock_track, mock_publication, mock_participant = self._create_mock_video_track()
113+
114+
# Mock rtc.VideoStream to avoid needing a real LiveKit connection
115+
with patch("pipecat.transports.livekit.transport.rtc.VideoStream"):
116+
# Call the track subscribed handler
117+
await client._async_on_track_subscribed(mock_track, mock_publication, mock_participant)
118+
119+
# Verify that create_task WAS called for video stream processing
120+
self.assertTrue(
121+
self._was_video_stream_task_created(client),
122+
"Video stream processing SHOULD be started when video_in_enabled=True",
123+
)
124+
125+
# Verify that the callback was called
126+
client._callbacks.on_video_track_subscribed.assert_called_once_with(mock_participant.sid)
127+
128+
# Verify that the track was added to _video_tracks
129+
self.assertIn(mock_participant.sid, client._video_tracks)
130+
131+
132+
if __name__ == "__main__":
133+
unittest.main()

0 commit comments

Comments
 (0)