Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 47 additions & 53 deletions examples/examples/broadcast/broadcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,8 @@ use webrtc::peer_connection::peer_connection_state::RTCPeerConnectionState;
use webrtc::peer_connection::sdp::session_description::RTCSessionDescription;
use webrtc::rtcp::payload_feedbacks::picture_loss_indication::PictureLossIndication;
use webrtc::rtp_transceiver::rtp_codec::RTPCodecType;
use webrtc::rtp_transceiver::rtp_receiver::RTCRtpReceiver;
use webrtc::track::track_local::track_local_static_rtp::TrackLocalStaticRTP;
use webrtc::track::track_local::{TrackLocal, TrackLocalWriter};
use webrtc::track::track_remote::TrackRemote;
use webrtc::Error;

#[tokio::main]
Expand Down Expand Up @@ -127,61 +125,57 @@ async fn main() -> Result<()> {
// Set a handler for when a new remote track starts, this handler copies inbound RTP packets,
// replaces the SSRC and sends them back
let pc = Arc::downgrade(&peer_connection);
peer_connection.on_track(Box::new(
move |track: Option<Arc<TrackRemote>>, _receiver: Option<Arc<RTCRtpReceiver>>| {
if let Some(track) = track {
// Send a PLI on an interval so that the publisher is pushing a keyframe every rtcpPLIInterval
// This is a temporary fix until we implement incoming RTCP events, then we would push a PLI only when a viewer requests it
let media_ssrc = track.ssrc();
let pc2 = pc.clone();
tokio::spawn(async move {
let mut result = Result::<usize>::Ok(0);
while result.is_ok() {
let timeout = tokio::time::sleep(Duration::from_secs(3));
tokio::pin!(timeout);

tokio::select! {
_ = timeout.as_mut() =>{
if let Some(pc) = pc2.upgrade(){
result = pc.write_rtcp(&[Box::new(PictureLossIndication{
sender_ssrc: 0,
media_ssrc,
})]).await.map_err(Into::into);
}else{
break;
}
}
};
}
});

let local_track_chan_tx2 = Arc::clone(&local_track_chan_tx);
tokio::spawn(async move {
// Create Track that we send video back to browser on
let local_track = Arc::new(TrackLocalStaticRTP::new(
track.codec().await.capability,
"video".to_owned(),
"webrtc-rs".to_owned(),
));
let _ = local_track_chan_tx2.send(Arc::clone(&local_track)).await;

// Read RTP packets being sent to webrtc-rs
while let Ok((rtp, _)) = track.read_rtp().await {
if let Err(err) = local_track.write_rtp(&rtp).await {
if Error::ErrClosedPipe != err {
print!("output track write_rtp got error: {} and break", err);
break;
} else {
print!("output track write_rtp got error: {}", err);
}
peer_connection.on_track(Box::new(move |track, _, _| {
// Send a PLI on an interval so that the publisher is pushing a keyframe every rtcpPLIInterval
// This is a temporary fix until we implement incoming RTCP events, then we would push a PLI only when a viewer requests it
let media_ssrc = track.ssrc();
let pc2 = pc.clone();
tokio::spawn(async move {
let mut result = Result::<usize>::Ok(0);
while result.is_ok() {
let timeout = tokio::time::sleep(Duration::from_secs(3));
tokio::pin!(timeout);

tokio::select! {
_ = timeout.as_mut() =>{
if let Some(pc) = pc2.upgrade(){
result = pc.write_rtcp(&[Box::new(PictureLossIndication{
sender_ssrc: 0,
media_ssrc,
})]).await.map_err(Into::into);
}else{
break;
}
}
});
};
}
});

let local_track_chan_tx2 = Arc::clone(&local_track_chan_tx);
tokio::spawn(async move {
// Create Track that we send video back to browser on
let local_track = Arc::new(TrackLocalStaticRTP::new(
track.codec().await.capability,
"video".to_owned(),
"webrtc-rs".to_owned(),
));
let _ = local_track_chan_tx2.send(Arc::clone(&local_track)).await;

// Read RTP packets being sent to webrtc-rs
while let Ok((rtp, _)) = track.read_rtp().await {
if let Err(err) = local_track.write_rtp(&rtp).await {
if Error::ErrClosedPipe != err {
print!("output track write_rtp got error: {} and break", err);
break;
} else {
print!("output track write_rtp got error: {}", err);
}
}
}
});

Box::pin(async {})
},
));
Box::pin(async {})
}));

// Set the handler for Peer connection state
// This will notify you when the peer has connected/disconnected
Expand Down
127 changes: 61 additions & 66 deletions examples/examples/reflect/reflect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,8 @@ use webrtc::rtcp::payload_feedbacks::picture_loss_indication::PictureLossIndicat
use webrtc::rtp_transceiver::rtp_codec::{
RTCRtpCodecCapability, RTCRtpCodecParameters, RTPCodecType,
};
use webrtc::rtp_transceiver::rtp_receiver::RTCRtpReceiver;
use webrtc::track::track_local::track_local_static_rtp::TrackLocalStaticRTP;
use webrtc::track::track_local::{TrackLocal, TrackLocalWriter};
use webrtc::track::track_remote::TrackRemote;

#[tokio::main]
async fn main() -> Result<()> {
Expand Down Expand Up @@ -198,74 +196,71 @@ async fn main() -> Result<()> {
// Set a handler for when a new remote track starts, this handler copies inbound RTP packets,
// replaces the SSRC and sends them back
let pc = Arc::downgrade(&peer_connection);
peer_connection.on_track(Box::new(
move |track: Option<Arc<TrackRemote>>, _receiver: Option<Arc<RTCRtpReceiver>>| {
if let Some(track) = track {
// Send a PLI on an interval so that the publisher is pushing a keyframe every rtcpPLIInterval
// This is a temporary fix until we implement incoming RTCP events, then we would push a PLI only when a viewer requests it
let media_ssrc = track.ssrc();

if track.kind() == RTPCodecType::Video {
let pc2 = pc.clone();
tokio::spawn(async move {
let mut result = Result::<usize>::Ok(0);
while result.is_ok() {
let timeout = tokio::time::sleep(Duration::from_secs(3));
tokio::pin!(timeout);

tokio::select! {
_ = timeout.as_mut() =>{
if let Some(pc) = pc2.upgrade(){
result = pc.write_rtcp(&[Box::new(PictureLossIndication{
sender_ssrc: 0,
media_ssrc,
})]).await.map_err(Into::into);
}else{
break;
}
}
};
peer_connection.on_track(Box::new(move |track, _, _| {
// Send a PLI on an interval so that the publisher is pushing a keyframe every rtcpPLIInterval
// This is a temporary fix until we implement incoming RTCP events, then we would push a PLI only when a viewer requests it
let media_ssrc = track.ssrc();

if track.kind() == RTPCodecType::Video {
let pc2 = pc.clone();
tokio::spawn(async move {
let mut result = Result::<usize>::Ok(0);
while result.is_ok() {
let timeout = tokio::time::sleep(Duration::from_secs(3));
tokio::pin!(timeout);

tokio::select! {
_ = timeout.as_mut() =>{
if let Some(pc) = pc2.upgrade(){
result = pc.write_rtcp(&[Box::new(PictureLossIndication{
sender_ssrc: 0,
media_ssrc,
})]).await.map_err(Into::into);
}else{
break;
}
}
});
};
}
});
}

let kind = if track.kind() == RTPCodecType::Audio {
"audio"
} else {
"video"
};
let output_track = if let Some(output_track) = output_tracks.get(kind) {
Arc::clone(output_track)
} else {
println!("output_track not found for type = {}", kind);
return Box::pin(async {});
};

let output_track2 = Arc::clone(&output_track);
tokio::spawn(async move {
println!(
"Track has started, of type {}: {}",
track.payload_type(),
track.codec().await.capability.mime_type
);
// Read RTP packets being sent to webrtc-rs
while let Ok((rtp, _)) = track.read_rtp().await {
if let Err(err) = output_track2.write_rtp(&rtp).await {
println!("output track write_rtp got error: {}", err);
break;
}
}

println!(
"on_track finished, of type {}: {}",
track.payload_type(),
track.codec().await.capability.mime_type
);
});
let kind = if track.kind() == RTPCodecType::Audio {
"audio"
} else {
"video"
};
let output_track = if let Some(output_track) = output_tracks.get(kind) {
Arc::clone(output_track)
} else {
println!("output_track not found for type = {}", kind);
return Box::pin(async {});
};

let output_track2 = Arc::clone(&output_track);
tokio::spawn(async move {
println!(
"Track has started, of type {}: {}",
track.payload_type(),
track.codec().await.capability.mime_type
);
// Read RTP packets being sent to webrtc-rs
while let Ok((rtp, _)) = track.read_rtp().await {
if let Err(err) = output_track2.write_rtp(&rtp).await {
println!("output track write_rtp got error: {}", err);
break;
}
}
Box::pin(async {})
},
));

println!(
"on_track finished, of type {}: {}",
track.payload_type(),
track.codec().await.capability.mime_type
);
});

Box::pin(async {})
}));

let (done_tx, mut done_rx) = tokio::sync::mpsc::channel::<()>(1);

Expand Down
Loading