@@ -66,13 +66,15 @@ type PublishOptions struct {
66
66
Tries int
67
67
}
68
68
69
+ // NewPublishOptions creates a new PublishOptions with default settings.
69
70
func NewPublishOptions () * PublishOptions {
70
71
return & PublishOptions {
71
72
MaxTries : 0 ,
72
73
Tries : - 1 ,
73
74
}
74
75
}
75
76
77
+ // SetMaxTries sets the maximum number of retry attempts for publishing. Returns the updated PublishOptions instance.
76
78
func (p * PublishOptions ) SetMaxTries (maxTries uint ) * PublishOptions {
77
79
p .MaxTries = maxTries
78
80
return p
@@ -128,6 +130,7 @@ func (q *Queue) Publish(topic string, payload any, opts ...*PublishOptions) (*Ta
128
130
return & t , nil
129
131
}
130
132
133
+ // GetNext retrieves the next item from the queue for the given topic, marks it as running, and increments its tries count.
131
134
func (q * Queue ) GetNext (topic string ) (* Task , error ) {
132
135
t := Task {}
133
136
res := q .db .FindOneAndUpdate (bson.M {
@@ -153,6 +156,7 @@ func (q *Queue) GetNext(topic string) (*Task, error) {
153
156
return & t , nil
154
157
}
155
158
159
+ // GetNextById retrieves the next pending task by its ID, transitions it to the running state, and increments its tries count.
156
160
func (q * Queue ) GetNextById (id primitive.ObjectID ) (* Task , error ) {
157
161
t := Task {}
158
162
res := q .db .FindOneAndUpdate (bson.M {
@@ -178,12 +182,15 @@ func (q *Queue) GetNextById(id primitive.ObjectID) (*Task, error) {
178
182
return & t , nil
179
183
}
180
184
185
+ // Reschedule republishes a task to the queue, retaining its topic, payload, tries, and maxTries settings.
181
186
func (q * Queue ) Reschedule (task * Task ) (* Task , error ) {
182
187
return q .Publish (task .Topic , task .Payload , NewPublishOptions ().setTries (task .Tries ).SetMaxTries (task .MaxTries ))
183
188
}
184
189
185
190
type Callback func (t Task )
186
191
192
+ // Subscribe listens for new tasks on a given topic and calls the provided callback when a new task is available.
193
+ // It processes unprocessed tasks scheduled before starting the watch and continuously monitors for new tasks.
187
194
func (q * Queue ) Subscribe (topic string , cb Callback ) error {
188
195
pipeline := bson.D {{"$match" , bson.D {
189
196
{"operationType" , "insert" },
@@ -240,6 +247,7 @@ func (q *Queue) Subscribe(topic string, cb Callback) error {
240
247
return nil
241
248
}
242
249
250
+ // Ack acknowledges a task completion by its ID, updating its state to "completed" and setting the completion timestamp.
243
251
func (q * Queue ) Ack (id string ) error {
244
252
oId , err := primitive .ObjectIDFromHex (id )
245
253
if err != nil {
@@ -254,6 +262,7 @@ func (q *Queue) Ack(id string) error {
254
262
}})
255
263
}
256
264
265
+ // Err updates the state of a task to "error" by its ID, setting the completion time and storing the error message.
257
266
func (q * Queue ) Err (id string , err error ) error {
258
267
oId , e := primitive .ObjectIDFromHex (id )
259
268
if e != nil {
@@ -269,6 +278,9 @@ func (q *Queue) Err(id string, err error) error {
269
278
})
270
279
}
271
280
281
+ // Selfcare re-schedules long-running tasks and sets tasks exceeding max tries to error state.
282
+ // It updates tasks in an ongoing state that haven't been acknowledged within a specific timeout period.
283
+ // If timeout is zero, the default timeout value is used. Optionally, tasks can be filtered by topic.
272
284
func (q * Queue ) Selfcare (topic string , timeout time.Duration ) error {
273
285
// re-schedule long-running tasks
274
286
// this only happens if the processor could not ack the task, i.e. the application crashed
@@ -319,6 +331,7 @@ func (q *Queue) Selfcare(topic string, timeout time.Duration) error {
319
331
return nil
320
332
}
321
333
334
+ // CreateIndexes creates MongoDB indexes for the task collection to improve query performance and manage TTL for completed tasks.
322
335
func (q * Queue ) CreateIndexes () error {
323
336
err := q .db .CreateIndexes ([]mongo.IndexModel {{
324
337
Keys : bson.D {{"topic" , 1 }, {"state" , 1 }},
0 commit comments