Skip to content

Commit 658103d

Browse files
Fix #pipelined concurrently using the same Redis connection (#296)
1 parent ed2bb7a commit 658103d

File tree

2 files changed

+35
-6
lines changed

2 files changed

+35
-6
lines changed

lib/redis_client/cluster/pipeline.rb

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -161,15 +161,13 @@ def execute # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Met
161161
end
162162
end
163163

164-
all_replies = errors = nil
164+
all_replies = errors = required_redirections = nil
165165

166166
work_group.each do |node_key, v|
167167
case v
168168
when ::RedisClient::Cluster::Pipeline::RedirectionNeeded
169-
all_replies ||= Array.new(@size)
170-
pipeline = @pipelines[node_key]
171-
v.indices.each { |i| v.replies[i] = handle_redirection(v.replies[i], pipeline, i) }
172-
pipeline.outer_indices.each_with_index { |outer, inner| all_replies[outer] = v.replies[inner] }
169+
required_redirections ||= {}
170+
required_redirections[node_key] = v
173171
when StandardError
174172
errors ||= {}
175173
errors[node_key] = v
@@ -180,9 +178,15 @@ def execute # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Met
180178
end
181179

182180
work_group.close
183-
184181
raise ::RedisClient::Cluster::ErrorCollection, errors unless errors.nil?
185182

183+
required_redirections&.each do |node_key, v|
184+
all_replies ||= Array.new(@size)
185+
pipeline = @pipelines[node_key]
186+
v.indices.each { |i| v.replies[i] = handle_redirection(v.replies[i], pipeline, i) }
187+
pipeline.outer_indices.each_with_index { |outer, inner| all_replies[outer] = v.replies[inner] }
188+
end
189+
186190
all_replies
187191
end
188192

test/test_against_cluster_state.rb

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,31 @@ def test_the_state_of_cluster_resharding_with_pipelining
5959
end
6060
end
6161

62+
def test_the_state_of_cluster_resharding_with_pipelining_on_new_connection
63+
# This test is excercising a very delicate race condition; i think the use of @client to set
64+
# the keys in do_resharding_test is actually causing the race condition not to happen, so this
65+
# test is actually performing the resharding on its own.
66+
key_count = 10
67+
key_count.times do |i|
68+
key = "key#{i}"
69+
slot = ::RedisClient::Cluster::KeySlotConverter.convert(key)
70+
src, dest = @controller.select_resharding_target(slot)
71+
@controller.start_resharding(slot: slot, src_node_key: src, dest_node_key: dest)
72+
@controller.finish_resharding(slot: slot, src_node_key: src, dest_node_key: dest)
73+
end
74+
75+
res = @client.pipelined do |p|
76+
key_count.times do |i|
77+
p.call_v(['SET', "key#{i}", "value#{i}"])
78+
end
79+
end
80+
81+
key_count.times do |i|
82+
assert_equal('OK', res[i])
83+
assert_equal("value#{i}", @client.call_v(['GET', "key#{i}"]))
84+
end
85+
end
86+
6287
private
6388

6489
def wait_for_replication

0 commit comments

Comments
 (0)