Skip to content

Commit c8f35c3

Browse files
authored
fix: correctly handle events order & allow to specify eventloops (#60)
1 parent 58236eb commit c8f35c3

File tree

14 files changed

+380
-312
lines changed

14 files changed

+380
-312
lines changed

README.md

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,6 @@ $ pip install livekit
2525
```python
2626
async def main():
2727
room = livekit.Room()
28-
# By default, autosubscribe is enabled. The participant will be subscribed to
29-
# all published tracks in the room
30-
await room.connect(URL, TOKEN)
31-
logging.info("connected to room %s", room.name)
3228

3329
# participants and tracks that are already available in the room
3430
# participant_connected and track_published events will *not* be emitted for them
@@ -41,21 +37,23 @@ async def main():
4137
logging.info(
4238
"participant connected: %s %s", participant.sid, participant.identity)
4339

44-
video_stream = None
40+
async def receive_frames(stream: livekit.VideoStream):
41+
async for frame in video_stream:
42+
# received a video frame from the track, process it here
43+
pass
4544

4645
# track_subscribed is emitted whenever the local participant is subscribed to a new track
4746
@room.on("track_subscribed")
4847
def on_track_subscribed(track: livekit.Track, publication: livekit.RemoteTrackPublication, participant: livekit.RemoteParticipant):
4948
logging.info("track subscribed: %s", publication.sid)
5049
if track.kind == livekit.TrackKind.KIND_VIDEO:
51-
nonlocal video_stream
5250
video_stream = livekit.VideoStream(track)
51+
asyncio.ensure_future(receive_frames(video_stream))
5352

54-
async for frame in video_stream:
55-
# received a video frame from the track, process it here
56-
pass
57-
58-
await room.run()
53+
# By default, autosubscribe is enabled. The participant will be subscribed to
54+
# all published tracks in the room
55+
await room.connect(URL, TOKEN)
56+
logging.info("connected to room %s", room.name)
5957
```
6058

6159
## Examples

examples/basic_room.py

Lines changed: 23 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,10 @@
66
import livekit
77

88
URL = 'ws://localhost:7880'
9-
TOKEN = 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJleHAiOjE5MDY2MTMyODgsImlzcyI6IkFQSVRzRWZpZFpqclFvWSIsIm5hbWUiOiJuYXRpdmUiLCJuYmYiOjE2NzI2MTMyODgsInN1YiI6Im5hdGl2ZSIsInZpZGVvIjp7InJvb20iOiJ0ZXN0Iiwicm9vbUFkbWluIjp0cnVlLCJyb29tQ3JlYXRlIjp0cnVlLCJyb29tSm9pbiI6dHJ1ZSwicm9vbUxpc3QiOnRydWV9fQ.uSNIangMRu8jZD5mnRYoCHjcsQWCrJXgHCs0aNIgBFY'
9+
TOKEN = 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJleHAiOjE5MDY2MTMyODgsImlzcyI6IkFQSVRzRWZpZFpqclFvWSIsIm5hbWUiOiJuYXRpdmUiLCJuYmYiOjE2NzI2MTMyODgsInN1YiI6Im5hdGl2ZSIsInZpZGVvIjp7InJvb20iOiJ0ZXN0Iiwicm9vbUFkbWluIjp0cnVlLCJyb29tQ3JlYXRlIjp0cnVlLCJyb29tSm9pbiI6dHJ1ZSwicm9vbUxpc3QiOnRydWV9fQ.uSNIangMRu8jZD5mnRYoCHjcsQWCrJXgHCs0aNIgBFY' # noqa
1010

1111

12-
async def main() -> None:
13-
room = livekit.Room()
12+
async def main(room: livekit.Room) -> None:
1413

1514
@room.listens_to("participant_connected")
1615
def on_participant_connected(participant: livekit.RemoteParticipant) -> None:
@@ -47,24 +46,18 @@ def on_track_unpublished(publication: livekit.RemoteTrackPublication,
4746
participant: livekit.RemoteParticipant):
4847
logging.info("track unpublished: %s", publication.sid)
4948

50-
# Keep a reference to the streams, otherwise they will be disposed
51-
audio_stream = None
52-
video_stream = None
53-
5449
@room.listens_to("track_subscribed")
5550
def on_track_subscribed(track: livekit.Track,
5651
publication: livekit.RemoteTrackPublication,
5752
participant: livekit.RemoteParticipant):
5853
logging.info("track subscribed: %s", publication.sid)
5954
if track.kind == livekit.TrackKind.KIND_VIDEO:
60-
nonlocal video_stream
61-
video_stream = livekit.VideoStream(track)
55+
_video_stream = livekit.VideoStream(track)
6256
# video_stream is an async iterator that yields VideoFrame
6357
elif track.kind == livekit.TrackKind.KIND_AUDIO:
6458
print("Subscribed to an Audio Track")
65-
nonlocal audio_stream
66-
audio_stream = livekit.AudioStream(track)
67-
# audio_stream is an async iterator that yields AudioFrame
59+
_audio_stream = livekit.AudioStream(track)
60+
# audio_stream is an async iterator that yields AudioFrame
6861

6962
@room.listens_to("track_unsubscribed")
7063
def on_track_unsubscribed(track: livekit.Track,
@@ -120,32 +113,32 @@ def on_reconnecting() -> None:
120113
def on_reconnected() -> None:
121114
logging.info("reconnected")
122115

123-
try:
124-
logging.info("connecting to %s", URL)
125-
await room.connect(URL, TOKEN)
126-
logging.info("connected to room %s", room.name)
127-
128-
await room.local_participant.publish_data("hello world")
116+
await room.connect(URL, TOKEN)
117+
logging.info("connected to room %s", room.name)
118+
logging.info("participants: %s", room.participants)
129119

130-
logging.info("participants: %s", room.participants)
131-
132-
await room.run()
133-
except livekit.ConnectError as e:
134-
logging.error("failed to connect to the room: %s", e)
135-
except asyncio.CancelledError:
136-
logging.info("closing the room")
137-
await room.disconnect()
120+
await asyncio.sleep(2)
121+
await room.local_participant.publish_data("hello world")
138122

139123

140124
if __name__ == "__main__":
141125
logging.basicConfig(level=logging.INFO, handlers=[
142-
logging.FileHandler("basic_room.log"), logging.StreamHandler()])
126+
logging.FileHandler("basic_room.log"),
127+
logging.StreamHandler()])
143128

144129
loop = asyncio.get_event_loop()
145-
main_task = asyncio.ensure_future(main())
130+
room = livekit.Room(loop=loop)
131+
132+
async def cleanup():
133+
await room.disconnect()
134+
loop.stop()
135+
136+
asyncio.ensure_future(main(room))
146137
for signal in [SIGINT, SIGTERM]:
147-
loop.add_signal_handler(signal, main_task.cancel)
138+
loop.add_signal_handler(
139+
signal, lambda: asyncio.ensure_future(cleanup()))
140+
148141
try:
149-
loop.run_until_complete(main_task)
142+
loop.run_forever()
150143
finally:
151144
loop.close()

examples/e2ee.py

Lines changed: 61 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
import asyncio
2-
import colorsys
32
import logging
43
from signal import SIGINT, SIGTERM
54

@@ -10,44 +9,55 @@
109
URL = 'ws://localhost:7880'
1110
TOKEN = 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJleHAiOjE5MDY2MTMyODgsImlzcyI6IkFQSVRzRWZpZFpqclFvWSIsIm5hbWUiOiJuYXRpdmUiLCJuYmYiOjE2NzI2MTMyODgsInN1YiI6Im5hdGl2ZSIsInZpZGVvIjp7InJvb20iOiJ0ZXN0Iiwicm9vbUFkbWluIjp0cnVlLCJyb29tQ3JlYXRlIjp0cnVlLCJyb29tSm9pbiI6dHJ1ZSwicm9vbUxpc3QiOnRydWV9fQ.uSNIangMRu8jZD5mnRYoCHjcsQWCrJXgHCs0aNIgBFY' # noqa
1211

12+
# ("livekitrocks") this is our shared key, it must match the one used by your clients
13+
SHARED_KEY = b"livekitrocks"
1314

14-
async def publish_frames(source: livekit.VideoSource):
15-
argb_frame = livekit.ArgbFrame(
16-
livekit.VideoFormatType.FORMAT_ARGB, 1280, 720)
1715

18-
arr = np.ctypeslib.as_array(argb_frame.data)
16+
async def draw_cube(source: livekit.VideoSource):
17+
W, H, MID_W, MID_H = 1280, 720, 640, 360
18+
cube_size = 60
19+
vertices = (np.array([[-1, -1, -1], [1, -1, -1], [1, 1, -1], [-1, 1, -1],
20+
[-1, -1, 1], [1, -1, 1], [1, 1, 1], [-1, 1, 1]]) * cube_size)
21+
edges = [[0, 1], [1, 2], [2, 3], [3, 0], [4, 5], [5, 6],
22+
[6, 7], [7, 4], [0, 4], [1, 5], [2, 6], [3, 7]]
1923

20-
framerate = 1 / 30
21-
hue = 0.0
24+
frame = livekit.ArgbFrame(livekit.VideoFormatType.FORMAT_ARGB, W, H)
25+
arr = np.ctypeslib.as_array(frame.data)
26+
angle = 0
2227

2328
while True:
24-
frame = livekit.VideoFrame(
25-
0, livekit.VideoRotation.VIDEO_ROTATION_0, argb_frame.to_i420())
26-
27-
rgb = colorsys.hsv_to_rgb(hue, 1.0, 1.0)
28-
rgb = [(x * 255) for x in rgb] # type: ignore
29-
30-
argb_color = np.array(rgb + [255], dtype=np.uint8)
31-
arr.flat[::4] = argb_color[0]
32-
arr.flat[1::4] = argb_color[1]
33-
arr.flat[2::4] = argb_color[2]
34-
arr.flat[3::4] = argb_color[3]
35-
36-
source.capture_frame(frame)
37-
38-
hue += framerate / 3 # 3s for a full cycle
39-
if hue >= 1.0:
40-
hue = 0.0
41-
42-
try:
43-
await asyncio.sleep(framerate)
44-
except asyncio.CancelledError:
45-
break
46-
47-
48-
async def main():
49-
room = livekit.Room()
50-
29+
start_time = asyncio.get_event_loop().time()
30+
arr.fill(0)
31+
rot = np.dot(np.array([[1, 0, 0],
32+
[0, np.cos(angle), -np.sin(angle)],
33+
[0, np.sin(angle), np.cos(angle)]]),
34+
np.array([[np.cos(angle), 0, np.sin(angle)],
35+
[0, 1, 0],
36+
[-np.sin(angle), 0, np.cos(angle)]]))
37+
proj_points = [[int(pt[0] / (pt[2] / 200 + 1)), int(pt[1] / (pt[2] / 200 + 1))]
38+
for pt in np.dot(vertices, rot)]
39+
40+
for e in edges:
41+
x1, y1, x2, y2 = *proj_points[e[0]], *proj_points[e[1]]
42+
for t in np.linspace(0, 1, 100):
43+
x, y = int(MID_W + (1 - t) * x1 + t *
44+
x2), int(MID_H + (1 - t) * y1 + t * y2)
45+
for dx in [-1, 0, 1]:
46+
for dy in [-1, 0, 1]:
47+
if 0 <= x + dx < W and 0 <= y + dy < H:
48+
idx = (y + dy) * W * 4 + (x + dx) * 4
49+
arr[idx:idx+4] = [255, 255, 255, 255]
50+
51+
f = livekit.VideoFrame(
52+
0, livekit.VideoRotation.VIDEO_ROTATION_0, frame.to_i420())
53+
source.capture_frame(f)
54+
angle += 0.02
55+
56+
code_duration = asyncio.get_event_loop().time() - start_time
57+
await asyncio.sleep(1 / 30 - code_duration)
58+
59+
60+
async def main(room: livekit.Room):
5161
@room.listens_to("e2ee_state_changed")
5262
def on_e2ee_state_changed(participant: livekit.Participant,
5363
state: livekit.EncryptionState) -> None:
@@ -56,7 +66,7 @@ def on_e2ee_state_changed(participant: livekit.Participant,
5666
logging.info("connecting to %s", URL)
5767
try:
5868
e2ee_options = livekit.E2EEOptions()
59-
e2ee_options.key_provider_options.shared_key = b"livekitrocks" # this is our e2ee key
69+
e2ee_options.key_provider_options.shared_key = SHARED_KEY
6070

6171
await room.connect(URL, TOKEN, options=livekit.RoomOptions(
6272
auto_subscribe=True,
@@ -68,33 +78,34 @@ def on_e2ee_state_changed(participant: livekit.Participant,
6878
logging.error("failed to connect to the room: %s", e)
6979
return False
7080

81+
# publish a track
7182
source = livekit.VideoSource()
72-
source_task = asyncio.create_task(publish_frames(source))
73-
74-
track = livekit.LocalVideoTrack.create_video_track("hue", source)
83+
track = livekit.LocalVideoTrack.create_video_track("cube", source)
7584
options = livekit.TrackPublishOptions()
7685
options.source = livekit.TrackSource.SOURCE_CAMERA
7786
publication = await room.local_participant.publish_track(track, options)
7887
logging.info("published track %s", publication.sid)
7988

80-
try:
81-
await room.run()
82-
except asyncio.CancelledError:
83-
logging.info("closing the room")
84-
source_task.cancel()
85-
await source_task
86-
await room.disconnect()
87-
89+
asyncio.ensure_future(draw_cube(source))
8890

8991
if __name__ == "__main__":
9092
logging.basicConfig(level=logging.INFO, handlers=[
91-
logging.FileHandler("publish_hue.log"), logging.StreamHandler()])
93+
logging.FileHandler("e2ee.log"),
94+
logging.StreamHandler()])
9295

9396
loop = asyncio.get_event_loop()
94-
main_task = asyncio.ensure_future(main())
97+
room = livekit.Room(loop=loop)
98+
99+
async def cleanup():
100+
await room.disconnect()
101+
loop.stop()
102+
103+
asyncio.ensure_future(main(room))
95104
for signal in [SIGINT, SIGTERM]:
96-
loop.add_signal_handler(signal, main_task.cancel)
105+
loop.add_signal_handler(
106+
signal, lambda: asyncio.ensure_future(cleanup()))
107+
97108
try:
98-
loop.run_until_complete(main_task)
109+
loop.run_forever()
99110
finally:
100111
loop.close()

examples/face_landmark/face_landmark.py

Lines changed: 34 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import asyncio
2+
import logging
23
import os
3-
import signal
4+
from signal import SIGINT, SIGTERM
45

56
import cv2
67
import mediapipe as mp
@@ -11,7 +12,7 @@
1112
import livekit
1213

1314
URL = 'ws://localhost:7880'
14-
TOKEN = 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJleHAiOjE5MDY2MTMyODgsImlzcyI6IkFQSVRzRWZpZFpqclFvWSIsIm5hbWUiOiJuYXRpdmUiLCJuYmYiOjE2NzI2MTMyODgsInN1YiI6Im5hdGl2ZSIsInZpZGVvIjp7InJvb20iOiJ0ZXN0Iiwicm9vbUFkbWluIjp0cnVlLCJyb29tQ3JlYXRlIjp0cnVlLCJyb29tSm9pbiI6dHJ1ZSwicm9vbUxpc3QiOnRydWV9fQ.uSNIangMRu8jZD5mnRYoCHjcsQWCrJXgHCs0aNIgBFY'
15+
TOKEN = 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJleHAiOjE5MDY2MTMyODgsImlzcyI6IkFQSVRzRWZpZFpqclFvWSIsIm5hbWUiOiJuYXRpdmUiLCJuYmYiOjE2NzI2MTMyODgsInN1YiI6Im5hdGl2ZSIsInZpZGVvIjp7InJvb20iOiJ0ZXN0Iiwicm9vbUFkbWluIjp0cnVlLCJyb29tQ3JlYXRlIjp0cnVlLCJyb29tSm9pbiI6dHJ1ZSwicm9vbUxpc3QiOnRydWV9fQ.uSNIangMRu8jZD5mnRYoCHjcsQWCrJXgHCs0aNIgBFY' # noqa
1516

1617
tasks = set()
1718

@@ -40,7 +41,9 @@ def draw_landmarks_on_image(rgb_image, detection_result):
4041
# Draw the face landmarks.
4142
face_landmarks_proto = landmark_pb2.NormalizedLandmarkList()
4243
face_landmarks_proto.landmark.extend([
43-
landmark_pb2.NormalizedLandmark(x=landmark.x, y=landmark.y, z=landmark.z) for landmark in face_landmarks
44+
landmark_pb2.NormalizedLandmark(
45+
x=landmark.x, y=landmark.y, z=landmark.z)
46+
for landmark in face_landmarks
4447
])
4548

4649
solutions.drawing_utils.draw_landmarks(
@@ -74,7 +77,8 @@ async def frame_loop(video_stream: livekit.VideoStream) -> None:
7477
async for frame in video_stream:
7578
buffer = frame.buffer
7679

77-
if argb_frame is None or argb_frame.width != buffer.width or argb_frame.height != buffer.height:
80+
if argb_frame is None or argb_frame.width != buffer.width \
81+
or argb_frame.height != buffer.height:
7882
argb_frame = livekit.ArgbFrame(
7983
livekit.VideoFormatType.FORMAT_ABGR, buffer.width, buffer.height)
8084

@@ -102,39 +106,45 @@ async def frame_loop(video_stream: livekit.VideoStream) -> None:
102106
cv2.destroyAllWindows()
103107

104108

105-
async def main() -> None:
106-
room = livekit.Room()
107-
108-
loop = asyncio.get_event_loop()
109-
loop.add_signal_handler(
110-
signal.SIGINT, lambda: exit(0))
111-
112-
await room.connect(URL, TOKEN, livekit.RoomOptions(
113-
# Unncomment below to enable E2EE
114-
# e2ee=livekit.E2EEOptions(
115-
# key_provider_options=livekit.KeyProviderOptions(
116-
# shared_key=b"livekitrocks"
117-
# )
118-
# ),
119-
))
120-
print("connected to room: " + room.name)
121-
109+
async def main(room: livekit.Room) -> None:
122110
video_stream = None
123111

124112
@room.on("track_subscribed")
125113
def on_track_subscribed(track: livekit.Track, *_):
126114
if track.kind == livekit.TrackKind.KIND_VIDEO:
127115
nonlocal video_stream
128-
# only process the first stream received
129116
if video_stream is not None:
117+
# only process the first stream received
130118
return
119+
131120
print("subscribed to track: " + track.name)
132121
video_stream = livekit.VideoStream(track)
133122
task = asyncio.create_task(frame_loop(video_stream))
134123
tasks.add(task)
135124
task.add_done_callback(tasks.remove)
136125

137-
await room.run()
126+
await room.connect(URL, TOKEN)
127+
print("connected to room: " + room.name)
128+
138129

139130
if __name__ == "__main__":
140-
asyncio.run(main())
131+
logging.basicConfig(level=logging.INFO, handlers=[
132+
logging.FileHandler("face_landmark.log"),
133+
logging.StreamHandler()])
134+
135+
loop = asyncio.get_event_loop()
136+
room = livekit.Room(loop=loop)
137+
138+
async def cleanup():
139+
await room.disconnect()
140+
loop.stop()
141+
142+
asyncio.ensure_future(main(room))
143+
for signal in [SIGINT, SIGTERM]:
144+
loop.add_signal_handler(
145+
signal, lambda: asyncio.ensure_future(cleanup()))
146+
147+
try:
148+
loop.run_forever()
149+
finally:
150+
loop.close()

0 commit comments

Comments
 (0)