Skip to content

Commit db3e7c2

Browse files
committed
feat: support Stop() in queue
1 parent b545326 commit db3e7c2

File tree

1 file changed

+11
-2
lines changed

1 file changed

+11
-2
lines changed

queue/queue.go

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,8 @@ type Queue struct {
3030
Threads int
3131
storage Storage
3232
wake chan struct{}
33-
mut sync.Mutex // guards wake
33+
mut sync.Mutex // guards wake and running
34+
running bool
3435
}
3536

3637
// InMemoryQueueStorage is the default implementation of the Storage interface.
@@ -62,6 +63,7 @@ func New(threads int, s Storage) (*Queue, error) {
6263
return &Queue{
6364
Threads: threads,
6465
storage: s,
66+
running: true,
6567
}, nil
6668
}
6769

@@ -139,6 +141,13 @@ func (q *Queue) Run(c *colly.Collector) error {
139141
return <-errc
140142
}
141143

144+
func (q *Queue) Stop() error {
145+
q.mut.Lock()
146+
q.running = false
147+
q.mut.Unlock()
148+
return nil
149+
}
150+
142151
func (q *Queue) loop(c *colly.Collector, requestc chan<- *colly.Request, complete <-chan struct{}, errc chan<- error) {
143152
var active int
144153
for {
@@ -147,7 +156,7 @@ func (q *Queue) loop(c *colly.Collector, requestc chan<- *colly.Request, complet
147156
errc <- err
148157
break
149158
}
150-
if size == 0 && active == 0 {
159+
if size == 0 && active == 0 || !q.running {
151160
// Terminate when
152161
// 1. No active requests
153162
// 2. Emtpy queue

0 commit comments

Comments
 (0)