Skip to content

Commit 1ca67c4

Browse files
tests tests tests
1 parent 6fb5308 commit 1ca67c4

File tree

6 files changed

+1924
-80
lines changed

6 files changed

+1924
-80
lines changed

common/testutil.go

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -90,12 +90,23 @@ func IgnoreRoutines() []goleak.Option {
9090
"github.com/livepeer/go-livepeer/server.(*LivepeerServer).HandlePush.func1", "github.com/rjeczalik/notify.(*nonrecursiveTree).dispatch",
9191
"github.com/rjeczalik/notify.(*nonrecursiveTree).internal", "github.com/livepeer/lpms/stream.NewBasicRTMPVideoStream.func1", "github.com/patrickmn/go-cache.(*janitor).Run",
9292
"github.com/golang/glog.(*fileSink).flushDaemon", "github.com/livepeer/go-livepeer/core.(*LivepeerNode).transcodeFrames.func2", "github.com/ipfs/go-log/writer.(*MirrorWriter).logRoutine",
93-
"github.com/livepeer/go-livepeer/core.(*Balances).StartCleanup",
93+
"github.com/livepeer/go-livepeer/core.(*Balances).StartCleanup", "github.com/livepeer/go-livepeer/server.startTrickleSubscribe.func2", "github.com/livepeer/go-livepeer/server.startTrickleSubscribe",
94+
"net/http.(*persistConn).writeLoop", "net/http.(*persistConn).readLoop", "io.(*pipe).read",
95+
"github.com/livepeer/go-livepeer/media.gatherIncomingTracks",
9496
}
9597

96-
res := make([]goleak.Option, 0, len(funcs2ignore))
98+
// Functions that might have other functions on top of their stack (like time.Sleep)
99+
// These need to be ignored with IgnoreAnyFunction instead of IgnoreTopFunction
100+
funcsAnyIgnore := []string{
101+
"github.com/livepeer/go-livepeer/server.ffmpegOutput",
102+
}
103+
104+
res := make([]goleak.Option, 0, len(funcs2ignore)+len(funcsAnyIgnore))
97105
for _, f := range funcs2ignore {
98106
res = append(res, goleak.IgnoreTopFunction(f))
99107
}
108+
for _, f := range funcsAnyIgnore {
109+
res = append(res, goleak.IgnoreAnyFunction(f))
110+
}
100111
return res
101112
}

core/livepeernode.go

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -195,15 +195,23 @@ func (n *LivepeerNode) NewLivePipeline(requestID, streamID, pipeline string, str
195195
n.LiveMu.Lock()
196196
defer n.LiveMu.Unlock()
197197
n.LivePipelines[streamID] = &LivePipeline{
198-
RequestID: requestID,
199-
Pipeline: pipeline,
200-
StreamCtx: streamCtx,
201-
streamParams: streamParams,
202-
streamCancel: streamCancel,
198+
RequestID: requestID,
199+
StreamID: streamID,
200+
Pipeline: pipeline,
201+
StreamCtx: streamCtx,
202+
streamParams: streamParams,
203+
streamCancel: streamCancel,
204+
streamRequest: streamRequest,
203205
}
204206
return n.LivePipelines[streamID]
205207
}
206208

209+
func (n *LivepeerNode) RemoveLivePipeline(streamID string) {
210+
n.LiveMu.Lock()
211+
defer n.LiveMu.Unlock()
212+
delete(n.LivePipelines, streamID)
213+
}
214+
207215
func (p *LivePipeline) StreamParams() interface{} {
208216
return p.streamParams
209217
}
@@ -217,7 +225,10 @@ func (p *LivePipeline) StreamRequest() []byte {
217225
}
218226

219227
func (p *LivePipeline) StopStream(err error) {
220-
p.StopControl()
228+
if p.StopControl != nil {
229+
p.StopControl()
230+
}
231+
221232
p.streamCancel(err)
222233
}
223234

server/job_rpc.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -864,10 +864,22 @@ func processPayment(ctx context.Context, orch Orchestrator, sender ethcommon.Add
864864
}
865865

866866
func createPayment(ctx context.Context, jobReq *JobRequest, orchToken *core.JobToken, node *core.LivepeerNode) (string, error) {
867+
if orchToken == nil {
868+
return "", errors.New("orchestrator token is nil, cannot create payment")
869+
}
870+
//if no sender or ticket params, no payment
871+
if node.Sender == nil {
872+
return "", errors.New("no ticket sender available, cannot create payment")
873+
}
874+
if orchToken.TicketParams == nil {
875+
return "", errors.New("no ticket params available, cannot create payment")
876+
}
877+
867878
var payment *net.Payment
868879
createTickets := true
869880
clog.Infof(ctx, "creating payment for job request %s", jobReq.Capability)
870881
sender := ethcommon.HexToAddress(jobReq.Sender)
882+
871883
orchAddr := ethcommon.BytesToAddress(orchToken.TicketParams.Recipient)
872884
sessionID := node.Sender.StartSession(*pmTicketParams(orchToken.TicketParams))
873885

0 commit comments

Comments
 (0)