Skip to content

Commit 20b59b7

Browse files
authored
Make RTCPeerConnection::on_track spec-compliant (#355)
1 parent acef878 commit 20b59b7

File tree

15 files changed

+538
-601
lines changed

15 files changed

+538
-601
lines changed

examples/examples/broadcast/broadcast.rs

Lines changed: 47 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,8 @@ use webrtc::peer_connection::peer_connection_state::RTCPeerConnectionState;
1313
use webrtc::peer_connection::sdp::session_description::RTCSessionDescription;
1414
use webrtc::rtcp::payload_feedbacks::picture_loss_indication::PictureLossIndication;
1515
use webrtc::rtp_transceiver::rtp_codec::RTPCodecType;
16-
use webrtc::rtp_transceiver::rtp_receiver::RTCRtpReceiver;
1716
use webrtc::track::track_local::track_local_static_rtp::TrackLocalStaticRTP;
1817
use webrtc::track::track_local::{TrackLocal, TrackLocalWriter};
19-
use webrtc::track::track_remote::TrackRemote;
2018
use webrtc::Error;
2119

2220
#[tokio::main]
@@ -127,61 +125,57 @@ async fn main() -> Result<()> {
127125
// Set a handler for when a new remote track starts, this handler copies inbound RTP packets,
128126
// replaces the SSRC and sends them back
129127
let pc = Arc::downgrade(&peer_connection);
130-
peer_connection.on_track(Box::new(
131-
move |track: Option<Arc<TrackRemote>>, _receiver: Option<Arc<RTCRtpReceiver>>| {
132-
if let Some(track) = track {
133-
// Send a PLI on an interval so that the publisher is pushing a keyframe every rtcpPLIInterval
134-
// This is a temporary fix until we implement incoming RTCP events, then we would push a PLI only when a viewer requests it
135-
let media_ssrc = track.ssrc();
136-
let pc2 = pc.clone();
137-
tokio::spawn(async move {
138-
let mut result = Result::<usize>::Ok(0);
139-
while result.is_ok() {
140-
let timeout = tokio::time::sleep(Duration::from_secs(3));
141-
tokio::pin!(timeout);
142-
143-
tokio::select! {
144-
_ = timeout.as_mut() =>{
145-
if let Some(pc) = pc2.upgrade(){
146-
result = pc.write_rtcp(&[Box::new(PictureLossIndication{
147-
sender_ssrc: 0,
148-
media_ssrc,
149-
})]).await.map_err(Into::into);
150-
}else{
151-
break;
152-
}
153-
}
154-
};
155-
}
156-
});
157-
158-
let local_track_chan_tx2 = Arc::clone(&local_track_chan_tx);
159-
tokio::spawn(async move {
160-
// Create Track that we send video back to browser on
161-
let local_track = Arc::new(TrackLocalStaticRTP::new(
162-
track.codec().await.capability,
163-
"video".to_owned(),
164-
"webrtc-rs".to_owned(),
165-
));
166-
let _ = local_track_chan_tx2.send(Arc::clone(&local_track)).await;
167-
168-
// Read RTP packets being sent to webrtc-rs
169-
while let Ok((rtp, _)) = track.read_rtp().await {
170-
if let Err(err) = local_track.write_rtp(&rtp).await {
171-
if Error::ErrClosedPipe != err {
172-
print!("output track write_rtp got error: {} and break", err);
173-
break;
174-
} else {
175-
print!("output track write_rtp got error: {}", err);
176-
}
128+
peer_connection.on_track(Box::new(move |track, _, _| {
129+
// Send a PLI on an interval so that the publisher is pushing a keyframe every rtcpPLIInterval
130+
// This is a temporary fix until we implement incoming RTCP events, then we would push a PLI only when a viewer requests it
131+
let media_ssrc = track.ssrc();
132+
let pc2 = pc.clone();
133+
tokio::spawn(async move {
134+
let mut result = Result::<usize>::Ok(0);
135+
while result.is_ok() {
136+
let timeout = tokio::time::sleep(Duration::from_secs(3));
137+
tokio::pin!(timeout);
138+
139+
tokio::select! {
140+
_ = timeout.as_mut() =>{
141+
if let Some(pc) = pc2.upgrade(){
142+
result = pc.write_rtcp(&[Box::new(PictureLossIndication{
143+
sender_ssrc: 0,
144+
media_ssrc,
145+
})]).await.map_err(Into::into);
146+
}else{
147+
break;
177148
}
178149
}
179-
});
150+
};
151+
}
152+
});
153+
154+
let local_track_chan_tx2 = Arc::clone(&local_track_chan_tx);
155+
tokio::spawn(async move {
156+
// Create Track that we send video back to browser on
157+
let local_track = Arc::new(TrackLocalStaticRTP::new(
158+
track.codec().await.capability,
159+
"video".to_owned(),
160+
"webrtc-rs".to_owned(),
161+
));
162+
let _ = local_track_chan_tx2.send(Arc::clone(&local_track)).await;
163+
164+
// Read RTP packets being sent to webrtc-rs
165+
while let Ok((rtp, _)) = track.read_rtp().await {
166+
if let Err(err) = local_track.write_rtp(&rtp).await {
167+
if Error::ErrClosedPipe != err {
168+
print!("output track write_rtp got error: {} and break", err);
169+
break;
170+
} else {
171+
print!("output track write_rtp got error: {}", err);
172+
}
173+
}
180174
}
175+
});
181176

182-
Box::pin(async {})
183-
},
184-
));
177+
Box::pin(async {})
178+
}));
185179

186180
// Set the handler for Peer connection state
187181
// This will notify you when the peer has connected/disconnected

examples/examples/reflect/reflect.rs

Lines changed: 61 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,8 @@ use webrtc::rtcp::payload_feedbacks::picture_loss_indication::PictureLossIndicat
1616
use webrtc::rtp_transceiver::rtp_codec::{
1717
RTCRtpCodecCapability, RTCRtpCodecParameters, RTPCodecType,
1818
};
19-
use webrtc::rtp_transceiver::rtp_receiver::RTCRtpReceiver;
2019
use webrtc::track::track_local::track_local_static_rtp::TrackLocalStaticRTP;
2120
use webrtc::track::track_local::{TrackLocal, TrackLocalWriter};
22-
use webrtc::track::track_remote::TrackRemote;
2321

2422
#[tokio::main]
2523
async fn main() -> Result<()> {
@@ -198,74 +196,71 @@ async fn main() -> Result<()> {
198196
// Set a handler for when a new remote track starts, this handler copies inbound RTP packets,
199197
// replaces the SSRC and sends them back
200198
let pc = Arc::downgrade(&peer_connection);
201-
peer_connection.on_track(Box::new(
202-
move |track: Option<Arc<TrackRemote>>, _receiver: Option<Arc<RTCRtpReceiver>>| {
203-
if let Some(track) = track {
204-
// Send a PLI on an interval so that the publisher is pushing a keyframe every rtcpPLIInterval
205-
// This is a temporary fix until we implement incoming RTCP events, then we would push a PLI only when a viewer requests it
206-
let media_ssrc = track.ssrc();
207-
208-
if track.kind() == RTPCodecType::Video {
209-
let pc2 = pc.clone();
210-
tokio::spawn(async move {
211-
let mut result = Result::<usize>::Ok(0);
212-
while result.is_ok() {
213-
let timeout = tokio::time::sleep(Duration::from_secs(3));
214-
tokio::pin!(timeout);
215-
216-
tokio::select! {
217-
_ = timeout.as_mut() =>{
218-
if let Some(pc) = pc2.upgrade(){
219-
result = pc.write_rtcp(&[Box::new(PictureLossIndication{
220-
sender_ssrc: 0,
221-
media_ssrc,
222-
})]).await.map_err(Into::into);
223-
}else{
224-
break;
225-
}
226-
}
227-
};
199+
peer_connection.on_track(Box::new(move |track, _, _| {
200+
// Send a PLI on an interval so that the publisher is pushing a keyframe every rtcpPLIInterval
201+
// This is a temporary fix until we implement incoming RTCP events, then we would push a PLI only when a viewer requests it
202+
let media_ssrc = track.ssrc();
203+
204+
if track.kind() == RTPCodecType::Video {
205+
let pc2 = pc.clone();
206+
tokio::spawn(async move {
207+
let mut result = Result::<usize>::Ok(0);
208+
while result.is_ok() {
209+
let timeout = tokio::time::sleep(Duration::from_secs(3));
210+
tokio::pin!(timeout);
211+
212+
tokio::select! {
213+
_ = timeout.as_mut() =>{
214+
if let Some(pc) = pc2.upgrade(){
215+
result = pc.write_rtcp(&[Box::new(PictureLossIndication{
216+
sender_ssrc: 0,
217+
media_ssrc,
218+
})]).await.map_err(Into::into);
219+
}else{
220+
break;
221+
}
228222
}
229-
});
223+
};
230224
}
225+
});
226+
}
231227

232-
let kind = if track.kind() == RTPCodecType::Audio {
233-
"audio"
234-
} else {
235-
"video"
236-
};
237-
let output_track = if let Some(output_track) = output_tracks.get(kind) {
238-
Arc::clone(output_track)
239-
} else {
240-
println!("output_track not found for type = {}", kind);
241-
return Box::pin(async {});
242-
};
243-
244-
let output_track2 = Arc::clone(&output_track);
245-
tokio::spawn(async move {
246-
println!(
247-
"Track has started, of type {}: {}",
248-
track.payload_type(),
249-
track.codec().await.capability.mime_type
250-
);
251-
// Read RTP packets being sent to webrtc-rs
252-
while let Ok((rtp, _)) = track.read_rtp().await {
253-
if let Err(err) = output_track2.write_rtp(&rtp).await {
254-
println!("output track write_rtp got error: {}", err);
255-
break;
256-
}
257-
}
258-
259-
println!(
260-
"on_track finished, of type {}: {}",
261-
track.payload_type(),
262-
track.codec().await.capability.mime_type
263-
);
264-
});
228+
let kind = if track.kind() == RTPCodecType::Audio {
229+
"audio"
230+
} else {
231+
"video"
232+
};
233+
let output_track = if let Some(output_track) = output_tracks.get(kind) {
234+
Arc::clone(output_track)
235+
} else {
236+
println!("output_track not found for type = {}", kind);
237+
return Box::pin(async {});
238+
};
239+
240+
let output_track2 = Arc::clone(&output_track);
241+
tokio::spawn(async move {
242+
println!(
243+
"Track has started, of type {}: {}",
244+
track.payload_type(),
245+
track.codec().await.capability.mime_type
246+
);
247+
// Read RTP packets being sent to webrtc-rs
248+
while let Ok((rtp, _)) = track.read_rtp().await {
249+
if let Err(err) = output_track2.write_rtp(&rtp).await {
250+
println!("output track write_rtp got error: {}", err);
251+
break;
252+
}
265253
}
266-
Box::pin(async {})
267-
},
268-
));
254+
255+
println!(
256+
"on_track finished, of type {}: {}",
257+
track.payload_type(),
258+
track.codec().await.capability.mime_type
259+
);
260+
});
261+
262+
Box::pin(async {})
263+
}));
269264

270265
let (done_tx, mut done_rx) = tokio::sync::mpsc::channel::<()>(1);
271266

0 commit comments

Comments
 (0)