Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
180 changes: 93 additions & 87 deletions ai/worker/runner.gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions core/livepeernode.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"time"

"github.com/golang/glog"
"github.com/livepeer/go-livepeer/media"
"github.com/livepeer/go-livepeer/pm"
"github.com/livepeer/go-livepeer/trickle"

Expand Down Expand Up @@ -174,6 +175,7 @@ type LivePipeline struct {
Pipeline string
ControlPub *trickle.TricklePublisher
StopControl func()
DataWriter *media.SegmentWriter
}

// NewLivepeerNode creates a new Livepeer Node. Eth can be nil.
Expand Down
43 changes: 43 additions & 0 deletions server/ai_http.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,8 +145,29 @@ func (h *lphttp) StartLiveVideoToVideo() http.Handler {
subUrl = pubUrl + "-out"
controlUrl = pubUrl + "-control"
eventsUrl = pubUrl + "-events"
dataUrl = pubUrl + "-data"
)

//if data is not enabled remove the url and do not start the data channel
if enableData, ok := (*req.Params)["enableData"]; ok {
if val, ok := enableData.(bool); ok {
//turn off data channel if request sets to false
if !val {
dataUrl = ""
} else {
clog.Infof(ctx, "data channel is enabled")
}
} else {
clog.Warningf(ctx, "enableData is not a bool, got type %T", enableData)
}

//delete the param used for go-livepeer signaling
delete((*req.Params), "enableData")
} else {
//default to no data channel
dataUrl = ""
}

// Handle initial payment, the rest of the payments are done separately from the stream processing
// Note that this payment is debit from the balance and acts as a buffer for the AI Realtime Video processing
payment, err := getPayment(r.Header.Get(paymentHeader))
Expand Down Expand Up @@ -181,6 +202,13 @@ func (h *lphttp) StartLiveVideoToVideo() http.Handler {
eventsCh := trickle.NewLocalPublisher(h.trickleSrv, mid+"-events", "application/json")
eventsCh.CreateChannel()

//optional channels
var dataCh *trickle.TrickleLocalPublisher
if dataUrl != "" {
dataCh = trickle.NewLocalPublisher(h.trickleSrv, mid+"-data", "application/jsonl")
dataCh.CreateChannel()
}

// Start payment receiver which accounts the payments and stops the stream if the payment is insufficient
priceInfo := payment.GetExpectedPrice()
var paymentProcessor *LivePaymentProcessor
Expand All @@ -200,6 +228,9 @@ func (h *lphttp) StartLiveVideoToVideo() http.Handler {
subCh.Close()
eventsCh.Close()
controlPubCh.Close()
if dataCh != nil {
dataCh.Close()
}
cancel()
}
return err
Expand Down Expand Up @@ -227,17 +258,25 @@ func (h *lphttp) StartLiveVideoToVideo() http.Handler {
}()

// Prepare request to worker
// required channels
controlUrlOverwrite := overwriteHost(h.node.LiveAITrickleHostForRunner, controlUrl)
eventsUrlOverwrite := overwriteHost(h.node.LiveAITrickleHostForRunner, eventsUrl)
subscribeUrlOverwrite := overwriteHost(h.node.LiveAITrickleHostForRunner, pubUrl)
publishUrlOverwrite := overwriteHost(h.node.LiveAITrickleHostForRunner, subUrl)

// optional channels
var dataUrlOverwrite string
if dataCh != nil {
dataUrlOverwrite = overwriteHost(h.node.LiveAITrickleHostForRunner, dataUrl)
}

workerReq := worker.LiveVideoToVideoParams{
ModelId: req.ModelId,
PublishUrl: publishUrlOverwrite,
SubscribeUrl: subscribeUrlOverwrite,
EventsUrl: &eventsUrlOverwrite,
ControlUrl: &controlUrlOverwrite,
DataUrl: &dataUrlOverwrite,
Params: req.Params,
GatewayRequestId: &gatewayRequestID,
ManifestId: &mid,
Expand All @@ -255,6 +294,9 @@ func (h *lphttp) StartLiveVideoToVideo() http.Handler {
subCh.Close()
controlPubCh.Close()
eventsCh.Close()
if dataCh != nil {
dataCh.Close()
}
cancel()
respondWithError(w, err.Error(), http.StatusInternalServerError)
return
Expand All @@ -266,6 +308,7 @@ func (h *lphttp) StartLiveVideoToVideo() http.Handler {
SubscribeUrl: subUrl,
ControlUrl: &controlUrl,
EventsUrl: &eventsUrl,
DataUrl: &dataUrl,
RequestId: &requestID,
ManifestId: &mid,
})
Expand Down
Loading
Loading