@@ -623,7 +623,6 @@ func (ls *LivepeerServer) StartLiveVideo() http.Handler {
623
623
624
624
liveParams : & liveRequestParams {
625
625
segmentReader : ssr ,
626
- dataWriter : media .NewSegmentWriter (5 ),
627
626
rtmpOutputs : rtmpOutputs ,
628
627
localRTMPPrefix : mediaMTXInputURL ,
629
628
stream : streamName ,
@@ -638,6 +637,15 @@ func (ls *LivepeerServer) StartLiveVideo() http.Handler {
638
637
},
639
638
}
640
639
640
+ //create a dataWriter for data channel if enabled
641
+ if enableData , ok := pipelineParams ["enableData" ]; ok {
642
+ if enableData == true || enableData == "true" {
643
+ params .liveParams .dataWriter = media .NewSegmentWriter (5 )
644
+ pipelineParams ["enableData" ] = true
645
+ clog .Infof (ctx , "Data channel enabled for stream %s" , streamName )
646
+ }
647
+ }
648
+
641
649
registerControl (ctx , params )
642
650
643
651
// Create a special parent context for orchestrator cancellation
@@ -763,6 +771,8 @@ func startProcessing(ctx context.Context, params aiRequestParams, res interface{
763
771
resp := res .(* worker.GenLiveVideoToVideoResponse )
764
772
765
773
host := params .liveParams .sess .Transcoder ()
774
+
775
+ //required channels
766
776
pub , err := common .AppendHostname (resp .JSON200 .PublishUrl , host )
767
777
if err != nil {
768
778
return fmt .Errorf ("invalid publish URL: %w" , err )
@@ -779,21 +789,30 @@ func startProcessing(ctx context.Context, params aiRequestParams, res interface{
779
789
if err != nil {
780
790
return fmt .Errorf ("invalid events URL: %w" , err )
781
791
}
782
- data , err := common .AppendHostname (* resp .JSON200 .DataUrl , host )
783
- if err != nil {
784
- return fmt .Errorf ("invalid data URL: %w" , err )
785
- }
792
+
786
793
if resp .JSON200 .ManifestId != nil {
787
794
ctx = clog .AddVal (ctx , "manifest_id" , * resp .JSON200 .ManifestId )
788
795
params .liveParams .manifestID = * resp .JSON200 .ManifestId
789
796
}
790
- clog .V (common .VERBOSE ).Infof (ctx , "pub %s sub %s control %s events %s data %s" , pub , sub , control , events , data )
797
+
798
+ clog .V (common .VERBOSE ).Infof (ctx , "pub %s sub %s control %s events %s" , pub , sub , control , events )
791
799
792
800
startControlPublish (ctx , control , params )
793
801
startTricklePublish (ctx , pub , params , params .liveParams .sess )
794
802
startTrickleSubscribe (ctx , sub , params , params .liveParams .sess )
795
803
startEventsSubscribe (ctx , events , params , params .liveParams .sess )
796
- startDataSubscribe (ctx , data , params , params .liveParams .sess )
804
+
805
+ //optional channels
806
+ var data * url.URL
807
+ if * resp .JSON200 .DataUrl != "" {
808
+ data , err = common .AppendHostname (* resp .JSON200 .DataUrl , host )
809
+ if err != nil {
810
+ return fmt .Errorf ("invalid data URL: %w" , err )
811
+ }
812
+ clog .V (common .VERBOSE ).Infof (ctx , "data %s" , data )
813
+ startDataSubscribe (ctx , data , params , params .liveParams .sess )
814
+ }
815
+
797
816
return nil
798
817
}
799
818
@@ -1090,7 +1109,6 @@ func (ls *LivepeerServer) CreateWhip(server *media.WHIPServer) http.Handler {
1090
1109
1091
1110
liveParams : & liveRequestParams {
1092
1111
segmentReader : ssr ,
1093
- dataWriter : media .NewSegmentWriter (5 ),
1094
1112
rtmpOutputs : rtmpOutputs ,
1095
1113
localRTMPPrefix : internalOutputHost ,
1096
1114
stream : streamName ,
@@ -1106,6 +1124,15 @@ func (ls *LivepeerServer) CreateWhip(server *media.WHIPServer) http.Handler {
1106
1124
},
1107
1125
}
1108
1126
1127
+ //create a dataWriter for data channel if enabled
1128
+ if enableData , ok := pipelineParams ["enableData" ]; ok {
1129
+ if enableData == true || enableData == "true" {
1130
+ params .liveParams .dataWriter = media .NewSegmentWriter (5 )
1131
+ pipelineParams ["enableData" ] = true
1132
+ clog .Infof (ctx , "Data channel enabled for stream %s" , streamName )
1133
+ }
1134
+ }
1135
+
1109
1136
registerControl (ctx , params )
1110
1137
1111
1138
req := worker.GenLiveVideoToVideoJSONRequestBody {
0 commit comments