Skip to content

Commit 2ff21f6

Browse files
committed
resolve race-condition when using Subscribe
1 parent 11dedce commit 2ff21f6

File tree

6 files changed

+133
-47
lines changed

6 files changed

+133
-47
lines changed

.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,5 +19,7 @@ start.txt
1919
# Output of the go coverage tool, specifically when used with LiteIDE
2020
*.out
2121

22+
cmd/demo/queue
23+
2224
# Dependency directories (remove the comment below to include it)
2325
# vendor/

README.md

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ polling too.
1313
The motivation was to build an easy-to-integrate queuing system without sophisticated features, without external
1414
dependencies, and with direct integration into your application.
1515

16+
:warning: There is no stable release yet, API is changing
17+
1618
## Install
1719

1820
```
@@ -34,11 +36,9 @@ Along the task, any arbitrary data can be stored.
3436

3537
Each task belongs to a topic, when publishing to a topic, the handler of this topic gets the first unprocessed task.
3638

37-
When using the event based `Subscribe` function, only one handler/consumer for one topic should be used at the same time,
38-
for avoiding race conditions.
39+
You can use either the event based `Subscribe` function, or the `GetNext` function which is needed for polling.
40+
It both cases it is totally safe to have multiple consumers running on the same topic.
3941

40-
If you need to run multiple handlers/consumers on the same topic, you have to use the `GetNext` function, which supports
41-
polling only.
4242

4343
```go
4444
ctx := context.TODO()
@@ -77,16 +77,17 @@ payload := Payload{
7777
Num: 73,
7878
}
7979

80-
task, err := qu.Publish("some.topic", &payload, queue.DefaultMaxTries)
80+
task, err := qu.Publish("some.topic", &payload)
8181
if err != nil {
8282
log.Fatal(err)
8383
}
8484
```
8585

8686
## Subscribe
8787

88-
Any handler/application can subscribe to a certain topic. However, there can only be one handler for a certain topic
89-
otherwise, you will run into race conditions.
88+
Any handler/application can subscribe to a certain topic, the callback function receives a copy of the task.
89+
90+
After processing the task you have to `Ack` it, or mark it as `Err`.
9091

9192
Here is a small snippet which demonstrates the usage of subscribe using goroutines.
9293
```go
@@ -153,12 +154,23 @@ for {
153154
}
154155
```
155156

157+
## Reschedule
158+
159+
If a task had an error, and you want to process this task again, you can use `Reschedule`.
160+
161+
```go
162+
newTask, err := Reschedule(task)
163+
```
164+
165+
When rescheduling, the original task remains untouched, there will create a new task with the same payload and the
166+
initial tries value will be increased.
167+
156168
## Selfcare
157169

158170
The selfcare function re-schedules long-running tasks, this might happen, if the application could not acknowledge
159171
the task, and it sets the task to the error state, if the maximum number of tries have been exceeded.
160172

161-
The selfcare function might be run per topic, if no topic was given, the selfcare runs over all tasks.
173+
The selfcare function might be run per topic, if no topic was given, the selfcare runs over all topics.
162174
As second argument, the timeout for long-running tasks can be given, if no timeout was given it defaults to 5 minutes.
163175

164176
## CreateIndexes

cmd/demo/Makefile

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
.PHONY: build
2+
3+
build:
4+
CGO_ENABLED=0 go build -v -o queue main.go

cmd/demo/main.go

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"go.mongodb.org/mongo-driver/mongo"
99
"go.mongodb.org/mongo-driver/mongo/options"
1010
"log"
11+
"os"
1112
"sync"
1213
)
1314

@@ -18,8 +19,6 @@ type Payload struct {
1819
}
1920

2021
func main() {
21-
var mongodbUri = flag.String("u", "", "mongodb url")
22-
var dbName = flag.String("d", "", "mongodb database name")
2322
var collName = flag.String("c", "queue", "mongodb collection name")
2423
var publish = flag.String("p", "", "publish topic")
2524
var getnext = flag.String("g", "", "next topic")
@@ -29,23 +28,25 @@ func main() {
2928
var subscribe = flag.String("s", "", "subscribe on topic")
3029
flag.Parse()
3130

32-
if len(*mongodbUri) == 0 {
31+
mongodbUri := os.Getenv("MONGODB_URI")
32+
dbName := os.Getenv("MONGODB_DB")
33+
if len(mongodbUri) == 0 {
3334
log.Fatal("mongodb uri missing")
3435
}
3536

36-
if len(*dbName) == 0 {
37+
if len(dbName) == 0 {
3738
log.Fatal("mongodb database name missing")
3839
}
3940

4041
ctx := context.TODO()
41-
client, err := mongo.Connect(ctx, options.Client().ApplyURI(*mongodbUri))
42+
client, err := mongo.Connect(ctx, options.Client().ApplyURI(mongodbUri))
4243
if err != nil {
4344
log.Fatal(err)
4445
}
4546
//goland:noinspection ALL
4647
defer client.Disconnect(ctx)
4748

48-
collection := client.Database(*dbName).Collection(*collName)
49+
collection := client.Database(dbName).Collection(*collName)
4950

5051
queueDb := queue.NewStdDb(collection, ctx)
5152
qu := queue.NewQueue(queueDb)
@@ -80,7 +81,8 @@ func main() {
8081
}
8182

8283
if *publish != "" {
83-
task, err := qu.Publish(*publish, &payload, queue.DefaultMaxTries)
84+
opts := queue.NewPublishOptions().SetMaxTries(1)
85+
task, err := qu.Publish(*publish, &payload, opts)
8486
if err != nil {
8587
log.Fatal(err)
8688
}

queue.go

Lines changed: 82 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -61,18 +61,55 @@ func setNowFunc(n func() time.Time) {
6161
nowFunc = n
6262
}
6363

64+
type PublishOptions struct {
65+
MaxTries uint
66+
Tries int
67+
}
68+
69+
func NewPublishOptions() *PublishOptions {
70+
return &PublishOptions{
71+
MaxTries: 0,
72+
Tries: -1,
73+
}
74+
}
75+
76+
func (p *PublishOptions) SetMaxTries(maxTries uint) *PublishOptions {
77+
p.MaxTries = maxTries
78+
return p
79+
}
80+
81+
func (p *PublishOptions) setTries(tries uint) *PublishOptions {
82+
p.Tries = int(tries)
83+
return p
84+
}
85+
6486
// Publish inserts a new task into the queue with the given topic, payload, and maxTries.
6587
// If maxTries is zero, it defaults to DefaultMaxTries.
66-
func (q *Queue) Publish(topic string, payload any, maxTries uint) (*Task, error) {
67-
if maxTries == 0 {
68-
maxTries = DefaultMaxTries
88+
func (q *Queue) Publish(topic string, payload any, opts ...*PublishOptions) (*Task, error) {
89+
90+
o := PublishOptions{
91+
MaxTries: DefaultMaxTries,
92+
Tries: 0,
93+
}
94+
95+
for _, opt := range opts {
96+
if opt == nil {
97+
continue
98+
}
99+
if opt.MaxTries > 0 {
100+
o.MaxTries = opt.MaxTries
101+
}
102+
103+
if opt.Tries >= 0 {
104+
o.Tries = opt.Tries
105+
}
69106
}
70107

71108
t := Task{
72109
Topic: topic,
73110
Payload: payload,
74-
Tries: 0,
75-
MaxTries: maxTries,
111+
Tries: uint(o.Tries),
112+
MaxTries: o.MaxTries,
76113
Meta: Meta{
77114
Created: nowFunc(),
78115
Dispatched: nil,
@@ -102,7 +139,7 @@ func (q *Queue) GetNext(topic string) (*Task, error) {
102139
"$set": bson.M{"state": StateRunning, "meta.dispatched": nowFunc()},
103140
"$inc": bson.M{"tries": 1},
104141
},
105-
options.FindOneAndUpdate().SetSort(bson.D{{"meta.scheduled", 1}}),
142+
options.FindOneAndUpdate().SetSort(bson.D{{"meta.scheduled", 1}}).SetReturnDocument(options.After),
106143
)
107144

108145
if errors.Is(res.Err(), mongo.ErrNoDocuments) {
@@ -116,11 +153,42 @@ func (q *Queue) GetNext(topic string) (*Task, error) {
116153
return &t, nil
117154
}
118155

156+
func (q *Queue) GetNextById(id primitive.ObjectID) (*Task, error) {
157+
t := Task{}
158+
res := q.db.FindOneAndUpdate(bson.M{
159+
"_id": id,
160+
"state": StatePending,
161+
"$expr": bson.M{"$lt": bson.A{"$tries", "$maxtries"}},
162+
},
163+
bson.M{
164+
"$set": bson.M{"state": StateRunning, "meta.dispatched": nowFunc()},
165+
"$inc": bson.M{"tries": 1},
166+
},
167+
options.FindOneAndUpdate().SetReturnDocument(options.After),
168+
)
169+
170+
if errors.Is(res.Err(), mongo.ErrNoDocuments) {
171+
return nil, nil
172+
}
173+
174+
if err := res.Decode(&t); err != nil {
175+
return nil, err
176+
}
177+
178+
return &t, nil
179+
}
180+
181+
func (q *Queue) Reschedule(task *Task) (*Task, error) {
182+
return q.Publish(task.Topic, task.Payload, NewPublishOptions().setTries(task.Tries).SetMaxTries(task.MaxTries))
183+
}
184+
119185
type Callback func(t Task)
120186

121187
func (q *Queue) Subscribe(topic string, cb Callback) error {
122-
pipeline := bson.D{
123-
{"$match", bson.D{{"operationType", "insert"}, {"fullDocument.topic", topic}, {"fullDocument.state", StatePending}}},
188+
pipeline := bson.D{{"$match", bson.D{
189+
{"operationType", "insert"},
190+
{"fullDocument.topic", topic},
191+
{"fullDocument.state", StatePending}}},
124192
}
125193

126194
stream, err := q.db.Watch(mongo.Pipeline{pipeline})
@@ -153,30 +221,20 @@ func (q *Queue) Subscribe(topic string, cb Callback) error {
153221
continue
154222
}
155223

156-
task := evt.Task
157-
158224
// already processed
159-
if task.Meta.Created.Before(processedUntil) {
225+
if evt.Task.Meta.Created.Before(processedUntil) {
160226
continue
161227
}
162228

163-
task.State = StateRunning
164-
now := nowFunc()
165-
task.Meta.Dispatched = &now
166-
167-
err := q.db.UpdateOne(
168-
bson.M{"_id": task.Id},
169-
bson.M{"$set": bson.M{
170-
"state": task.State,
171-
"meta.dispatched": task.Meta.Dispatched,
172-
}})
173-
229+
task, err := q.GetNextById(evt.Task.Id)
174230
if err != nil {
175-
_ = q.Err(task.Id.Hex(), err)
231+
_ = q.Err(evt.Task.Id.Hex(), err)
176232
continue
177233
}
178234

179-
cb(task)
235+
if task != nil {
236+
cb(*task)
237+
}
180238
}
181239

182240
return nil

queue_test.go

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,8 @@ func TestQueue_Publish(t *testing.T) {
5858
}
5959
dbMock.EXPECT().InsertOne(taskExpected).Return(oId, tt.error)
6060

61-
task, err := q.Publish(tt.topic, tt.payload, tt.maxTries)
61+
opts := NewPublishOptions().SetMaxTries(tt.maxTries)
62+
task, err := q.Publish(tt.topic, tt.payload, opts)
6263

6364
if tt.error == nil {
6465
taskExpected.Id = oId
@@ -183,7 +184,7 @@ func TestQueue_Subscribe(t *testing.T) {
183184
}, bson.M{
184185
"$set": bson.M{"state": StateRunning, "meta.dispatched": now},
185186
"$inc": bson.M{"tries": 1},
186-
}, options.FindOneAndUpdate().SetSort(bson.D{{"meta.scheduled", 1}})).Return(res)
187+
}, options.FindOneAndUpdate().SetSort(bson.D{{"meta.scheduled", 1}}).SetReturnDocument(options.After)).Return(res)
187188

188189
if tt.task != nil {
189190
changeStream.EXPECT().Next(context.TODO()).Once().Return(true)
@@ -202,11 +203,18 @@ func TestQueue_Subscribe(t *testing.T) {
202203
goto runTest
203204
}
204205

205-
dbMock.EXPECT().UpdateOne(bson.M{"_id": tt.task.Id},
206-
bson.M{"$set": bson.M{
207-
"state": StateRunning,
208-
"meta.dispatched": &now,
209-
}}).Return(tt.updateError)
206+
retTask := *tt.task
207+
retTask.State = StateRunning
208+
res = mongo.NewSingleResultFromDocument(retTask, tt.updateError, nil)
209+
210+
dbMock.EXPECT().FindOneAndUpdate(bson.M{
211+
"_id": tt.task.Id,
212+
"state": StatePending,
213+
"$expr": bson.M{"$lt": bson.A{"$tries", "$maxtries"}},
214+
}, bson.M{
215+
"$set": bson.M{"state": StateRunning, "meta.dispatched": now},
216+
"$inc": bson.M{"tries": 1},
217+
}, options.FindOneAndUpdate().SetReturnDocument(options.After)).Return(res)
210218

211219
if tt.updateError != nil {
212220
dbMock.EXPECT().UpdateOne(
@@ -300,7 +308,7 @@ func TestQueue_SubscribeUnprocessedTasks(t *testing.T) {
300308
"$inc": bson.M{"tries": 1},
301309
}
302310

303-
opts := options.FindOneAndUpdate().SetSort(bson.D{{"meta.scheduled", 1}})
311+
opts := options.FindOneAndUpdate().SetSort(bson.D{{"meta.scheduled", 1}}).SetReturnDocument(options.After)
304312
dbMock.EXPECT().FindOneAndUpdate(filter, update, opts).Once().Return(res)
305313

306314
if tt.error == nil {

0 commit comments

Comments
 (0)