Skip to content

Commit fdc7fc4

Browse files
slim down StreamInfo in external capabilities used by Orchestrator
1 parent 392416f commit fdc7fc4

File tree

2 files changed

+5
-34
lines changed

2 files changed

+5
-34
lines changed

core/external_capabilities.go

Lines changed: 4 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ import (
1010

1111
ethcommon "github.com/ethereum/go-ethereum/common"
1212
"github.com/golang/glog"
13-
"github.com/livepeer/go-livepeer/media"
1413
"github.com/livepeer/go-livepeer/net"
1514
"github.com/livepeer/go-livepeer/trickle"
1615
)
@@ -48,17 +47,7 @@ type StreamInfo struct {
4847
StreamID string
4948
Capability string
5049
//Gateway fields
51-
StreamRequest []byte
52-
ExcludeOrchs []string
53-
OrchToken *JobToken
54-
OrchUrl string
55-
OrchPublishUrl string
56-
OrchSubscribeUrl string
57-
OrchControlUrl string
58-
OrchEventsUrl string
59-
OrchDataUrl string
60-
ControlPub *trickle.TricklePublisher
61-
StopControl func()
50+
StreamRequest []byte
6251

6352
//Orchestrator fields
6453
Sender ethcommon.Address
@@ -68,8 +57,6 @@ type StreamInfo struct {
6857
eventsChannel *trickle.TrickleLocalPublisher
6958
dataChannel *trickle.TrickleLocalPublisher
7059
//Stream fields
71-
Params interface{}
72-
DataWriter *media.SegmentWriter
7360
JobParams string
7461
StreamCtx context.Context
7562
CancelStream context.CancelFunc
@@ -82,19 +69,13 @@ func (sd *StreamInfo) IsActive() bool {
8269
return false
8370
}
8471

85-
if sd.controlChannel == nil && sd.ControlPub == nil {
72+
if sd.controlChannel == nil {
8673
return false
8774
}
8875

8976
return true
9077
}
9178

92-
func (sd *StreamInfo) ExcludeOrch(orchUrl string) {
93-
sd.sdm.Lock()
94-
defer sd.sdm.Unlock()
95-
sd.ExcludeOrchs = append(sd.ExcludeOrchs, orchUrl)
96-
}
97-
9879
func (sd *StreamInfo) UpdateParams(params string) {
9980
sd.sdm.Lock()
10081
defer sd.sdm.Unlock()
@@ -123,7 +104,7 @@ func NewExternalCapabilities() *ExternalCapabilities {
123104
}
124105
}
125106

126-
func (extCaps *ExternalCapabilities) AddStream(streamID string, pipeline string, params interface{}, streamReq []byte) (*StreamInfo, error) {
107+
func (extCaps *ExternalCapabilities) AddStream(streamID string, capability string, streamReq []byte) (*StreamInfo, error) {
127108
extCaps.capm.Lock()
128109
defer extCaps.capm.Unlock()
129110
_, ok := extCaps.Streams[streamID]
@@ -135,8 +116,7 @@ func (extCaps *ExternalCapabilities) AddStream(streamID string, pipeline string,
135116
ctx, cancel := context.WithCancel(context.Background())
136117
stream := StreamInfo{
137118
StreamID: streamID,
138-
Capability: pipeline,
139-
Params: params, // Store the interface value directly, not a pointer to it
119+
Capability: capability,
140120
StreamRequest: streamReq,
141121
StreamCtx: ctx,
142122
CancelStream: cancel,
@@ -147,15 +127,6 @@ func (extCaps *ExternalCapabilities) AddStream(streamID string, pipeline string,
147127
go func() {
148128
<-ctx.Done()
149129

150-
//gateway channels shutdown
151-
if stream.DataWriter != nil {
152-
stream.DataWriter.Close()
153-
}
154-
if stream.ControlPub != nil {
155-
stream.StopControl()
156-
stream.ControlPub.Close()
157-
}
158-
159130
//orchestrator channels shutdown
160131
if stream.pubChannel != nil {
161132
stream.pubChannel.Close()

server/job_stream.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1089,7 +1089,7 @@ func (h *lphttp) StartStream(w http.ResponseWriter, r *http.Request) {
10891089
clog.V(common.SHORT).Infof(ctx, "stream start processed successfully took=%v balance=%v", time.Since(start), getPaymentBalance(orch, orchJob.Sender, orchJob.Req.Capability).FloatString(0))
10901090

10911091
//setup the stream
1092-
stream, err := h.node.ExternalCapabilities.AddStream(orchJob.Req.ID, orchJob.Req.Capability, orchJob.Req, respBody)
1092+
stream, err := h.node.ExternalCapabilities.AddStream(orchJob.Req.ID, orchJob.Req.Capability, respBody)
10931093
if err != nil {
10941094
clog.Errorf(ctx, "Error adding stream to external capabilities: %v", err)
10951095
respondWithError(w, "Error adding stream to external capabilities", http.StatusInternalServerError)

0 commit comments

Comments
 (0)