Skip to content

Commit 225e586

Browse files
committed
test subscribe
1 parent 410a251 commit 225e586

File tree

6 files changed

+458
-24
lines changed

6 files changed

+458
-24
lines changed

.mockery.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,3 +10,4 @@ packages:
1010
github.com/mbretter/go-mongodb-queue:
1111
interfaces:
1212
DbInterface:
13+
ChangeStreamInterface:

changestreaminterface_test.go

Lines changed: 174 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

db.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,17 @@ type DbInterface interface {
1313
FindOneAndUpdate(filter interface{}, update interface{}, opts ...*options.FindOneAndUpdateOptions) *mongo.SingleResult
1414
UpdateOne(filter interface{}, update interface{}) error
1515
UpdateMany(filter interface{}, update interface{}) error
16-
Watch(pipeline interface{}) (*mongo.ChangeStream, error)
16+
Watch(pipeline interface{}) (ChangeStreamInterface, error)
1717
CreateIndexes(index []mongo.IndexModel) error
1818
Context() context.Context
1919
}
2020

21+
type ChangeStreamInterface interface {
22+
Next(ctx context.Context) bool
23+
Decode(v interface{}) error
24+
Close(ctx context.Context) error
25+
}
26+
2127
type StdDb struct {
2228
context context.Context
2329
collection *mongo.Collection
@@ -64,7 +70,7 @@ func (d *StdDb) UpdateMany(filter interface{}, update interface{}) error {
6470
return err
6571
}
6672

67-
func (d *StdDb) Watch(pipeline interface{}) (*mongo.ChangeStream, error) {
73+
func (d *StdDb) Watch(pipeline interface{}) (ChangeStreamInterface, error) {
6874
return d.collection.Watch(d.context, pipeline)
6975
}
7076

dbinterface_test.go

Lines changed: 7 additions & 7 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

queue.go

Lines changed: 19 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,10 @@ type Task struct {
4242
Meta Meta
4343
}
4444

45+
type event struct {
46+
Task Task `bson:"fullDocument"`
47+
}
48+
4549
func NewQueue(db DbInterface) *Queue {
4650
queue := Queue{
4751
db: db,
@@ -92,7 +96,7 @@ func (q *Queue) GetNext(topic string) (*Task, error) {
9296
"$expr": bson.M{"$lt": bson.A{"$tries", "$maxtries"}},
9397
},
9498
bson.M{
95-
"$set": bson.M{"state": StateRunning, "meta.dispatched": time.Now()},
99+
"$set": bson.M{"state": StateRunning, "meta.dispatched": nowFunc()},
96100
"$inc": bson.M{"tries": 1},
97101
},
98102
options.FindOneAndUpdate().SetSort(bson.D{{"meta.scheduled", 1}}),
@@ -140,36 +144,36 @@ func (q *Queue) Subscribe(topic string, cb Callback) error {
140144
}
141145

142146
for stream.Next(q.db.Context()) {
143-
var event struct {
144-
Task Task `bson:"fullDocument"`
145-
}
147+
var evt event
146148

147-
if err := stream.Decode(&event); err != nil {
149+
if err := stream.Decode(&evt); err != nil {
148150
continue
149151
}
150152

153+
task := evt.Task
154+
151155
// already processed
152-
if event.Task.Meta.Created.Before(processedUntil) {
156+
if task.Meta.Created.Before(processedUntil) {
153157
continue
154158
}
155159

156-
event.Task.State = StateRunning
157-
now := time.Now()
158-
event.Task.Meta.Dispatched = &now
160+
task.State = StateRunning
161+
now := nowFunc()
162+
task.Meta.Dispatched = &now
159163

160164
err := q.db.UpdateOne(
161-
bson.M{"_id": event.Task.Id},
165+
bson.M{"_id": task.Id},
162166
bson.M{"$set": bson.M{
163-
"state": event.Task.State,
164-
"meta.dispatched": event.Task.Meta.Dispatched,
167+
"state": task.State,
168+
"meta.dispatched": task.Meta.Dispatched,
165169
}})
166170

167171
if err != nil {
168-
_ = q.Err(event.Task.Id.Hex(), err)
172+
_ = q.Err(task.Id.Hex(), err)
169173
continue
170174
}
171175

172-
cb(event.Task)
176+
cb(task)
173177
}
174178

175179
return nil
@@ -199,7 +203,7 @@ func (q *Queue) Err(id string, err error) error {
199203
bson.M{"_id": oId},
200204
bson.M{"$set": bson.M{
201205
"state": StateError,
202-
"meta.completed": time.Now(),
206+
"meta.completed": nowFunc(),
203207
"message": err.Error()},
204208
})
205209
}

0 commit comments

Comments
 (0)