Skip to content

Commit c271257

Browse files
committed
Introduce a priority queue:
- ### Problem In #9381, I explained the issue about the "tail latency". TL;DR When a gem with a native extensions is downloaded, the sooner we starts compiling, the sooner it gets installed. If a gem with a native extensions ends up at the end of the queue, the longer `bundle install` becomes. ### Solution I'd like to introduce a simple queue with priority. When a gem is downloaded, we check whether that gem has a native extension and if its dependencies are installed. If both conditions are met, we add the gem in the priority queue to be picked up as quickly as possible.
1 parent bcc4469 commit c271257

File tree

5 files changed

+116
-4
lines changed

5 files changed

+116
-4
lines changed

bundler/lib/bundler/installer/parallel_installer.rb

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,10 @@ def enqueued?
2424
state == :enqueued
2525
end
2626

27+
def enqueue_with_priority?
28+
state == :installable && spec.extensions.any?
29+
end
30+
2731
def failed?
2832
state == :failed
2933
end
@@ -194,7 +198,7 @@ def process_specs(installed_specs)
194198
spec.state = :installable
195199
end
196200

197-
worker_pool.enq(spec)
201+
worker_pool.enq(spec, priority: spec.enqueue_with_priority?)
198202
end
199203

200204
def finished_installing?

bundler/lib/bundler/worker.rb

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ def initialize(exn)
2222
def initialize(size, name, func)
2323
@name = name
2424
@request_queue = Thread::Queue.new
25+
@request_queue_with_priority = Thread::Queue.new
2526
@response_queue = Thread::Queue.new
2627
@func = func
2728
@size = size
@@ -32,9 +33,10 @@ def initialize(size, name, func)
3233
# Enqueue a request to be executed in the worker pool
3334
#
3435
# @param obj [String] mostly it is name of spec that should be downloaded
35-
def enq(obj)
36+
def enq(obj, priority: false)
37+
queue = priority ? @request_queue_with_priority : @request_queue
3638
create_threads unless @threads
37-
@request_queue.enq obj
39+
queue.enq obj
3840
end
3941

4042
# Retrieves results of job function being executed in worker pool
@@ -52,7 +54,13 @@ def stop
5254

5355
def process_queue(i)
5456
loop do
55-
obj = @request_queue.deq
57+
obj = begin
58+
@request_queue_with_priority.deq(true)
59+
rescue ThreadError
60+
@request_queue.deq(false, timeout: 0.05)
61+
end
62+
63+
next if obj.nil?
5664
break if obj.equal? POISON
5765
@response_queue.enq apply_func(obj, i)
5866
end
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
# frozen_string_literal: true
2+
3+
require "bundler/installer/parallel_installer"
4+
require "bundler/rubygems_gem_installer"
5+
require "rubygems/remote_fetcher"
6+
require "bundler"
7+
8+
RSpec.describe Bundler::ParallelInstaller do
9+
describe "priority queue" do
10+
before do
11+
require "support/artifice/compact_index"
12+
13+
@previous_client = Gem::Request::ConnectionPools.client
14+
Gem::Request::ConnectionPools.client = Gem::Net::HTTP
15+
Gem::RemoteFetcher.fetcher.close_all
16+
17+
build_repo2 do
18+
build_gem "gem_with_extension", &:add_c_extension
19+
build_gem "gem_without_extension"
20+
end
21+
22+
gemfile <<~G
23+
source "https://gem.repo2"
24+
25+
gem "gem_with_extension"
26+
gem "gem_without_extension"
27+
G
28+
lockfile <<~L
29+
GEM
30+
remote: https://gem.repo2/
31+
specs:
32+
gem_with_extension (1.0)
33+
gem_without_extension (1.0)
34+
35+
DEPENDENCIES
36+
gem_with_extension
37+
gem_without_extension
38+
L
39+
40+
@old_ui = Bundler.ui
41+
Bundler.ui = Bundler::UI::Silent.new
42+
end
43+
44+
after do
45+
Bundler.ui = @old_ui
46+
Gem::Request::ConnectionPools.client = @previous_client
47+
Artifice.deactivate
48+
end
49+
50+
let(:definition) do
51+
allow(Bundler).to receive(:root) { bundled_app }
52+
53+
definition = Bundler::Definition.build(bundled_app.join("Gemfile"), bundled_app.join("Gemfile.lock"), false)
54+
definition.tap(&:setup_domain!)
55+
end
56+
let(:installer) { Bundler::Installer.new(bundled_app, definition) }
57+
58+
it "queues native extensions in priority" do
59+
parallel_installer = Bundler::ParallelInstaller.new(installer, definition.specs, 2, false, true)
60+
worker_pool = parallel_installer.send(:worker_pool)
61+
expected = 6 # Enqueue to download bundler and the 2 gems. Enqueue to install Bundler and the 2 gems.
62+
63+
expect(worker_pool).to receive(:enq).exactly(expected).times.and_wrap_original do |original_enq, spec, opts|
64+
unless opts.nil? # Enqueued for download, no priority
65+
if spec.name == "gem_with_extension"
66+
expect(opts).to eq({ priority: true })
67+
else
68+
expect(opts).to eq({ priority: false })
69+
end
70+
end
71+
72+
opts ||= {}
73+
original_enq.call(spec, **opts)
74+
end
75+
76+
parallel_installer.call
77+
end
78+
end
79+
end

bundler/spec/bundler/worker_spec.rb

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,26 @@
2020
end
2121
end
2222

23+
describe "priority queue" do
24+
it "process elements from the priority queue first" do
25+
processed_elements = []
26+
27+
function = proc do |element, _|
28+
processed_elements << element
29+
end
30+
31+
worker = described_class.new(1, "Spec Worker", function)
32+
worker.instance_variable_set(:@threads, []) # Prevent the enqueueing from starting work.
33+
worker.enq("Normal element")
34+
worker.enq("Priority element", priority: true)
35+
worker.send(:create_threads)
36+
37+
worker.stop
38+
39+
expect(processed_elements).to eq(["Priority element", "Normal element"])
40+
end
41+
end
42+
2343
describe "handling interrupts" do
2444
let(:status) do
2545
pid = Process.fork do

bundler/spec/support/windows_tag_group.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,7 @@ module WindowsTagGroup
137137
"spec/bundler/build_metadata_spec.rb",
138138
"spec/bundler/current_ruby_spec.rb",
139139
"spec/bundler/installer/gem_installer_spec.rb",
140+
"spec/bundler/installer/parallel_installer_spec.rb",
140141
"spec/bundler/cli_common_spec.rb",
141142
"spec/bundler/ci_detector_spec.rb",
142143
],

0 commit comments

Comments
 (0)