Skip to content

Commit 727ce8c

Browse files
author
KJ Tsanaktsidis
committed
Implement RedisClient::Cluster#with
This implements support for "pinning" a cluster client to a particular keyslot. This allows the use of WATCH/MULTI/EXEC transactions on that node without any ambiguity. Because RedisClient also implements watch as "yield self" and ignores all arguments, this means that you can easily write transactions that are compatible with both cluster and non-clustered redis (provided you use hashtags for your keys).
1 parent 364bd8f commit 727ce8c

File tree

7 files changed

+485
-0
lines changed

7 files changed

+485
-0
lines changed

.rubocop.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@ Metrics/ClassLength:
1919

2020
Metrics/ModuleLength:
2121
Max: 500
22+
Exclude:
23+
- 'test/**/*'
2224

2325
Metrics/MethodLength:
2426
Max: 50

README.md

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,94 @@ cli.call('MGET', '{key}1', '{key}2', '{key}3')
168168
#=> [nil, nil, nil]
169169
```
170170

171+
## Transactions
172+
This gem supports [Redis transactions](https://redis.io/topics/transactions), including atomicity with `MULTI`/`EXEC`,
173+
and conditional execution with `WATCH`. Redis does not support cross-slot transactions, so all keys used within a
174+
transaction must live in the same key slot. To use transactions, you must thus "pin" your client to a single slot using
175+
`#with`. Pass a key to `#with`, and the yielded connection object will enforce that all keys used in the block must
176+
belong to the same slot.
177+
178+
```ruby
179+
cli.with(key: '{key}1') do |conn|
180+
conn.call('SET', '{key}1', 'This is OK')
181+
#=> "OK"
182+
conn.call('SET', '{key}2', 'This is also OK; it has the same hashtag, and so the same slot')
183+
#=> "OK"
184+
conn.call('SET', '{otherslot}3', 'This will raise an exception; this key is not in the same slot as {key}1')
185+
#=> Connection pinned to slot 12539 but command ["SET", "{otherslot}3", "..."] includes key {otherslot}3 in slot 10271 (RedisClient::Cluster::Transaction::ConsistencyError)
186+
end
187+
```
188+
189+
When using hash tags to enforce same-slot key hashing, it's often neat to simply pass the hashtag _only_ to `#with`:
190+
191+
```ruby
192+
cli.with(key: '{myslot}') do |conn|
193+
# You can use any key starting with {myslot} in this block
194+
end
195+
```
196+
197+
Once you have pinned a client to a particular slot, you can use the same transaction APIs as the
198+
[redis-client](https://github.com/redis-rb/redis-client#usage) gem allows.
199+
200+
```ruby
201+
# No concurrent client will ever see the value 1 in 'mykey'; it will see either zero or two.
202+
cli.call('SET', 'key', 0)
203+
cli.with(key: 'key') do |conn|
204+
conn.multi do |txn|
205+
txn.call('INCR', 'key')
206+
txn.call('INCR', 'key')
207+
end
208+
#=> ['OK', 'OK']
209+
end
210+
211+
# Conditional execution with WATCH can be used to e.g. atomically swap two keys
212+
cli.call('MSET', '{key}1', 'v1', '{key}2', 'v2')
213+
cli.with(key: '{key}') do |conn|
214+
conn.call('WATCH', '{key}1', '{key}2')
215+
conn.multi do |txn|
216+
old_key1 = conn.call('GET', '{key}1')
217+
old_key2 = conn.call('GET', '{key}2')
218+
txn.call('SET', '{key}1', old_key2)
219+
txn.call('SET', '{key}2', old_key1)
220+
end
221+
# This transaction will swap the values of {key}1 and {key}2 only if no concurrent connection modified
222+
# either of the values
223+
end
224+
225+
# You can also pass watch: to #multi as a shortcut
226+
cli.call('MSET', '{key}1', 'v1', '{key}2', 'v2')
227+
cli.with(key: '{key}') do |conn|
228+
conn.multi(watch: ['{key}1', '{key}2']) do |txn|
229+
old_key1, old_key2 = conn.call('MGET', '{key}1', '{key}2')
230+
txn.call('MSET', '{key}1', old_key2, '{key}2', old_key1)
231+
end
232+
end
233+
```
234+
235+
Pinned connections are aware of redirections and node failures like ordinary calls to `RedisClient::Cluster`, but because
236+
you may have written non-idempotent code inside your block, the block is not automatically retried if e.g. the slot
237+
it is operating on moves to a different node. If you want this, you can opt-in to retries by passing nonzero
238+
`retry_count` to `#with`.
239+
240+
```ruby
241+
cli.with(key: '{key}', retry_count: 1) do |conn|
242+
conn.call('GET', '{key}1')
243+
#=> "value1"
244+
245+
# Now, some changes in cluster topology mean that {key} is moved to a different node!
246+
247+
conn.call('GET', '{key}2')
248+
#=> MOVED 9039 127.0.0.1:16381 (RedisClient::CommandError)
249+
250+
# Luckily, the block will get retried (once) and so both GETs will be re-executed on the newly-discovered
251+
# correct node.
252+
end
253+
```
254+
255+
Because `RedisClient` from the redis-client gem implements `#with` as simply `yield self` and ignores all of its
256+
arguments, it's possible to write code which is compatible with both redis-client and redis-cluster-client; the `#with`
257+
call will pin the connection to a slot when using clustering, or be a no-op when not.
258+
171259
## ACL
172260
The cluster client internally calls [COMMAND](https://redis.io/commands/command/) and [CLUSTER NODES](https://redis.io/commands/cluster-nodes/) commands to operate correctly.
173261
So please permit it like the followings.

lib/redis_client/cluster.rb

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
# frozen_string_literal: true
22

33
require 'redis_client/cluster/concurrent_worker'
4+
require 'redis_client/cluster/pinning'
45
require 'redis_client/cluster/pipeline'
56
require 'redis_client/cluster/pub_sub'
67
require 'redis_client/cluster/router'
@@ -97,6 +98,20 @@ def pubsub
9798
::RedisClient::Cluster::PubSub.new(@router, @command_builder)
9899
end
99100

101+
def with(key:, write: true, retry_count: 0)
102+
raise ArgumentError, 'key must be provided' if key.nil? || key.empty?
103+
104+
slot = ::RedisClient::Cluster::KeySlotConverter.convert(key)
105+
node_key = @router.find_node_key_by_key(key, primary: write)
106+
node = @router.find_node(node_key)
107+
# Calling #with checks out the underlying connection if this is a pooled connection
108+
# Calling it through #try_delegate ensures we handle any redirections and retry the entire
109+
# transaction if so.
110+
@router.try_delegate(node, :with, retry_count: retry_count) do |conn|
111+
yield ::RedisClient::Cluster::Pinning::ConnectionProxy.new(conn, slot, @router, @command_builder)
112+
end
113+
end
114+
100115
def close
101116
@concurrent_worker.close
102117
@router.close

lib/redis_client/cluster/pinning.rb

Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
# frozen_string_literal: true
2+
3+
require 'redis_client'
4+
require 'delegate'
5+
6+
class RedisClient
7+
class Cluster
8+
module Pinning
9+
EMPTY_ARRAY = [].freeze
10+
11+
class BaseDelegator < SimpleDelegator
12+
def initialize(conn, slot, router, command_builder)
13+
@conn = conn
14+
@slot = slot
15+
@router = router
16+
@command_builder = command_builder
17+
super(@conn)
18+
end
19+
20+
private
21+
22+
def ensure_key_not_cross_slot!(key, method_name)
23+
slot = ::RedisClient::Cluster::KeySlotConverter.convert(key)
24+
return unless slot != @slot
25+
26+
raise ::RedisClient::Cluster::Transaction::ConsistencyError.new,
27+
"Connection pinned to slot #{@slot} but method #{method_name} called with key #{key} in slot #{slot}"
28+
end
29+
30+
def ensure_command_not_cross_slot!(command)
31+
command_keys = @router.command_extract_all_keys(command)
32+
command_keys.each do |key|
33+
slot = ::RedisClient::Cluster::KeySlotConverter.convert(key)
34+
if slot != @slot
35+
raise ::RedisClient::Cluster::Transaction::ConsistencyError.new,
36+
"Connection pinned to slot #{@slot} but command #{command.inspect} includes key #{key} in slot #{slot}"
37+
end
38+
end
39+
end
40+
end
41+
42+
module CallDelegator
43+
def call(*command, **kwargs)
44+
command = @command_builder.generate(command, kwargs)
45+
ensure_command_not_cross_slot! command
46+
super
47+
end
48+
alias call_once call
49+
50+
def call_v(command)
51+
command = @command_builder.generate(command)
52+
ensure_command_not_cross_slot! command
53+
super
54+
end
55+
alias call_once_v call_v
56+
end
57+
58+
module BlockingCallDelegator
59+
def blocking_call(timeout, *command, **kwargs)
60+
command = @command_builder.generate(command, kwargs)
61+
ensure_command_not_cross_slot! command
62+
super
63+
end
64+
65+
def blocking_call_v(timeout, command)
66+
command = @command_builder.generate(command)
67+
ensure_command_not_cross_slot! command
68+
super
69+
end
70+
end
71+
72+
class PipelineProxy < BaseDelegator
73+
include CallDelegator
74+
include BlockingCallDelegator
75+
end
76+
77+
class MultiProxy < BaseDelegator
78+
include CallDelegator
79+
end
80+
81+
class ConnectionProxy < BaseDelegator
82+
include CallDelegator
83+
include BlockingCallDelegator
84+
85+
def sscan(key, *args, **kwargs, &block)
86+
ensure_key_not_cross_slot! key, :sscan
87+
super
88+
end
89+
90+
def hscan(key, *args, **kwargs, &block)
91+
ensure_key_not_cross_slot! key, :hscan
92+
super
93+
end
94+
95+
def zscan(key, *args, **kwargs, &block)
96+
ensure_key_not_cross_slot! key, :zscan
97+
super
98+
end
99+
100+
def pipelined
101+
@conn.pipelined do |conn_pipeline|
102+
yield PipelineProxy.new(conn_pipeline, @slot, @router, @command_builder)
103+
end
104+
end
105+
106+
def multi(watch: nil)
107+
call('WATCH', *watch) if watch
108+
begin
109+
@conn.pipelined do |conn_pipeline|
110+
txn = MultiProxy.new(conn_pipeline, @slot, @router, @command_builder)
111+
txn.call('MULTI')
112+
yield txn
113+
txn.call('EXEC')
114+
end.last
115+
rescue StandardError
116+
call('UNWATCH') if connected? && watch
117+
raise
118+
end
119+
end
120+
end
121+
end
122+
end
123+
end

lib/redis_client/cluster/router.rb

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -199,6 +199,10 @@ def command_exists?(name)
199199
@command.exists?(name)
200200
end
201201

202+
def command_extract_all_keys(command)
203+
@command.extract_all_keys(command)
204+
end
205+
202206
def assign_redirection_node(err_msg)
203207
_, slot, node_key = err_msg.split
204208
slot = slot.to_i

0 commit comments

Comments
 (0)