Skip to content
Open

BYOC #3727

Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
59 commits
Select commit Hold shift + click to select a range
a139b2f
add data channel
ad-astra-video Jul 17, 2025
b2507bf
update to new trickle subscriber api
ad-astra-video Jul 22, 2025
8b7c61b
update ai-runner bindings
ad-astra-video Jul 22, 2025
00fd90c
remove Orch data url short circuit
ad-astra-video Jul 22, 2025
3a6d24a
move datastore items to separte file similar to status
ad-astra-video Jul 28, 2025
a1323ee
update to remove datastore and use ringbuffer reader
ad-astra-video Aug 1, 2025
642814d
ai/live: Read JSONL from a time delimited data channel.
j0sh Aug 14, 2025
5a07065
copy dataWriter in newParams
ad-astra-video Aug 19, 2025
35ff4ac
update data channel mimetype
ad-astra-video Aug 20, 2025
aa2f0b8
update code gen for data_url added to LiveVideoToVideoResponse
ad-astra-video Aug 20, 2025
3e166c5
update to use live video response DataUrl field
ad-astra-video Aug 20, 2025
699f778
update to make data channel optional
ad-astra-video Aug 20, 2025
bc624ee
add byoc streaming
ad-astra-video Aug 21, 2025
9ac928d
refactor to move job setup to separate function
ad-astra-video Aug 21, 2025
ba8525c
fix
ad-astra-video Aug 21, 2025
33c05ae
refactor to reuse job request send to orch
ad-astra-video Aug 22, 2025
e52582c
fix
ad-astra-video Aug 22, 2025
8a7ec3f
capabilities: add live-ai external capabilities to Capabilities to en…
ad-astra-video Aug 22, 2025
fa105de
capabilities: fix
ad-astra-video Aug 22, 2025
5e8676b
updates to add streaming to byoc
ad-astra-video Aug 22, 2025
b78405d
continue build out of gateway and orchestrator buildout
ad-astra-video Aug 26, 2025
80b56ba
building out byoc stream
ad-astra-video Aug 27, 2025
56def52
updates
ad-astra-video Aug 28, 2025
b480f78
only set trickle urls for gateway if enabled
ad-astra-video Aug 28, 2025
469da95
various updates
ad-astra-video Aug 29, 2025
c6cfec1
add OrchToken to stream for each current orch
ad-astra-video Aug 29, 2025
a1b9a57
various updates
ad-astra-video Aug 29, 2025
9c51a2f
small update to add vals to ctx
ad-astra-video Aug 29, 2025
015c433
various updates
ad-astra-video Aug 30, 2025
3f3b156
refactor create payment and job payment processing
ad-astra-video Aug 30, 2025
8bd0849
report not active if lost control pub
ad-astra-video Aug 30, 2025
cbaba08
fix init of orchBal when 0 price set
ad-astra-video Sep 2, 2025
3bb3000
remove changes in separate pr
ad-astra-video Sep 2, 2025
c4c9bd6
updates for payment stability
ad-astra-video Sep 2, 2025
2455161
simplify stream cleanup
ad-astra-video Sep 2, 2025
e2e23ed
fix workerRoute and stream_ingest_metrics
ad-astra-video Sep 3, 2025
f8198f8
move JobToken and JobSender to core package and fix runStream
ad-astra-video Sep 3, 2025
9aacc0d
fix rtmp streaming and storing orch url
ad-astra-video Sep 4, 2025
12f8895
add error logging to retrying stream
ad-astra-video Sep 4, 2025
8246825
gateway job_stream refactor
ad-astra-video Sep 5, 2025
358cea2
remove job_trickle.go
ad-astra-video Sep 5, 2025
ec00c5e
slim down StreamInfo in external capabilities used by Orchestrator
ad-astra-video Sep 5, 2025
85e5c25
get new token for stop request
ad-astra-video Sep 5, 2025
9182ef8
some cleanup
ad-astra-video Sep 5, 2025
55666ef
fix merge update
ad-astra-video Sep 9, 2025
ddf5639
Add sdxl and faceid docker containers (#3738)
victorges Sep 11, 2025
f79d2fe
error handling, logging, lock stream info when checking active
ad-astra-video Sep 12, 2025
30af123
create get new token timeout
ad-astra-video Sep 12, 2025
d648a59
add retries to getToken
ad-astra-video Sep 12, 2025
da54cbf
add logging end response for too large body
ad-astra-video Sep 12, 2025
dcc98b1
move payment processing to separate functions: orch->processPayment, …
ad-astra-video Sep 12, 2025
5881ea9
fix error check, reduce segment writer history, fix stop stream
ad-astra-video Sep 16, 2025
6fb5308
close data segment writer after right and put segment writer back to …
ad-astra-video Sep 16, 2025
1ca67c4
tests tests tests
ad-astra-video Sep 19, 2025
1c97122
Merge branch 'master' into av/add-byoc-streaming
ad-astra-video Sep 19, 2025
a6c8d88
make stream payment processing more lenient
ad-astra-video Sep 24, 2025
b1771b7
add startTime to liveParams for First Segment Delay calcs
ad-astra-video Sep 24, 2025
3d01cc3
Merge branch 'master' into av/add-byoc-streaming
ad-astra-video Oct 1, 2025
9c94845
fix spacing
ad-astra-video Oct 1, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion common/testutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,11 +93,23 @@ func IgnoreRoutines() []goleak.Option {
"github.com/livepeer/go-livepeer/core.(*Balances).StartCleanup",
"internal/synctest.Run",
"testing/synctest.testingSynctestTest",
"github.com/livepeer/go-livepeer/core.(*Balances).StartCleanup", "github.com/livepeer/go-livepeer/server.startTrickleSubscribe.func2", "github.com/livepeer/go-livepeer/server.startTrickleSubscribe",
"net/http.(*persistConn).writeLoop", "net/http.(*persistConn).readLoop", "io.(*pipe).read",
"github.com/livepeer/go-livepeer/media.gatherIncomingTracks",
}

res := make([]goleak.Option, 0, len(funcs2ignore))
// Functions that might have other functions on top of their stack (like time.Sleep)
// These need to be ignored with IgnoreAnyFunction instead of IgnoreTopFunction
funcsAnyIgnore := []string{
"github.com/livepeer/go-livepeer/server.ffmpegOutput",
}

res := make([]goleak.Option, 0, len(funcs2ignore)+len(funcsAnyIgnore))
for _, f := range funcs2ignore {
res = append(res, goleak.IgnoreTopFunction(f))
}
for _, f := range funcsAnyIgnore {
res = append(res, goleak.IgnoreAnyFunction(f))
}
return res
}
6 changes: 6 additions & 0 deletions core/ai_orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -1200,6 +1200,12 @@ func (orch *orchestrator) JobPriceInfo(sender ethcommon.Address, jobCapability s
return nil, err
}

//ensure price numerator and denominator can be int64
jobPrice, err = common.PriceToInt64(jobPrice)
if err != nil {
return nil, fmt.Errorf("invalid job price: %w", err)
}

return &net.PriceInfo{
PricePerUnit: jobPrice.Num().Int64(),
PixelsPerUnit: jobPrice.Denom().Int64(),
Expand Down
151 changes: 150 additions & 1 deletion core/external_capabilities.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,33 @@
package core

import (
"context"
"encoding/json"
"fmt"
"math/big"

"sync"

ethcommon "github.com/ethereum/go-ethereum/common"
"github.com/golang/glog"
"github.com/livepeer/go-livepeer/net"
"github.com/livepeer/go-livepeer/trickle"
)

type JobToken struct {
SenderAddress *JobSender `json:"sender_address,omitempty"`
TicketParams *net.TicketParams `json:"ticket_params,omitempty"`
Balance int64 `json:"balance,omitempty"`
Price *net.PriceInfo `json:"price,omitempty"`
ServiceAddr string `json:"service_addr,omitempty"`

LastNonce uint32
}
type JobSender struct {
Addr string `json:"addr"`
Sig string `json:"sig"`
}

type ExternalCapability struct {
Name string `json:"name"`
Description string `json:"description"`
Expand All @@ -25,13 +43,144 @@ type ExternalCapability struct {
Load int
}

type StreamInfo struct {
StreamID string
Capability string

//Orchestrator fields
Sender ethcommon.Address
StreamRequest []byte
pubChannel *trickle.TrickleLocalPublisher
subChannel *trickle.TrickleLocalPublisher
controlChannel *trickle.TrickleLocalPublisher
eventsChannel *trickle.TrickleLocalPublisher
dataChannel *trickle.TrickleLocalPublisher
//Stream fields
JobParams string
StreamCtx context.Context
CancelStream context.CancelFunc

sdm sync.Mutex
}

func (sd *StreamInfo) IsActive() bool {
sd.sdm.Lock()
defer sd.sdm.Unlock()
if sd.StreamCtx.Err() != nil {
return false
}

if sd.controlChannel == nil {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should/need to acquire the mutex sd.sdm.lock() here before reading or there is a possiblility of reading nil, old, or partial data

Copy link
Collaborator Author

@ad-astra-video ad-astra-video Sep 12, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in d73fe37

return false
}

return true
}

func (sd *StreamInfo) UpdateParams(params string) {
sd.sdm.Lock()
defer sd.sdm.Unlock()
sd.JobParams = params
}

func (sd *StreamInfo) SetChannels(pub, sub, control, events, data *trickle.TrickleLocalPublisher) {
sd.sdm.Lock()
defer sd.sdm.Unlock()
sd.pubChannel = pub
sd.subChannel = sub
sd.controlChannel = control
sd.eventsChannel = events
sd.dataChannel = data
}

type ExternalCapabilities struct {
capm sync.Mutex
Capabilities map[string]*ExternalCapability
Streams map[string]*StreamInfo
}

func NewExternalCapabilities() *ExternalCapabilities {
return &ExternalCapabilities{Capabilities: make(map[string]*ExternalCapability)}
return &ExternalCapabilities{Capabilities: make(map[string]*ExternalCapability),
Streams: make(map[string]*StreamInfo),
}
}

func (extCaps *ExternalCapabilities) AddStream(streamID string, capability string, streamReq []byte) (*StreamInfo, error) {
extCaps.capm.Lock()
defer extCaps.capm.Unlock()
_, ok := extCaps.Streams[streamID]
if ok {
return nil, fmt.Errorf("stream already exists: %s", streamID)
}

//add to streams
ctx, cancel := context.WithCancel(context.Background())
stream := StreamInfo{
StreamID: streamID,
Capability: capability,
StreamRequest: streamReq,
StreamCtx: ctx,
CancelStream: cancel,
}
extCaps.Streams[streamID] = &stream

//clean up when stream ends
go func() {
<-ctx.Done()

//orchestrator channels shutdown
if stream.pubChannel != nil {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lets add some error handling here just in case we call close() twice ( even though adding locks to the other channel operations should prevent this )

Copy link
Collaborator Author

@ad-astra-video ad-astra-video Sep 12, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added error if stmt to catch error and log it in d73fe37

if err := stream.pubChannel.Close(); err != nil {
glog.Errorf("error closing pubChannel for stream=%s: %v", streamID, err)
}
}
if stream.subChannel != nil {
if err := stream.subChannel.Close(); err != nil {
glog.Errorf("error closing subChannel for stream=%s: %v", streamID, err)
}
}
if stream.controlChannel != nil {
if err := stream.controlChannel.Close(); err != nil {
glog.Errorf("error closing controlChannel for stream=%s: %v", streamID, err)
}
}
if stream.eventsChannel != nil {
if err := stream.eventsChannel.Close(); err != nil {
glog.Errorf("error closing eventsChannel for stream=%s: %v", streamID, err)
}
}
if stream.dataChannel != nil {
if err := stream.dataChannel.Close(); err != nil {
glog.Errorf("error closing dataChannel for stream=%s: %v", streamID, err)
}
}
return
}()

return &stream, nil
}

func (extCaps *ExternalCapabilities) RemoveStream(streamID string) {
extCaps.capm.Lock()
defer extCaps.capm.Unlock()

streamInfo, ok := extCaps.Streams[streamID]
if ok {
//confirm stream context is canceled before deleting
if streamInfo.StreamCtx.Err() == nil {
streamInfo.CancelStream()
}
}

delete(extCaps.Streams, streamID)
}

func (extCaps *ExternalCapabilities) StreamExists(streamID string) bool {
extCaps.capm.Lock()
defer extCaps.capm.Unlock()

_, ok := extCaps.Streams[streamID]
return ok
}

func (extCaps *ExternalCapabilities) RemoveCapability(extCap string) {
Expand Down
50 changes: 50 additions & 0 deletions core/livepeernode.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ orchestrator.go: Code that is called only when the node is in orchestrator mode.
package core

import (
"context"
"errors"
"math/big"
"math/rand"
Expand All @@ -19,6 +20,7 @@ import (
"time"

"github.com/golang/glog"
"github.com/livepeer/go-livepeer/media"
"github.com/livepeer/go-livepeer/pm"
"github.com/livepeer/go-livepeer/trickle"

Expand Down Expand Up @@ -183,6 +185,54 @@ type LivePipeline struct {
ControlPub *trickle.TricklePublisher
StopControl func()
ReportUpdate func([]byte)
DataWriter *media.SegmentWriter

StreamCtx context.Context
streamCancel context.CancelCauseFunc
streamParams interface{}
streamRequest []byte
}

func (n *LivepeerNode) NewLivePipeline(requestID, streamID, pipeline string, streamParams interface{}, streamRequest []byte) *LivePipeline {
streamCtx, streamCancel := context.WithCancelCause(context.Background())
n.LiveMu.Lock()
defer n.LiveMu.Unlock()
n.LivePipelines[streamID] = &LivePipeline{
RequestID: requestID,
StreamID: streamID,
Pipeline: pipeline,
StreamCtx: streamCtx,
streamParams: streamParams,
streamCancel: streamCancel,
streamRequest: streamRequest,
}
return n.LivePipelines[streamID]
}

func (n *LivepeerNode) RemoveLivePipeline(streamID string) {
n.LiveMu.Lock()
defer n.LiveMu.Unlock()
delete(n.LivePipelines, streamID)
}

func (p *LivePipeline) StreamParams() interface{} {
return p.streamParams
}

func (p *LivePipeline) UpdateStreamParams(newParams interface{}) {
p.streamParams = newParams
}

func (p *LivePipeline) StreamRequest() []byte {
return p.streamRequest
}

func (p *LivePipeline) StopStream(err error) {
if p.StopControl != nil {
p.StopControl()
}

p.streamCancel(err)
}

// NewLivepeerNode creates a new Livepeer Node. Eth can be nil.
Expand Down
Loading
Loading