@@ -30,7 +30,8 @@ type Queue struct {
3030 Threads int
3131 storage Storage
3232 wake chan struct {}
33- mut sync.Mutex // guards wake
33+ mut sync.Mutex // guards wake and running
34+ running bool
3435}
3536
3637// InMemoryQueueStorage is the default implementation of the Storage interface.
@@ -62,6 +63,7 @@ func New(threads int, s Storage) (*Queue, error) {
6263 return & Queue {
6364 Threads : threads ,
6465 storage : s ,
66+ running : true ,
6567 }, nil
6668}
6769
@@ -122,11 +124,12 @@ func (q *Queue) Size() (int, error) {
122124// The given Storage must not be used directly while Run blocks.
123125func (q * Queue ) Run (c * colly.Collector ) error {
124126 q .mut .Lock ()
125- if q .wake != nil {
127+ if q .wake != nil && q . running == true {
126128 q .mut .Unlock ()
127129 panic ("cannot call duplicate Queue.Run" )
128130 }
129131 q .wake = make (chan struct {})
132+ q .running = true
130133 q .mut .Unlock ()
131134
132135 requestc := make (chan * colly.Request )
@@ -139,6 +142,13 @@ func (q *Queue) Run(c *colly.Collector) error {
139142 return <- errc
140143}
141144
145+ // Stop will stop the running queue
146+ func (q * Queue ) Stop () {
147+ q .mut .Lock ()
148+ q .running = false
149+ q .mut .Unlock ()
150+ }
151+
142152func (q * Queue ) loop (c * colly.Collector , requestc chan <- * colly.Request , complete <- chan struct {}, errc chan <- error ) {
143153 var active int
144154 for {
@@ -147,7 +157,7 @@ func (q *Queue) loop(c *colly.Collector, requestc chan<- *colly.Request, complet
147157 errc <- err
148158 break
149159 }
150- if size == 0 && active == 0 {
160+ if size == 0 && active == 0 || ! q . running {
151161 // Terminate when
152162 // 1. No active requests
153163 // 2. Emtpy queue
0 commit comments