Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion async.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,6 @@ Gem::Specification.new do |spec|

spec.add_dependency "console", "~> 1.10"
spec.add_dependency "fiber-annotation"
spec.add_dependency "io-event", "~> 1.1"
spec.add_dependency "io-event", "~> 1.5", ">= 1.5.1"
spec.add_dependency "timers", "~> 4.1"
end
27 changes: 27 additions & 0 deletions examples/load/test.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
#!/usr/bin/env ruby

require_relative '../../lib/async'
require_relative '../../lib/async/idler'

Async do
idler = Async::Idler.new(0.8)

Async do
while true
idler.async do
$stdout.write '.'
while true
sleep 0.1
end
end
end
end

scheduler = Fiber.scheduler
while true
load = scheduler.load

$stdout.write "\nLoad: #{load} "
sleep 1.0
end
end
39 changes: 39 additions & 0 deletions lib/async/idler.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
# frozen_string_literal: true

# Released under the MIT License.
# Copyright, 2024, by Samuel Williams.

module Async
class Idler
def initialize(maximum_load = 0.8, backoff: 0.01, parent: nil)
@maximum_load = maximum_load
@backoff = backoff
@parent = parent
end

def async(*arguments, parent: (@parent or Task.current), **options, &block)
wait

# It is crucial that we optimistically execute the child task, so that we prevent a tight loop invoking this method from consuming all available resources.
parent.async(*arguments, **options, &block)
end

def wait
scheduler = Fiber.scheduler
backoff = nil

while true
load = scheduler.load
break if load < @maximum_load

if backoff
sleep(backoff)
backoff *= 2.0
else
scheduler.yield
backoff = @backoff
end
end
end
end
end
35 changes: 35 additions & 0 deletions lib/async/scheduler.rb
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,33 @@ def initialize(parent = nil, selector: nil)

@blocked = 0

@busy_time = 0.0
@idle_time = 0.0

@timers = ::Timers::Group.new
end

# Compute the scheduler load according to the busy and idle times that are updated by the run loop.
# @returns [Float] The load of the scheduler. 0.0 means no load, 1.0 means fully loaded or over-loaded.
def load
total_time = @busy_time + @idle_time

# If the total time is zero, then the load is zero:
return 0.0 if total_time.zero?

# We normalize to a 1 second window:
if total_time > 1.0
ratio = 1.0 / total_time
@busy_time *= ratio
@idle_time *= ratio

# We don't need to divide here as we've already normalised it to a 1s window:
return @busy_time
else
return @busy_time / total_time
end
end

def scheduler_close
# If the execution context (thread) was handling an exception, we want to exit as quickly as possible:
unless $!
Expand Down Expand Up @@ -267,6 +291,8 @@ def run_once(timeout = nil)
# @parameter timeout [Float | Nil] The maximum timeout, or if nil, indefinite.
# @returns [Boolean] Whether there is more work to do.
private def run_once!(timeout = 0)
start_time = Async::Clock.now

interval = @timers.wait_interval

# If there is no interval to wait (thus no timers), and no tasks, we could be done:
Expand All @@ -288,6 +314,15 @@ def run_once(timeout = nil)

@timers.fire

# Compute load:
end_time = Async::Clock.now
total_duration = end_time - start_time
idle_duration = @selector.idle_duration
busy_duration = total_duration - idle_duration

@busy_time += busy_duration
@idle_time += idle_duration

# The reactor still has work to do:
return true
end
Expand Down
35 changes: 35 additions & 0 deletions test/async/idler.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# frozen_string_literal: true

# Released under the MIT License.
# Copyright, 2018-2023, by Samuel Williams.

require 'async/idler'
require 'sus/fixtures/async'

require 'chainable_async'

describe Async::Idler do
include Sus::Fixtures::Async::ReactorContext
let(:idler) {subject.new(0.5)}

it 'can schedule tasks up to the desired load' do
# Generate the load:
Async do
while true
idler.async do
while true
sleep 0.1
end
end
end
end

# This test must be longer than the test window...
sleep 1.1

# Verify that the load is within the desired range:
expect(Fiber.scheduler.load).to be_within(0.1).of(0.5)
end

it_behaves_like ChainableAsync
end