Skip to content

Commit f887f91

Browse files
committed
write some docs, add timeout parameter to selfcare
1 parent 1629822 commit f887f91

File tree

4 files changed

+174
-5
lines changed

4 files changed

+174
-5
lines changed

README.md

Lines changed: 162 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,5 +3,166 @@
33
[![codecov](https://codecov.io/gh/mbretter/go-mongodb-queue/graph/badge.svg?token=YMBMKY7W9X)](https://codecov.io/gh/mbretter/go-mongodb-queue)
44
[![GoDoc](https://godoc.org/github.com/mbretter/go-mongodb-queue?status.svg)](https://pkg.go.dev/github.com/mbretter/go-mongodb-queue)
55

6-
Dead simple queuing base upon mongodb
6+
This is a dead simple queuing system based on MongoDB.
7+
It is primarily build upon MongoDB's [change streams](https://www.mongodb.com/docs/manual/changeStreams/), this provides
8+
the possibility to use an event based system, instead of using a polling approach.
79

10+
MongoDB change-streams are available, if you have configured a replica-set, as a fallback this packages supports
11+
polling too.
12+
13+
The motivation was to build an easy-to-integrate queuing system without sophisticated features, without external
14+
dependencies, and with direct integration into your application.
15+
16+
## Install
17+
18+
```
19+
go get mbretter/go-mongodb-queue
20+
```
21+
22+
import
23+
24+
```go
25+
import queue "github.com/mbretter/go-mongodb-queue"
26+
```
27+
28+
## Features
29+
30+
There are not that many, it supports retries until a maximum number of tries have been reached, and it has a
31+
default timeout for tasks, which is set to 5 minutes, if running the selfcare function.
32+
33+
Along the task, any arbitrary data can be stored.
34+
35+
Each task belongs to a topic, when publishing to a topic, the handler of this topic gets the first unprocessed task.
36+
37+
When using the event based `Subscribe` function, only one handler/consumer for one topic should be used at the same time,
38+
for avoiding race conditions.
39+
40+
If you need to run multiple handlers/consumers on the same topic, you have to use the `GetNext` function, which supports
41+
polling only.
42+
43+
```go
44+
ctx := context.TODO()
45+
// connect to the mongo database using the mongo-driver
46+
// mongodbUri contains the uri to your mongodb instance
47+
client, err := mongo.Connect(ctx, options.Client().ApplyURI(mongodbUri))
48+
if err != nil {
49+
log.Fatal(err)
50+
}
51+
defer client.Disconnect(ctx)
52+
53+
// get database and collection
54+
collection := client.Database("mydb").Collection("queue")
55+
56+
// make the queue-db
57+
queueDb := queue.NewStdDb(collection, ctx)
58+
59+
// make the queue
60+
qu := queue.NewQueue(queueDb)
61+
```
62+
63+
## Publish
64+
65+
You can publish to any topic, the topic acts like a filter for your tasks, the payload can be any arbitrary data.
66+
67+
```go
68+
type Payload struct {
69+
Name string `bson:"name"`
70+
Desc string `bson:"desc"`
71+
Num int `bson:"num"`
72+
}
73+
74+
payload := Payload{
75+
Name: "Arnold Schwarzenegger",
76+
Desc: "I'll be back",
77+
Num: 73,
78+
}
79+
80+
task, err := qu.Publish("some.topic", &payload, queue.DefaultMaxTries)
81+
if err != nil {
82+
log.Fatal(err)
83+
}
84+
```
85+
86+
## Subscribe
87+
88+
Any handler/application can subscribe to a certain topic. However, there can only be one handler for a certain topic
89+
otherwise, you will run into race conditions.
90+
91+
Here is a small snippet which demonstrates the usage of subscribe using goroutines.
92+
```go
93+
// define your worker function
94+
workerFunc := func(qu *queue.Queue, task queue.Task) {
95+
fmt.Println("worker", task)
96+
// after processing the task you have to acknowledge it
97+
_ = qu.Ack(task.Id.Hex())
98+
}
99+
100+
var wg sync.WaitGroup
101+
// subscribe and pass the worker function
102+
err := qu.Subscribe("some.topic", func(t queue.Task) {
103+
wg.Add(1)
104+
go func() {
105+
defer wg.Done()
106+
workerFunc(qu, t)
107+
}()
108+
})
109+
110+
if err != nil {
111+
log.Fatal(err)
112+
}
113+
114+
wg.Wait()
115+
```
116+
117+
On startup, the `Subscribe` functions checks for unprocessed tasks scheduled before we subscribed, because existing
118+
tasks will not be covered by the MongoDB change-stream.
119+
120+
## Ack/Err
121+
122+
After processing a task you have to acknowledge, that you have processed the task by using `Ack`.
123+
In case of an error you can use the `Err` function to mark the task as failed.
124+
125+
```go
126+
err := qu.Ack(task.Id.Hex())
127+
if err != nil {
128+
log.Fatal(err)
129+
}
130+
131+
qu.Err(task.Id.Hex(), errors.New("something went wrong"))
132+
```
133+
134+
## Polling
135+
136+
You have to loop over `GetNext`, `GetNext` returns a nil task, if no unprocessed task was found or the topic.
137+
It is safe to use `GetNext` for the same topic from different processes, there will be no race conditions, because MongoDB's atomic
138+
`FindOneAndUpdate` operation is used.
139+
140+
```go
141+
for {
142+
task, err := qu.GetNext("some.topic")
143+
if err != nil {
144+
log.Fatal(err)
145+
}
146+
147+
if task == nil {
148+
time.Sleep(time.Millisecond * 100)
149+
} else {
150+
// process the task
151+
_ = qu.Ack(task.Id.Hex())
152+
}
153+
}
154+
```
155+
156+
## Selfcare
157+
158+
The selfcare function re-schedules long-running tasks, this might happen, if the application could not acknowledge
159+
the task, and it sets the task to the error state, if the maximum number of tries have been exceeded.
160+
161+
The selfcare function might be run per topic, if no topic was given, the selfcare runs over all tasks.
162+
As second argument, the timeout for long-running tasks can be given, if no timeout was given it defaults to 5 minutes.
163+
164+
## CreateIndexes
165+
166+
On first use, you have to call this function, or set the indexes manually on the queue collection.
167+
There will be created two indexes, one on `topic` and `state`, the other one is a TTL-index, which removes completed
168+
tasks after one hour.

cmd/demo/main.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ func main() {
107107
}
108108

109109
if *selfcare {
110-
err := qu.Selfcare("")
110+
err := qu.Selfcare("", 0)
111111
if err != nil {
112112
log.Fatal(err)
113113
}

queue.go

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ type event struct {
4646
Task Task `bson:"fullDocument"`
4747
}
4848

49+
// NewQueue initializes a new Queue instance with the provided DbInterface.
4950
func NewQueue(db DbInterface) *Queue {
5051
queue := Queue{
5152
db: db,
@@ -60,6 +61,8 @@ func setNowFunc(n func() time.Time) {
6061
nowFunc = n
6162
}
6263

64+
// Publish inserts a new task into the queue with the given topic, payload, and maxTries.
65+
// If maxTries is zero, it defaults to DefaultMaxTries.
6366
func (q *Queue) Publish(topic string, payload any, maxTries uint) (*Task, error) {
6467
if maxTries == 0 {
6568
maxTries = DefaultMaxTries
@@ -208,12 +211,17 @@ func (q *Queue) Err(id string, err error) error {
208211
})
209212
}
210213

211-
func (q *Queue) Selfcare(topic string) error {
214+
func (q *Queue) Selfcare(topic string, timeout time.Duration) error {
212215
// re-schedule long-running tasks
213216
// this only happens if the processor could not ack the task, i.e. the application crashed
217+
218+
if timeout == 0 {
219+
timeout = DefaultTimeout
220+
}
221+
214222
query := bson.M{
215223
"state": StateRunning,
216-
"meta.dispatched": bson.M{"$lt": nowFunc().Add(DefaultTimeout)},
224+
"meta.dispatched": bson.M{"$lt": nowFunc().Add(timeout)},
217225
}
218226
if len(topic) > 0 {
219227
query["topic"] = topic

queue_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -484,7 +484,7 @@ func TestQueue_Selftest(t *testing.T) {
484484
"meta.completed": nowFunc()},
485485
}).Return(tt.error2)
486486

487-
err := q.Selfcare(tt.topic)
487+
err := q.Selfcare(tt.topic, 0)
488488

489489
if tt.error1 != nil {
490490
assert.Equal(t, tt.error1, err)

0 commit comments

Comments
 (0)