Skip to content

Commit d73fe37

Browse files
error handling, logging, lock stream info when checking active
1 parent 9890306 commit d73fe37

File tree

3 files changed

+23
-6
lines changed

3 files changed

+23
-6
lines changed

core/ai_orchestrator.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1203,7 +1203,7 @@ func (orch *orchestrator) JobPriceInfo(sender ethcommon.Address, jobCapability s
12031203
//ensure price numerator and denominator can be int64
12041204
jobPrice, err = common.PriceToInt64(jobPrice)
12051205
if err != nil {
1206-
return nil, err
1206+
return nil, fmt.Errorf("invalid job price: %w", err)
12071207
}
12081208

12091209
return &net.PriceInfo{

core/external_capabilities.go

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,8 @@ type StreamInfo struct {
6464
}
6565

6666
func (sd *StreamInfo) IsActive() bool {
67+
sd.sdm.Lock()
68+
defer sd.sdm.Unlock()
6769
if sd.StreamCtx.Err() != nil {
6870
return false
6971
}
@@ -128,19 +130,29 @@ func (extCaps *ExternalCapabilities) AddStream(streamID string, capability strin
128130

129131
//orchestrator channels shutdown
130132
if stream.pubChannel != nil {
131-
stream.pubChannel.Close()
133+
if err := stream.pubChannel.Close(); err != nil {
134+
glog.Errorf("error closing pubChannel for stream=%s: %v", streamID, err)
135+
}
132136
}
133137
if stream.subChannel != nil {
134-
stream.subChannel.Close()
138+
if err := stream.subChannel.Close(); err != nil {
139+
glog.Errorf("error closing subChannel for stream=%s: %v", streamID, err)
140+
}
135141
}
136142
if stream.controlChannel != nil {
137-
stream.controlChannel.Close()
143+
if err := stream.controlChannel.Close(); err != nil {
144+
glog.Errorf("error closing controlChannel for stream=%s: %v", streamID, err)
145+
}
138146
}
139147
if stream.eventsChannel != nil {
140-
stream.eventsChannel.Close()
148+
if err := stream.eventsChannel.Close(); err != nil {
149+
glog.Errorf("error closing eventsChannel for stream=%s: %v", streamID, err)
150+
}
141151
}
142152
if stream.dataChannel != nil {
143-
stream.dataChannel.Close()
153+
if err := stream.dataChannel.Close(); err != nil {
154+
glog.Errorf("error closing dataChannel for stream=%s: %v", streamID, err)
155+
}
144156
}
145157
return
146158
}()

server/job_stream.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,11 @@ func (ls *LivepeerServer) StopStream() http.Handler {
100100
stopJob.sign() //no changes to make, sign job
101101

102102
token, err := sessionToToken(params.liveParams.sess)
103+
if err != nil {
104+
clog.Errorf(ctx, "Error converting session to token: %s", err)
105+
http.Error(w, err.Error(), http.StatusBadRequest)
106+
return
107+
}
103108
newToken, err := getToken(ctx, 3*time.Second, token.ServiceAddr, stopJob.Job.Req.Capability, stopJob.Job.Req.Sender, stopJob.Job.Req.Sig)
104109
if err != nil {
105110
clog.Errorf(ctx, "Error converting session to token: %s", err)

0 commit comments

Comments
 (0)