Skip to content

Commit b1bd70a

Browse files
update to make data channel optional
1 parent 4570909 commit b1bd70a

File tree

3 files changed

+81
-12
lines changed

3 files changed

+81
-12
lines changed

server/ai_http.go

Lines changed: 41 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,26 @@ func (h *lphttp) StartLiveVideoToVideo() http.Handler {
148148
dataUrl = pubUrl + "-data"
149149
)
150150

151+
//if data is not enabled remove the url and do not start the data channel
152+
if enableData, ok := (*req.Params)["enableData"]; ok {
153+
if val, ok := enableData.(bool); ok {
154+
//turn off data channel if request sets to false
155+
if !val {
156+
dataUrl = ""
157+
} else {
158+
clog.Infof(ctx, "data channel is enabled")
159+
}
160+
} else {
161+
clog.Warningf(ctx, "enableData is not a bool, got type %T", enableData)
162+
}
163+
164+
//delete the param used for go-livepeer signaling
165+
delete((*req.Params), "enableData")
166+
} else {
167+
//default to no data channel
168+
dataUrl = ""
169+
}
170+
151171
// Handle initial payment, the rest of the payments are done separately from the stream processing
152172
// Note that this payment is debit from the balance and acts as a buffer for the AI Realtime Video processing
153173
payment, err := getPayment(r.Header.Get(paymentHeader))
@@ -181,8 +201,13 @@ func (h *lphttp) StartLiveVideoToVideo() http.Handler {
181201
controlPubCh.CreateChannel()
182202
eventsCh := trickle.NewLocalPublisher(h.trickleSrv, mid+"-events", "application/json")
183203
eventsCh.CreateChannel()
184-
dataCh := trickle.NewLocalPublisher(h.trickleSrv, mid+"-data", "application/jsonl")
185-
dataCh.CreateChannel()
204+
205+
//optional channels
206+
var dataCh *trickle.TrickleLocalPublisher
207+
if dataUrl != "" {
208+
dataCh = trickle.NewLocalPublisher(h.trickleSrv, mid+"-data", "application/jsonl")
209+
dataCh.CreateChannel()
210+
}
186211

187212
// Start payment receiver which accounts the payments and stops the stream if the payment is insufficient
188213
priceInfo := payment.GetExpectedPrice()
@@ -203,6 +228,9 @@ func (h *lphttp) StartLiveVideoToVideo() http.Handler {
203228
subCh.Close()
204229
eventsCh.Close()
205230
controlPubCh.Close()
231+
if dataCh != nil {
232+
dataCh.Close()
233+
}
206234
cancel()
207235
}
208236
return err
@@ -230,11 +258,17 @@ func (h *lphttp) StartLiveVideoToVideo() http.Handler {
230258
}()
231259

232260
// Prepare request to worker
261+
// required channels
233262
controlUrlOverwrite := overwriteHost(h.node.LiveAITrickleHostForRunner, controlUrl)
234263
eventsUrlOverwrite := overwriteHost(h.node.LiveAITrickleHostForRunner, eventsUrl)
235264
subscribeUrlOverwrite := overwriteHost(h.node.LiveAITrickleHostForRunner, pubUrl)
236265
publishUrlOverwrite := overwriteHost(h.node.LiveAITrickleHostForRunner, subUrl)
237-
dataUrlOverwrite := overwriteHost(h.node.LiveAITrickleHostForRunner, dataUrl)
266+
267+
// optional channels
268+
var dataUrlOverwrite string
269+
if dataCh != nil {
270+
dataUrlOverwrite = overwriteHost(h.node.LiveAITrickleHostForRunner, dataUrl)
271+
}
238272

239273
workerReq := worker.LiveVideoToVideoParams{
240274
ModelId: req.ModelId,
@@ -260,7 +294,9 @@ func (h *lphttp) StartLiveVideoToVideo() http.Handler {
260294
subCh.Close()
261295
controlPubCh.Close()
262296
eventsCh.Close()
263-
dataCh.Close()
297+
if dataCh != nil {
298+
dataCh.Close()
299+
}
264300
cancel()
265301
respondWithError(w, err.Error(), http.StatusInternalServerError)
266302
return
@@ -272,6 +308,7 @@ func (h *lphttp) StartLiveVideoToVideo() http.Handler {
272308
SubscribeUrl: subUrl,
273309
ControlUrl: &controlUrl,
274310
EventsUrl: &eventsUrl,
311+
DataUrl: &dataUrl,
275312
RequestId: &requestID,
276313
ManifestId: &mid,
277314
})

server/ai_live_video.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -729,6 +729,11 @@ func startEventsSubscribe(ctx context.Context, url *url.URL, params aiRequestPar
729729
}
730730

731731
func startDataSubscribe(ctx context.Context, url *url.URL, params aiRequestParams, sess *AISession) {
732+
//only start DataSubscribe if enabled
733+
if params.liveParams.dataWriter == nil {
734+
return
735+
}
736+
732737
// subscribe to the outputs
733738
subscriber, err := trickle.NewTrickleSubscriber(trickle.TrickleSubscriberConfig{
734739
URL: url.String(),

server/ai_mediaserver.go

Lines changed: 35 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -623,7 +623,6 @@ func (ls *LivepeerServer) StartLiveVideo() http.Handler {
623623

624624
liveParams: &liveRequestParams{
625625
segmentReader: ssr,
626-
dataWriter: media.NewSegmentWriter(5),
627626
rtmpOutputs: rtmpOutputs,
628627
localRTMPPrefix: mediaMTXInputURL,
629628
stream: streamName,
@@ -638,6 +637,15 @@ func (ls *LivepeerServer) StartLiveVideo() http.Handler {
638637
},
639638
}
640639

640+
//create a dataWriter for data channel if enabled
641+
if enableData, ok := pipelineParams["enableData"]; ok {
642+
if enableData == true || enableData == "true" {
643+
params.liveParams.dataWriter = media.NewSegmentWriter(5)
644+
pipelineParams["enableData"] = true
645+
clog.Infof(ctx, "Data channel enabled for stream %s", streamName)
646+
}
647+
}
648+
641649
registerControl(ctx, params)
642650

643651
// Create a special parent context for orchestrator cancellation
@@ -763,6 +771,8 @@ func startProcessing(ctx context.Context, params aiRequestParams, res interface{
763771
resp := res.(*worker.GenLiveVideoToVideoResponse)
764772

765773
host := params.liveParams.sess.Transcoder()
774+
775+
//required channels
766776
pub, err := common.AppendHostname(resp.JSON200.PublishUrl, host)
767777
if err != nil {
768778
return fmt.Errorf("invalid publish URL: %w", err)
@@ -779,21 +789,30 @@ func startProcessing(ctx context.Context, params aiRequestParams, res interface{
779789
if err != nil {
780790
return fmt.Errorf("invalid events URL: %w", err)
781791
}
782-
data, err := common.AppendHostname(*resp.JSON200.DataUrl, host)
783-
if err != nil {
784-
return fmt.Errorf("invalid data URL: %w", err)
785-
}
792+
786793
if resp.JSON200.ManifestId != nil {
787794
ctx = clog.AddVal(ctx, "manifest_id", *resp.JSON200.ManifestId)
788795
params.liveParams.manifestID = *resp.JSON200.ManifestId
789796
}
790-
clog.V(common.VERBOSE).Infof(ctx, "pub %s sub %s control %s events %s data %s", pub, sub, control, events, data)
797+
798+
clog.V(common.VERBOSE).Infof(ctx, "pub %s sub %s control %s events %s", pub, sub, control, events)
791799

792800
startControlPublish(ctx, control, params)
793801
startTricklePublish(ctx, pub, params, params.liveParams.sess)
794802
startTrickleSubscribe(ctx, sub, params, params.liveParams.sess)
795803
startEventsSubscribe(ctx, events, params, params.liveParams.sess)
796-
startDataSubscribe(ctx, data, params, params.liveParams.sess)
804+
805+
//optional channels
806+
var data *url.URL
807+
if *resp.JSON200.DataUrl != "" {
808+
data, err = common.AppendHostname(*resp.JSON200.DataUrl, host)
809+
if err != nil {
810+
return fmt.Errorf("invalid data URL: %w", err)
811+
}
812+
clog.V(common.VERBOSE).Infof(ctx, "data %s", data)
813+
startDataSubscribe(ctx, data, params, params.liveParams.sess)
814+
}
815+
797816
return nil
798817
}
799818

@@ -1090,7 +1109,6 @@ func (ls *LivepeerServer) CreateWhip(server *media.WHIPServer) http.Handler {
10901109

10911110
liveParams: &liveRequestParams{
10921111
segmentReader: ssr,
1093-
dataWriter: media.NewSegmentWriter(5),
10941112
rtmpOutputs: rtmpOutputs,
10951113
localRTMPPrefix: internalOutputHost,
10961114
stream: streamName,
@@ -1106,6 +1124,15 @@ func (ls *LivepeerServer) CreateWhip(server *media.WHIPServer) http.Handler {
11061124
},
11071125
}
11081126

1127+
//create a dataWriter for data channel if enabled
1128+
if enableData, ok := pipelineParams["enableData"]; ok {
1129+
if enableData == true || enableData == "true" {
1130+
params.liveParams.dataWriter = media.NewSegmentWriter(5)
1131+
pipelineParams["enableData"] = true
1132+
clog.Infof(ctx, "Data channel enabled for stream %s", streamName)
1133+
}
1134+
}
1135+
11091136
registerControl(ctx, params)
11101137

11111138
req := worker.GenLiveVideoToVideoJSONRequestBody{

0 commit comments

Comments
 (0)