Skip to content
177 changes: 90 additions & 87 deletions ai/worker/runner.gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions core/livepeernode.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"time"

"github.com/golang/glog"
"github.com/livepeer/go-livepeer/media"
"github.com/livepeer/go-livepeer/pm"
"github.com/livepeer/go-livepeer/trickle"

Expand Down Expand Up @@ -174,6 +175,7 @@ type LivePipeline struct {
Pipeline string
ControlPub *trickle.TricklePublisher
StopControl func()
DataWriter *media.SegmentWriter
}

// NewLivepeerNode creates a new Livepeer Node. Eth can be nil.
Expand Down
6 changes: 6 additions & 0 deletions server/ai_http.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ func (h *lphttp) StartLiveVideoToVideo() http.Handler {
subUrl = pubUrl + "-out"
controlUrl = pubUrl + "-control"
eventsUrl = pubUrl + "-events"
dataUrl = pubUrl + "-data"
)

// Handle initial payment, the rest of the payments are done separately from the stream processing
Expand Down Expand Up @@ -180,6 +181,8 @@ func (h *lphttp) StartLiveVideoToVideo() http.Handler {
controlPubCh.CreateChannel()
eventsCh := trickle.NewLocalPublisher(h.trickleSrv, mid+"-events", "application/json")
eventsCh.CreateChannel()
dataCh := trickle.NewLocalPublisher(h.trickleSrv, mid+"-data", "application/json")
dataCh.CreateChannel()

// Start payment receiver which accounts the payments and stops the stream if the payment is insufficient
priceInfo := payment.GetExpectedPrice()
Expand Down Expand Up @@ -231,13 +234,15 @@ func (h *lphttp) StartLiveVideoToVideo() http.Handler {
eventsUrlOverwrite := overwriteHost(h.node.LiveAITrickleHostForRunner, eventsUrl)
subscribeUrlOverwrite := overwriteHost(h.node.LiveAITrickleHostForRunner, pubUrl)
publishUrlOverwrite := overwriteHost(h.node.LiveAITrickleHostForRunner, subUrl)
dataUrlOverwrite := overwriteHost(h.node.LiveAITrickleHostForRunner, dataUrl)

workerReq := worker.LiveVideoToVideoParams{
ModelId: req.ModelId,
PublishUrl: publishUrlOverwrite,
SubscribeUrl: subscribeUrlOverwrite,
EventsUrl: &eventsUrlOverwrite,
ControlUrl: &controlUrlOverwrite,
DataUrl: &dataUrlOverwrite,
Params: req.Params,
GatewayRequestId: &gatewayRequestID,
ManifestId: &mid,
Expand All @@ -255,6 +260,7 @@ func (h *lphttp) StartLiveVideoToVideo() http.Handler {
subCh.Close()
controlPubCh.Close()
eventsCh.Close()
dataCh.Close()
cancel()
respondWithError(w, err.Error(), http.StatusInternalServerError)
return
Expand Down
Loading
Loading