@@ -59,7 +59,7 @@ func TestQueue_Publish(t *testing.T) {
59
59
dbMock .EXPECT ().InsertOne (taskExpected ).Return (oId , tt .error )
60
60
61
61
opts := NewPublishOptions ().SetMaxTries (tt .maxTries )
62
- task , err := q .Publish (tt .topic , tt .payload , opts )
62
+ task , err := q .Publish (tt .topic , tt .payload , opts , nil )
63
63
64
64
if tt .error == nil {
65
65
taskExpected .Id = oId
@@ -325,6 +325,84 @@ func TestQueue_SubscribeUnprocessedTasks(t *testing.T) {
325
325
}
326
326
}
327
327
328
+ func TestQueue_GetNextById (t * testing.T ) {
329
+ setNowFunc (func () time.Time {
330
+ t , _ := time .Parse (time .DateTime , "2024-10-12 15:04:05" )
331
+ return t
332
+ })
333
+
334
+ now := nowFunc ()
335
+
336
+ tests := []struct {
337
+ name string
338
+ task Task
339
+ error error
340
+ }{
341
+ {
342
+ name : "Success" ,
343
+ task : Task {
344
+ Id : primitive .NewObjectID (),
345
+ Topic : "topic1" ,
346
+ Payload : "payload1" ,
347
+ Tries : 1 ,
348
+ MaxTries : 3 ,
349
+ Meta : Meta {
350
+ Created : now ,
351
+ },
352
+ State : StateRunning ,
353
+ },
354
+ },
355
+ {
356
+ name : "Error" ,
357
+ task : Task {
358
+ Id : primitive .NewObjectID (),
359
+ },
360
+ error : errors .New ("no doc found" ),
361
+ },
362
+ }
363
+ for _ , tt := range tests {
364
+ t .Run (tt .name , func (t * testing.T ) {
365
+ dbMock := NewDbInterfaceMock (t )
366
+ q := NewQueue (dbMock )
367
+
368
+ resOk := mongo .NewSingleResultFromDocument (tt .task , tt .error , nil )
369
+ resNoDoc := mongo .NewSingleResultFromDocument (tt .task , mongo .ErrNoDocuments , nil )
370
+
371
+ var res * mongo.SingleResult
372
+ if tt .error == nil {
373
+ res = resOk
374
+ } else {
375
+ res = resNoDoc
376
+ }
377
+
378
+ filter := bson.M {
379
+ "_id" : tt .task .Id ,
380
+ "state" : StatePending ,
381
+ "$expr" : bson.M {"$lt" : bson.A {"$tries" , "$maxtries" }},
382
+ }
383
+
384
+ update := bson.M {
385
+ "$set" : bson.M {"state" : StateRunning , "meta.dispatched" : now },
386
+ "$inc" : bson.M {"tries" : 1 },
387
+ }
388
+
389
+ opts := options .FindOneAndUpdate ().SetReturnDocument (options .After )
390
+ dbMock .EXPECT ().FindOneAndUpdate (filter , update , opts ).Once ().Return (res )
391
+
392
+ ts , err := q .GetNextById (tt .task .Id )
393
+
394
+ if tt .error == nil {
395
+ assert .Equal (t , tt .task .Topic , ts .Topic )
396
+ assert .Equal (t , tt .error , err )
397
+ } else {
398
+ assert .Nil (t , ts )
399
+ assert .Nil (t , err )
400
+ }
401
+
402
+ })
403
+ }
404
+ }
405
+
328
406
func TestQueue_Ack (t * testing.T ) {
329
407
setNowFunc (func () time.Time {
330
408
t , _ := time .Parse (time .DateTime , "2024-10-12 15:04:05" )
@@ -506,7 +584,6 @@ func TestQueue_Selftest(t *testing.T) {
506
584
}
507
585
508
586
func TestQueue_CreateIndexes (t * testing.T ) {
509
-
510
587
tests := []struct {
511
588
name string
512
589
error error
@@ -537,3 +614,61 @@ func TestQueue_CreateIndexes(t *testing.T) {
537
614
})
538
615
}
539
616
}
617
+
618
+ func TestQueue_Reschedule (t * testing.T ) {
619
+ setNowFunc (func () time.Time {
620
+ t , _ := time .Parse (time .DateTime , "2023-11-12 15:04:05" )
621
+ return t
622
+ })
623
+
624
+ tests := []struct {
625
+ name string
626
+ task Task
627
+ error error
628
+ }{
629
+ {
630
+ name : "Success" ,
631
+ task : Task {
632
+ Topic : "foo.bar" ,
633
+ Payload : "whatever" ,
634
+ Tries : 1 ,
635
+ MaxTries : 3 ,
636
+ Meta : Meta {
637
+ Created : nowFunc (),
638
+ },
639
+ State : StatePending ,
640
+ },
641
+ },
642
+ }
643
+ for _ , tt := range tests {
644
+ t .Run (tt .name , func (t * testing.T ) {
645
+ dbMock := NewDbInterfaceMock (t )
646
+ q := NewQueue (dbMock )
647
+
648
+ oId := primitive .NewObjectID ()
649
+
650
+ taskExpected := Task {
651
+ Topic : tt .task .Topic ,
652
+ Payload : tt .task .Payload ,
653
+ Tries : 1 ,
654
+ MaxTries : 3 ,
655
+ Meta : Meta {
656
+ Created : nowFunc (),
657
+ },
658
+ State : StatePending ,
659
+ }
660
+ dbMock .EXPECT ().InsertOne (taskExpected ).Return (oId , tt .error )
661
+
662
+ task , err := q .Reschedule (& tt .task )
663
+
664
+ if tt .error == nil {
665
+ taskExpected .Id = oId
666
+ assert .Equal (t , taskExpected , * task )
667
+ } else {
668
+ assert .Nil (t , task )
669
+ assert .Equal (t , tt .error , err )
670
+ }
671
+
672
+ })
673
+ }
674
+ }
0 commit comments