Skip to content

Commit 34e53b9

Browse files
slim down StreamInfo in external capabilities used by Orchestrator
1 parent 392416f commit 34e53b9

File tree

2 files changed

+6
-37
lines changed

2 files changed

+6
-37
lines changed

core/external_capabilities.go

Lines changed: 4 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ import (
1010

1111
ethcommon "github.com/ethereum/go-ethereum/common"
1212
"github.com/golang/glog"
13-
"github.com/livepeer/go-livepeer/media"
1413
"github.com/livepeer/go-livepeer/net"
1514
"github.com/livepeer/go-livepeer/trickle"
1615
)
@@ -47,29 +46,16 @@ type ExternalCapability struct {
4746
type StreamInfo struct {
4847
StreamID string
4948
Capability string
50-
//Gateway fields
51-
StreamRequest []byte
52-
ExcludeOrchs []string
53-
OrchToken *JobToken
54-
OrchUrl string
55-
OrchPublishUrl string
56-
OrchSubscribeUrl string
57-
OrchControlUrl string
58-
OrchEventsUrl string
59-
OrchDataUrl string
60-
ControlPub *trickle.TricklePublisher
61-
StopControl func()
6249

6350
//Orchestrator fields
6451
Sender ethcommon.Address
52+
StreamRequest []byte
6553
pubChannel *trickle.TrickleLocalPublisher
6654
subChannel *trickle.TrickleLocalPublisher
6755
controlChannel *trickle.TrickleLocalPublisher
6856
eventsChannel *trickle.TrickleLocalPublisher
6957
dataChannel *trickle.TrickleLocalPublisher
7058
//Stream fields
71-
Params interface{}
72-
DataWriter *media.SegmentWriter
7359
JobParams string
7460
StreamCtx context.Context
7561
CancelStream context.CancelFunc
@@ -82,19 +68,13 @@ func (sd *StreamInfo) IsActive() bool {
8268
return false
8369
}
8470

85-
if sd.controlChannel == nil && sd.ControlPub == nil {
71+
if sd.controlChannel == nil {
8672
return false
8773
}
8874

8975
return true
9076
}
9177

92-
func (sd *StreamInfo) ExcludeOrch(orchUrl string) {
93-
sd.sdm.Lock()
94-
defer sd.sdm.Unlock()
95-
sd.ExcludeOrchs = append(sd.ExcludeOrchs, orchUrl)
96-
}
97-
9878
func (sd *StreamInfo) UpdateParams(params string) {
9979
sd.sdm.Lock()
10080
defer sd.sdm.Unlock()
@@ -123,7 +103,7 @@ func NewExternalCapabilities() *ExternalCapabilities {
123103
}
124104
}
125105

126-
func (extCaps *ExternalCapabilities) AddStream(streamID string, pipeline string, params interface{}, streamReq []byte) (*StreamInfo, error) {
106+
func (extCaps *ExternalCapabilities) AddStream(streamID string, capability string, streamReq []byte) (*StreamInfo, error) {
127107
extCaps.capm.Lock()
128108
defer extCaps.capm.Unlock()
129109
_, ok := extCaps.Streams[streamID]
@@ -135,8 +115,7 @@ func (extCaps *ExternalCapabilities) AddStream(streamID string, pipeline string,
135115
ctx, cancel := context.WithCancel(context.Background())
136116
stream := StreamInfo{
137117
StreamID: streamID,
138-
Capability: pipeline,
139-
Params: params, // Store the interface value directly, not a pointer to it
118+
Capability: capability,
140119
StreamRequest: streamReq,
141120
StreamCtx: ctx,
142121
CancelStream: cancel,
@@ -147,15 +126,6 @@ func (extCaps *ExternalCapabilities) AddStream(streamID string, pipeline string,
147126
go func() {
148127
<-ctx.Done()
149128

150-
//gateway channels shutdown
151-
if stream.DataWriter != nil {
152-
stream.DataWriter.Close()
153-
}
154-
if stream.ControlPub != nil {
155-
stream.StopControl()
156-
stream.ControlPub.Close()
157-
}
158-
159129
//orchestrator channels shutdown
160130
if stream.pubChannel != nil {
161131
stream.pubChannel.Close()

server/job_stream.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package server
33
import (
44
"bytes"
55
"context"
6-
"encoding/base64"
76
"encoding/json"
87
"errors"
98
"fmt"
@@ -1042,7 +1041,7 @@ func (h *lphttp) StartStream(w http.ResponseWriter, r *http.Request) {
10421041
w.Header().Set("X-Data-Url", dataUrl)
10431042
}
10441043

1045-
reqBodyForRunner["request"] = base64.StdEncoding.EncodeToString(body)
1044+
reqBodyForRunner["request"] = string(body)
10461045
reqBodyBytes, err := json.Marshal(reqBodyForRunner)
10471046
if err != nil {
10481047
clog.Errorf(ctx, "Failed to marshal request body err=%v", err)
@@ -1089,7 +1088,7 @@ func (h *lphttp) StartStream(w http.ResponseWriter, r *http.Request) {
10891088
clog.V(common.SHORT).Infof(ctx, "stream start processed successfully took=%v balance=%v", time.Since(start), getPaymentBalance(orch, orchJob.Sender, orchJob.Req.Capability).FloatString(0))
10901089

10911090
//setup the stream
1092-
stream, err := h.node.ExternalCapabilities.AddStream(orchJob.Req.ID, orchJob.Req.Capability, orchJob.Req, respBody)
1091+
stream, err := h.node.ExternalCapabilities.AddStream(orchJob.Req.ID, orchJob.Req.Capability, reqBodyBytes)
10931092
if err != nil {
10941093
clog.Errorf(ctx, "Error adding stream to external capabilities: %v", err)
10951094
respondWithError(w, "Error adding stream to external capabilities", http.StatusInternalServerError)

0 commit comments

Comments
 (0)