Add back pressure #8

@stangelandcl

Description

The library doesn't provide back pressure. While a slow job sits at the head of the queue, the outputs of every later job accumulate in the heap without bound until that oldest job finishes.

To Reproduce

inputChan := make(chan orderedconcurrently.WorkFunction)
outChan := orderedconcurrently.Process(context.Background(),
	inputChan, &orderedconcurrently.Options{PoolSize: 10, OutChannelBuffer: 10})
go func() {
	n := 0
	for out := range outChan {
		n += out.Value.(int)
		fmt.Println("output count", n)
	}
}()

for i := 0; i < 3; i++ {
	inputChan <- &Runner{}
}

inputChan <- &Slow{}

for i := 0; i < 500*1000*1000; i++ {
	inputChan <- &Runner{}
	if (i+1)%(1000*1000) == 0 {
		fmt.Println("input count", i+1)
	}
}

close(inputChan)

type WorkFunction interface {
	Run(ctx context.Context) interface{}
}

type Runner struct{}

func (r *Runner) Run(ctx context.Context) interface{} {
	return 1
}

type Slow struct{}

func (r *Slow) Run(ctx context.Context) interface{} {
	time.Sleep(60 * time.Second)
	return 1
}

Output

output count 1
output count 2
output count 3
input count 1000000
input count 2000000
input count 3000000
^Csignal: interrupt

Desired Results
If the library supported back pressure, the test would take 9 inputs, wait one minute for the slow job at the head of the queue to complete, and then start consuming inputs again.

Suggested change
The heap allows an unbounded number of jobs to pile up. Back pressure could come from a bounded queue instead: each job is added to the queue when it is started, each entry carries a "completed" flag, and a finishing job signals the returning function to wake up and check whether the head item in the queue has its "completed" flag set yet. Once the queue is at its limit, no further input is accepted until the head item has been returned.


Labels: enhancement
