-
Notifications
You must be signed in to change notification settings - Fork 201
ai/live: Gateway native WHEP server #3691
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Conversation
Adds a native WHEP server to the gateway for playback. This offers: * Immediate connectivity. WHEP playback can begin as soon as the WHIP response is received, rather than having to poll the WHEP endpoint repeatedly until the output is available. The WHEP response will simply hang until the output is ready. * Faster startup times, especially in no-audio cases. We use the mpegts PMT to know immediately which tracks are available. This avoids having to probe several seconds for an audio track that may never come. * Removes the requirement to have a MediaMTX server in the mix. This can substantially simplify the infrastructure requirements for many deployments. However, MediaMTX would still be needed for RTMP ingest or non-WHEP playback. * The promise of much finer control over tracks and stats reports. /// Detailed Changes * Adds a WHEP server. Supports video (H.264) and audio (Opus). * Adds a mpegts-to-RTP timestamp converter to handle roll-overs between 33-bit mpegts (ugh) and 32-bit RTP timestamps. * Adds the output RingBuffer to the LivePipelines struct so it can be accessible via the WHEP handler. If the output is not yet ready, the WHEP handler waits until it is (via condvar). This allows WHEP sessions to be set up immediately as soon as ingest begins, regardless of whether the output is ready yet. * Updates the output RingBuffer to follow the io.Closer interface. Needed so we close the output buffer instead of writing to it indefinitely after a WHEP peerconnection goes away. * Off by default. The `LIVE_AI_WHEP_ADDR` environment variable needs to be set with at least a port number. This should be different from the WHIP port at `LIVE_AI_WHIP_ADDR`. * MediaMTX playback is still available; nothing changes there. * The swapping behavior does not change for now. If we swap orchestrators, the WHEP peerconnection will close and the client will have to re-connect WHEP (swaps do not affect ingest). /// Rollout Plan * Enable on staging via configuring a port via the `LIVE_AI_WHEP_ADDR` env var. Update `LIVE_AI_WHEP_URL` to point to the gateway instead of MediaMTX. * Leave things on staging for a couple weeks * When we are satisfied with staging, enable on production. Note that we can always enable WHEP on the gateway for manual testing without making it the playback default as long as the `LIVE_AI_WHEP_URL` env var points to MediaMTX.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sending partial
u.initialized = true | ||
} | ||
u.lastRaw = rawPts | ||
return uint32(rawPts + u.wrapCount*mpegtsMaxPts) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Won't this overflow as soon as u.wrapCount > 0
? Am I missing sth or maybe you need a %
with sth?
https://go.dev/play/p/dVuB5eJTbbQ
./prog.go:8:21: constant 8589963326 overflows uint32
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, it would overflow a uint32 by wrapping back to zero. The main problem is there would be two discontinuous overflows: first when it overflows 32 bits during the cast to RTP timestamp, then again when the mpegts timestamp overflows 33 bits. So this bit of plumbing is to make those timestamps consistent.
The unit tests aren't the best at describing this intended behavior right now, so I'll add some comments here and rework the tests a bit so things are clearer.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But I assume we want it to overflow and "mod" to a value that fits into a uint32, instead of a panic. How come this doesn't panic like the playground snippet I shared?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure, probably some compile-time constant propagation + bounds checking. With real variables this doesn't happen, maybe because all the intermediate types are int64 until the last minute. The uint32 conversion does mod the value during overflows.
Here is the conversion code with a simplified set of test cases that shows the overflow behavior: https://go.dev/play/p/_6NNdC7LbSN
BTW had GPT math this out and seems like the explicit wraparound tracking isn't actually necessary since 2^33 - 2^32 = 2^32 ... so maybe we can just do a straight cast from the int64 timestamp to uint32 unless we want to also track the actual timestamps for some reason, idk
} | ||
|
||
func (s *WHEPServer) CreateWHEP(ctx context.Context, w http.ResponseWriter, r *http.Request, mediaReader io.ReadCloser) { | ||
clog.Info(ctx, "creating whep") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we add some more fields from the req?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We do try to add the stream key, etc to the context a few lines down ... I can reshuffle so we validate and add that before printing this. What other fields would you add to the context?
I'd avoid things like the user-agent or IP; those can be printed once but doesn't need to be on every log line for this request.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nothing specific that I was thinking of, just thought that a log with only "creating whep" wouldn't be very useful
http.Error(w, "Failed to create PeerConnection", http.StatusInternalServerError) | ||
return | ||
} | ||
peerConnection.OnICEConnectionStateChange(func(connectionState webrtc.ICEConnectionState) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Stopped review here, will continue monday
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
if err != nil { | ||
clog.InfofErr(ctx, "Error creating track for h264", err) | ||
http.Error(w, "Error creating track for h264", http.StatusInternalServerError) | ||
return |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
no context: I wonder if we need to be closing the pion "session" on these early returns? (if it's not automatic or sth)
Just in case not to leave the client hanging. Could have some kind of defer that automatically closes on errors idk
hasVideo = true | ||
|
||
case *mpegts.CodecOpus: | ||
webrtcTrack, err := NewLocalVideoTrack( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
misnamed struct? NewLocalVideoTrack
if !hasAudio && !hasVideo { | ||
clog.InfofErr(ctx, "No audio or video in media stream", errors.New("no audio or video")) | ||
http.Error(w, "No audio or video in media stream", http.StatusInternalServerError) | ||
return | ||
} else if !hasVideo { | ||
clog.Info(ctx, "No video in output") | ||
} else if !hasAudio { | ||
clog.Info(ctx, "No audio in output") | ||
} | ||
clog.Info(ctx, "Outputs", " tracks", trackCodecs) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For now should we always err if !hasVideo
and only log if no audio? Or maybe just log both props, together in the log below
if !hasAudio && !hasVideo { | |
clog.InfofErr(ctx, "No audio or video in media stream", errors.New("no audio or video")) | |
http.Error(w, "No audio or video in media stream", http.StatusInternalServerError) | |
return | |
} else if !hasVideo { | |
clog.Info(ctx, "No video in output") | |
} else if !hasAudio { | |
clog.Info(ctx, "No audio in output") | |
} | |
clog.Info(ctx, "Outputs", " tracks", trackCodecs) | |
if !hasVideo { | |
clog.InfofErr(ctx, "No video in media stream", errors.New("no video")) | |
http.Error(w, "No video in media stream", http.StatusInternalServerError) | |
return | |
} | |
clog.Info(ctx, "Outputs", "hasVideo", hasVideo, "hasAudio, hasAudio, "tracks", trackCodecs) |
|
||
writeErrs := []error{} | ||
for _, p := range packets { | ||
p.Timestamp = ts // ¯\_(ツ)_/¯ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What does this (comment) mean
stream, requestID := params.liveParams.stream, params.liveParams.requestID | ||
sess, exists := params.node.LivePipelines[stream] | ||
if !exists || sess.RequestID != requestID { | ||
clog.Info(ctx, "Did not set output writer due to nonexistent stream or mismatched request ID") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since we have the check and log, maybe specify in the log which one happened?
clog.Info(ctx, "Did not set output writer due to nonexistent stream or mismatched request ID") | |
clog.Info(ctx, "Did not set output writer due to nonexistent stream or mismatched request ID", "exists", exists) |
module github.com/livepeer/go-livepeer | ||
|
||
go 1.23.2 | ||
go 1.25.0 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i guess this will be already in main
NB: This requires golang 1.25; it is probably best to merge this PR first and let it soak on staging for a bit.