Skip to content

correctly close the room when the disconnection comes from the server #333

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
May 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions livekit/src/room/e2ee/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::fmt::{self, Debug, Formatter};

use self::key_provider::KeyProvider;

pub mod key_provider;
Expand All @@ -30,3 +32,9 @@ pub struct E2eeOptions {
pub encryption_type: EncryptionType,
pub key_provider: KeyProvider,
}

impl Debug for E2eeOptions {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
f.debug_struct("E2eeOptions").field("encryption_type", &self.encryption_type).finish()
}
}
132 changes: 68 additions & 64 deletions livekit/src/room/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,15 +196,15 @@ impl Default for DataPacket {
}
}

#[derive(Clone)]
#[derive(Default, Debug, Clone)]
pub struct Transcription {
pub participant_identity: String,
pub track_id: String,
pub segments: Vec<TranscriptionSegment>,
pub language: String,
}

#[derive(Clone)]
#[derive(Default, Debug, Clone)]
pub struct TranscriptionSegment {
pub id: String,
pub text: String,
Expand All @@ -213,7 +213,7 @@ pub struct TranscriptionSegment {
pub r#final: bool,
}

#[derive(Clone)]
#[derive(Debug, Clone)]
pub struct RoomOptions {
pub auto_subscribe: bool,
pub adaptive_stream: bool,
Expand Down Expand Up @@ -243,14 +243,8 @@ impl Default for RoomOptions {
}
}

struct RoomHandle {
session_task: JoinHandle<()>,
close_emitter: oneshot::Sender<()>,
}

pub struct Room {
inner: Arc<RoomSession>,
handle: AsyncMutex<Option<RoomHandle>>,
}

impl Debug for Room {
Expand All @@ -263,12 +257,46 @@ impl Debug for Room {
}
}

struct RoomInfo {
metadata: String,
state: ConnectionState,
}

pub(crate) struct RoomSession {
rtc_engine: Arc<RtcEngine>,
sid: RoomSid,
name: String,
info: RwLock<RoomInfo>,
dispatcher: Dispatcher<RoomEvent>,
options: RoomOptions,
active_speakers: RwLock<Vec<Participant>>,
local_participant: LocalParticipant,
participants: RwLock<(
// Keep track of participants by sid and identity
HashMap<ParticipantSid, RemoteParticipant>,
HashMap<ParticipantIdentity, RemoteParticipant>,
)>,
e2ee_manager: E2eeManager,
room_task: AsyncMutex<Option<(JoinHandle<()>, oneshot::Sender<()>)>>,
}

impl Debug for RoomSession {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("SessionInner")
.field("sid", &self.sid)
.field("name", &self.name)
.field("rtc_engine", &self.rtc_engine)
.finish()
}
}

impl Room {
pub async fn connect(
url: &str,
token: &str,
options: RoomOptions,
) -> RoomResult<(Self, mpsc::UnboundedReceiver<RoomEvent>)> {
// TODO(theomonnom): move connection logic to the RoomSession
let e2ee_manager = E2eeManager::new(options.e2ee.clone());
let (rtc_engine, join_response, engine_events) = RtcEngine::connect(
url,
Expand Down Expand Up @@ -378,6 +406,7 @@ impl Room {
local_participant,
dispatcher: dispatcher.clone(),
e2ee_manager: e2ee_manager.clone(),
room_task: Default::default(),
});

e2ee_manager.on_state_changed({
Expand Down Expand Up @@ -430,28 +459,15 @@ impl Room {
inner.dispatcher.dispatch(&RoomEvent::Connected { participants_with_tracks });
inner.update_connection_state(ConnectionState::Connected);

let (close_emitter, close_receiver) = oneshot::channel();
let session_task =
livekit_runtime::spawn(inner.clone().room_task(engine_events, close_receiver));
let (close_tx, close_rx) = oneshot::channel();
let room_task = livekit_runtime::spawn(inner.clone().room_task(engine_events, close_rx));
inner.room_task.lock().await.replace((room_task, close_tx));

Ok((
Self {
inner,
handle: AsyncMutex::new(Some(RoomHandle { session_task, close_emitter })),
},
events,
))
Ok((Self { inner }, events))
}

pub async fn close(&self) -> RoomResult<()> {
if let Some(handle) = self.handle.lock().await.take() {
self.inner.close().await;
let _ = handle.close_emitter.send(());
let _ = handle.session_task.await;
Ok(())
} else {
Err(RoomError::AlreadyClosed)
}
self.inner.close(DisconnectReason::ClientInitiated).await
}

pub async fn simulate_scenario(&self, scenario: SimulateScenario) -> EngineResult<()> {
Expand Down Expand Up @@ -495,38 +511,6 @@ impl Room {
}
}

struct RoomInfo {
metadata: String,
state: ConnectionState,
}

pub(crate) struct RoomSession {
rtc_engine: Arc<RtcEngine>,
sid: RoomSid,
name: String,
info: RwLock<RoomInfo>,
dispatcher: Dispatcher<RoomEvent>,
options: RoomOptions,
active_speakers: RwLock<Vec<Participant>>,
local_participant: LocalParticipant,
participants: RwLock<(
// Keep track of participants by sid and identity
HashMap<ParticipantSid, RemoteParticipant>,
HashMap<ParticipantIdentity, RemoteParticipant>,
)>,
e2ee_manager: E2eeManager,
}

impl Debug for RoomSession {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("SessionInner")
.field("sid", &self.sid)
.field("name", &self.name)
.field("rtc_engine", &self.rtc_engine)
.finish()
}
}

impl RoomSession {
async fn room_task(
self: Arc<Self>,
Expand Down Expand Up @@ -598,9 +582,19 @@ impl RoomSession {
Ok(())
}

async fn close(&self) {
self.rtc_engine.close().await;
self.e2ee_manager.cleanup();
async fn close(&self, reason: DisconnectReason) -> RoomResult<()> {
if let Some((room_task, close_tx)) = self.room_task.lock().await.take() {
self.rtc_engine.close(reason).await;
self.e2ee_manager.cleanup();

let _ = close_tx.send(());
let _ = room_task.await;

self.dispatcher.clear();
Ok(())
} else {
Err(RoomError::AlreadyClosed)
}
}

/// Change the connection state and emit an event
Expand Down Expand Up @@ -956,11 +950,21 @@ impl RoomSession {
let _ = tx.send(());
}

fn handle_disconnected(&self, reason: DisconnectReason) {
log::debug!("disconnected from room: {:?}", reason);
fn handle_disconnected(self: &Arc<Self>, reason: DisconnectReason) {
if self.update_connection_state(ConnectionState::Disconnected) {
self.dispatcher.dispatch(&RoomEvent::Disconnected { reason });
}

if reason != DisconnectReason::ClientInitiated {
log::error!("unexpectedly disconnected from room: {:?}", reason);

livekit_runtime::spawn({
let inner = self.clone();
async move {
let _ = inner.close(reason).await;
}
});
}
}

fn handle_data(
Expand Down
12 changes: 6 additions & 6 deletions livekit/src/rtc_engine/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,8 +185,8 @@ impl RtcEngine {
Ok((Self { inner }, join_response, engine_events))
}

pub async fn close(&self) {
self.inner.close(DisconnectReason::ClientInitiated).await
pub async fn close(&self, reason: DisconnectReason) {
self.inner.close(reason).await
}

pub async fn publish_data(
Expand Down Expand Up @@ -343,7 +343,7 @@ impl EngineInner {
async fn engine_task(
self: Arc<Self>,
mut session_events: SessionEvents,
mut close_receiver: oneshot::Receiver<()>,
mut close_rx: oneshot::Receiver<()>,
) {
loop {
tokio::select! {
Expand All @@ -368,7 +368,7 @@ impl EngineInner {

task.await;
},
_ = &mut close_receiver => {
_ = &mut close_rx => {
break;
}
}
Expand Down Expand Up @@ -442,8 +442,8 @@ impl EngineInner {
session.close().await;
let _ = close_tx.send(());
let _ = engine_task.await;
let _ = self.engine_tx.send(EngineEvent::Disconnected { reason });
}
let _ = self.engine_tx.send(EngineEvent::Disconnected { reason });
}

/// When waiting for reconnection, it ensures we're always using the latest session.
Expand Down Expand Up @@ -504,7 +504,7 @@ impl EngineInner {
}
res = inner.reconnect_task() => {
if res.is_err() {
log::error!("failed to reconnect");
log::error!("failed to reconnect to the livekit room");
inner.close(DisconnectReason::UnknownReason).await;
} else {
log::info!("RtcEngine successfully recovered")
Expand Down