Skip to content

Conversation

ad-astra-video
Copy link
Collaborator

@ad-astra-video ad-astra-video commented Sep 2, 2025

What does this pull request do? Explain your changes. (required)

Adds configurable streaming for BYOC entrypoint to go-livepeer. Uses trickle protocol to handle streaming for similar entrypoints and outputs from go-livepeer as live-video-to-video.

Streams can be any or a mix of the following:

  • video ingress via WHIP (with Gateway) or RTMP (with MediaMTX)
  • video egress via RTMP (with MediaMTX) or WHEP (with mediamtx)
  • SSE data output

Control and Events channels are created for every stream.

Streams are created with a POST request to /ai/stream/start that will start the stream and reserve the capacity with an Orchestrator that is providing the BYOC capability. If video ingress is enabled, the client should then start a stream with WHIP or RTMP to the provided ingress URLs provided in the response. URLs for egress video, data, updates (control) and events are also included in the response as well as the stream_id. The stream_id is an integral part of the URLs provided to interact with the stream and is combined with a provided stream name in the /ai/stream/start request.

Streams are stopped with a POST request to /ai/stream/stop. Orchestrators and Gateways track payment balance and the Gateway adjusts to the Orchestrators provided balance in new JobTokens provided at each payment interval every minute. Orchestrators will shutdown a stream when payment balance is zero.

Specific updates (required)

  • Add job_stream.go and job_stream_test.go
  • refactor job_rpc.go to reuse stream setup where made sense
  • updates go routines to ignore to enable tests to pass in common/testutil.go.

How did you test each of these updates (required)

Used byoc-stream to test end to end: https://github.com/ad-astra-video/livepeer-app-pipelines/tree/main/byoc-stream
Added tests to job_stream_test.go and some additional tests to job_rpc_test.go.

Does this pull request close any open issues?

Checklist:

@github-actions github-actions bot added go Pull requests that update Go code AI Issues and PR related to the AI-video branch. labels Sep 2, 2025
Copy link

codecov bot commented Sep 2, 2025

Codecov Report

❌ Patch coverage is 52.68631% with 819 lines in your changes missing coverage. Please review.
✅ Project coverage is 33.37411%. Comparing base (d095d96) to head (1c97122).
⚠️ Report is 1 commits behind head on master.

Files with missing lines Patch % Lines
server/job_stream.go 61.05169% 369 Missing and 68 partials ⚠️
server/job_rpc.go 52.55255% 128 Missing and 30 partials ⚠️
server/ai_live_video.go 31.25000% 82 Missing and 6 partials ⚠️
core/external_capabilities.go 3.48837% 83 Missing ⚠️
core/livepeernode.go 0.00000% 29 Missing ⚠️
common/testutil.go 0.00000% 13 Missing ⚠️
core/ai_orchestrator.go 0.00000% 4 Missing ⚠️
server/ai_mediaserver.go 69.23077% 3 Missing and 1 partial ⚠️
server/rpc.go 0.00000% 3 Missing ⚠️
Additional details and impacted files

Impacted file tree graph

@@                 Coverage Diff                 @@
##              master       #3727         +/-   ##
===================================================
+ Coverage   31.94301%   33.37411%   +1.43110%     
===================================================
  Files            158         159          +1     
  Lines          47519       49050       +1531     
===================================================
+ Hits           15179       16370       +1191     
- Misses         31437       31633        +196     
- Partials         903        1047        +144     
Files with missing lines Coverage Δ
server/ai_process.go 1.65975% <ø> (ø)
server/rpc.go 68.66029% <0.00000%> (+0.46752%) ⬆️
core/ai_orchestrator.go 29.24820% <0.00000%> (-0.12098%) ⬇️
server/ai_mediaserver.go 9.63542% <69.23077%> (+2.79331%) ⬆️
common/testutil.go 14.08451% <0.00000%> (-2.58216%) ⬇️
core/livepeernode.go 51.90476% <0.00000%> (-8.31623%) ⬇️
core/external_capabilities.go 28.45528% <3.48837%> (-58.38683%) ⬇️
server/ai_live_video.go 31.61209% <31.25000%> (+31.61209%) ⬆️
server/job_rpc.go 43.79277% <52.55255%> (+13.42325%) ⬆️
server/job_stream.go 61.05169% <61.05169%> (ø)

... and 4 files with indirect coverage changes


Continue to review full report in Codecov by Sentry.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 19ded51...1c97122. Read the comment docs.

Files with missing lines Coverage Δ
server/ai_process.go 1.65975% <ø> (ø)
server/rpc.go 68.66029% <0.00000%> (+0.46752%) ⬆️
core/ai_orchestrator.go 29.24820% <0.00000%> (-0.12098%) ⬇️
server/ai_mediaserver.go 9.63542% <69.23077%> (+2.79331%) ⬆️
common/testutil.go 14.08451% <0.00000%> (-2.58216%) ⬇️
core/livepeernode.go 51.90476% <0.00000%> (-8.31623%) ⬇️
core/external_capabilities.go 28.45528% <3.48837%> (-58.38683%) ⬇️
server/ai_live_video.go 31.61209% <31.25000%> (+31.61209%) ⬆️
server/job_rpc.go 43.79277% <52.55255%> (+13.42325%) ⬆️
server/job_stream.go 61.05169% <61.05169%> (ø)

... and 4 files with indirect coverage changes

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

}
stopJob.sign() //no changes to make, sign job

token, err := sessionToToken(params.liveParams.sess)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lets check/log this err here

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Still hoping we can check and log this err

Copy link
Collaborator Author

@ad-astra-video ad-astra-video Sep 12, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added an error check and log in d73fe37. Note that this function cannot return an error currently, should I just update to only return the JobToken. I think I added an error here thinking as I built it out something could cause an error possibly.

@pschroedl pschroedl marked this pull request as ready for review September 11, 2025 18:48
}
stopJob.sign() //no changes to make, sign job

token, err := sessionToToken(params.liveParams.sess)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Still hoping we can check and log this err


// fetch new JobToken with each payment
// update the session for the LivePipeline with new token
newToken, err := getToken(ctx, 3*time.Second, token.ServiceAddr, stream.Pipeline, jobSender.Addr, jobSender.Sig)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we make this 3 second param a constant, say TokenRefreshTimeout = 3 * time.second?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, it would be good to have a few retries here

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated in bfb0135. Added 3 retries in 18db960 for the getToken request with a reasonable backoff with a check to stay within the timeout provided.

clog.Infof(ctx, "Starting stream processing")
//refresh the token if not first Orch to confirm capacity and new ticket params
if firstProcessed {
newToken, err := getToken(ctx, 3*time.Second, orch.ServiceAddr, gatewayJob.Job.Req.Capability, gatewayJob.Job.Req.Sender, gatewayJob.Job.Req.Sig)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same comment RE: constant for timeout

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated in bfb0135


// Setup request body to be able to preserve for retries
// Read the entire body first with 10MB limit
bodyBytes, err := io.ReadAll(io.LimitReader(r.Body, 10<<20))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since io.ReadALL truncates to 10MB here, maybe we want to log that?

Suggested change
bodyBytes, err := io.ReadAll(io.LimitReader(r.Body, 10<<20))
if len(bodyBytes) > 10<<20 {
fmt.Errorf("request body too large, maximum size is %d bytes", 10<<20)
}

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated how the r.Body is limited to 10MB so that can get a specific error in 67cb9cd. Looked like from docs that limit reader would just stop reading and provide no error.

clog.V(common.DEBUG).Infof(ctx, "job price=%v units=%v", jobPrice.PricePerUnit, jobPrice.PixelsPerUnit)

//no payment included, confirm if balance remains
jobPriceRat := big.NewRat(jobPrice.PricePerUnit, jobPrice.PixelsPerUnit)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should extract this payment logic here into another method(s), a bit hard to reason about this with so all the nested conditions

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Plus, Adding test(s) around that new method would be fantastic!


jobPriceRat := big.NewRat(orchJob.JobPrice.PricePerUnit, orchJob.JobPrice.PixelsPerUnit)
if jobPriceRat.Cmp(big.NewRat(0, 1)) > 0 {
h.orchestrator.DebitFees(orchJob.Sender, core.ManifestID(orchJob.Req.Capability), orchJob.JobPrice, int64(pmtCheckDur.Seconds()))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This concerns me a bit as it seems like a couple things could happen here since these two operations ( debit and check ) are not one atomic operation
Between the debit and balance check, other goroutines can create race conditions by:

  • Crediting the account (incoming payments)
  • Debit the account (other streams)
  • Change the balance state(maybe somehow?)

I think we can pull these into another method debit and check and using a mutex lock to prevent this

stream, err := h.node.ExternalCapabilities.AddStream(orchJob.Req.ID, orchJob.Req.Capability, reqBodyBytes)
if err != nil {
clog.Errorf(ctx, "Error adding stream to external capabilities: %v", err)
respondWithError(w, "Error adding stream to external capabilities", http.StatusInternalServerError)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we be calling RemoveStream here in case of error?

return false
}

if sd.controlChannel == nil {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should/need to acquire the mutex sd.sdm.lock() here before reading or there is a possiblility of reading nil, old, or partial data

Copy link
Collaborator Author

@ad-astra-video ad-astra-video Sep 12, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in d73fe37

<-ctx.Done()

//orchestrator channels shutdown
if stream.pubChannel != nil {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lets add some error handling here just in case we call close() twice ( even though adding locks to the other channel operations should prevent this )

Copy link
Collaborator Author

@ad-astra-video ad-astra-video Sep 12, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added error if stmt to catch error and log it in d73fe37

delete(ls.LivepeerNode.LivePipelines, streamId)
return
case <-pmtTicker.C:
if !params.inputStreamExists() {
Copy link
Collaborator

@pschroedl pschroedl Sep 11, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: suggest breaking this case out into a new method

//ensure price numerator and denominator can be int64
jobPrice, err = common.PriceToInt64(jobPrice)
if err != nil {
return nil, err
Copy link
Collaborator

@pschroedl pschroedl Sep 11, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

    return nil, fmt.Errorf("invalid job price: %w", err)

Copy link
Collaborator Author

@ad-astra-video ad-astra-video Sep 12, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated in d73fe37

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
AI Issues and PR related to the AI-video branch. go Pull requests that update Go code
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants