@@ -41,7 +41,7 @@ func Process(inputChan <-chan *OrderedInput, wf WorkFunction, options *Options)
4141 processChan := make (chan * processInput )
4242 aggregatorChan := make (chan * processInput )
4343 wg := sync.WaitGroup {}
44- doneChan := make (chan bool )
44+ doneSemaphoreChan := make (chan bool )
4545 // Go routine to print data in order
4646 go func () {
4747 var current uint64
@@ -70,33 +70,26 @@ func Process(inputChan <-chan *OrderedInput, wf WorkFunction, options *Options)
7070 }
7171 if aggregatorChan == nil {
7272 close (outputChan )
73- doneChan <- true
73+ doneSemaphoreChan <- true
7474 }
7575 }
7676 }()
7777
78- closeOnce := sync. Once {}
78+ inputClosedSemaphoreChan := make ( chan bool )
7979 // Create a goroutine pool
8080 for i := 0 ; i < processors ; i ++ {
8181 go func () {
82- for {
82+ for input := range processChan {
83+ wg .Add (1 )
84+ input .value = wf (input .value )
85+ input .wg = & wg
86+ aggregatorChan <- input
8387 select {
84- case input , ok := <- processChan :
85- if ok {
86- input .value = wf (input .value )
87- wg .Add (1 )
88- input .wg = & wg
89- aggregatorChan <- input
90- } else {
91- processChan = nil
92- }
93- }
94- if processChan == nil {
88+ case <- inputClosedSemaphoreChan :
9589 wg .Wait ()
96- // Safe. This will be triggered only once WG has finished
97- closeOnce .Do (func () {
98- close (aggregatorChan )
99- })
90+ close (aggregatorChan )
91+ default :
92+ continue
10093 }
10194 }
10295 }()
@@ -115,10 +108,11 @@ func Process(inputChan <-chan *OrderedInput, wf WorkFunction, options *Options)
115108 }
116109 if inputChan == nil {
117110 close (processChan )
111+ inputClosedSemaphoreChan <- true
118112 break
119113 }
120114 }
121- <- doneChan
115+ <- doneSemaphoreChan
122116 }()
123117 return outputChan
124118}
0 commit comments