Skip to content

Commit 061d97a

Browse files
Add Async RedisCluster (#2099)
* Copy Cluster Client, Commands, Commands Parser, Tests for asyncio * Async Cluster Tests: Async/Await * Add Async RedisCluster * cluster: use ERRORS_ALLOW_RETRY from self.__class__ * async_cluster: rework redis_connection, initialize, & close - move redis_connection from NodesManager to ClusterNode & handle all related logic in ClusterNode class - use Locks while initializing or closing - in case of error, close connections instead of instantly reinitializing - create ResourceWarning instead of manually deleting client object - use asyncio.gather to run commands/initialize/close in parallel - inline single use functions - fix test_acl_log for py3.6 * async_cluster: add types * async_cluster: add docs * docs: update sphinx & add sphinx_autodoc_typehints * async_cluster: move TargetNodesT to cluster module * async_cluster/commands: inherit commands from sync class if possible * async_cluster: add benchmark script with aredis & aioredis-cluster * async_cluster: remove logging * async_cluster: inline functions * async_cluster: manage Connection instead of Redis Client * async_cluster/commands: optimize parser * async_cluster: use ensure_future & generators for gather * async_conn: optimize * async_cluster: optimize determine_slot * async_cluster: optimize determine_nodes * async_cluster/parser: optimize _get_moveable_keys * async_cluster: inlined check_slots_coverage * async_cluster: update docstrings * async_cluster: add concurrent test & use read_response/_update_moved_slots without lock Co-authored-by: Chayim <[email protected]>
1 parent c25be04 commit 061d97a

27 files changed

+4541
-524
lines changed

benchmarks/cluster_async.py

Lines changed: 263 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,263 @@
1+
import asyncio
2+
import functools
3+
import time
4+
5+
import aioredis_cluster
6+
import aredis
7+
import uvloop
8+
9+
import redis.asyncio as redispy
10+
11+
12+
def timer(func):
13+
@functools.wraps(func)
14+
async def wrapper(*args, **kwargs):
15+
tic = time.perf_counter()
16+
await func(*args, **kwargs)
17+
toc = time.perf_counter()
18+
return f"{toc - tic:.4f}"
19+
20+
return wrapper
21+
22+
23+
@timer
24+
async def set_str(client, gather, data):
25+
if gather:
26+
for _ in range(count // 100):
27+
await asyncio.gather(
28+
*(
29+
asyncio.create_task(client.set(f"bench:str_{i}", data))
30+
for i in range(100)
31+
)
32+
)
33+
else:
34+
for i in range(count):
35+
await client.set(f"bench:str_{i}", data)
36+
37+
38+
@timer
39+
async def set_int(client, gather, data):
40+
if gather:
41+
for _ in range(count // 100):
42+
await asyncio.gather(
43+
*(
44+
asyncio.create_task(client.set(f"bench:int_{i}", data))
45+
for i in range(100)
46+
)
47+
)
48+
else:
49+
for i in range(count):
50+
await client.set(f"bench:int_{i}", data)
51+
52+
53+
@timer
54+
async def get_str(client, gather):
55+
if gather:
56+
for _ in range(count // 100):
57+
await asyncio.gather(
58+
*(asyncio.create_task(client.get(f"bench:str_{i}")) for i in range(100))
59+
)
60+
else:
61+
for i in range(count):
62+
await client.get(f"bench:str_{i}")
63+
64+
65+
@timer
66+
async def get_int(client, gather):
67+
if gather:
68+
for _ in range(count // 100):
69+
await asyncio.gather(
70+
*(asyncio.create_task(client.get(f"bench:int_{i}")) for i in range(100))
71+
)
72+
else:
73+
for i in range(count):
74+
await client.get(f"bench:int_{i}")
75+
76+
77+
@timer
78+
async def hset(client, gather, data):
79+
if gather:
80+
for _ in range(count // 100):
81+
await asyncio.gather(
82+
*(
83+
asyncio.create_task(client.hset("bench:hset", str(i), data))
84+
for i in range(100)
85+
)
86+
)
87+
else:
88+
for i in range(count):
89+
await client.hset("bench:hset", str(i), data)
90+
91+
92+
@timer
93+
async def hget(client, gather):
94+
if gather:
95+
for _ in range(count // 100):
96+
await asyncio.gather(
97+
*(
98+
asyncio.create_task(client.hget("bench:hset", str(i)))
99+
for i in range(100)
100+
)
101+
)
102+
else:
103+
for i in range(count):
104+
await client.hget("bench:hset", str(i))
105+
106+
107+
@timer
108+
async def incr(client, gather):
109+
if gather:
110+
for _ in range(count // 100):
111+
await asyncio.gather(
112+
*(asyncio.create_task(client.incr("bench:incr")) for i in range(100))
113+
)
114+
else:
115+
for i in range(count):
116+
await client.incr("bench:incr")
117+
118+
119+
@timer
120+
async def lpush(client, gather, data):
121+
if gather:
122+
for _ in range(count // 100):
123+
await asyncio.gather(
124+
*(
125+
asyncio.create_task(client.lpush("bench:lpush", data))
126+
for i in range(100)
127+
)
128+
)
129+
else:
130+
for i in range(count):
131+
await client.lpush("bench:lpush", data)
132+
133+
134+
@timer
135+
async def lrange_300(client, gather):
136+
if gather:
137+
for _ in range(count // 100):
138+
await asyncio.gather(
139+
*(
140+
asyncio.create_task(client.lrange("bench:lpush", i, i + 300))
141+
for i in range(100)
142+
)
143+
)
144+
else:
145+
for i in range(count):
146+
await client.lrange("bench:lpush", i, i + 300)
147+
148+
149+
@timer
150+
async def lpop(client, gather):
151+
if gather:
152+
for _ in range(count // 100):
153+
await asyncio.gather(
154+
*(asyncio.create_task(client.lpop("bench:lpush")) for i in range(100))
155+
)
156+
else:
157+
for i in range(count):
158+
await client.lpop("bench:lpush")
159+
160+
161+
@timer
162+
async def warmup(client):
163+
await asyncio.gather(
164+
*(asyncio.create_task(client.exists(f"bench:warmup_{i}")) for i in range(100))
165+
)
166+
167+
168+
@timer
169+
async def run(client, gather):
170+
data_str = "a" * size
171+
data_int = int("1" * size)
172+
173+
if gather is False:
174+
for ret in await asyncio.gather(
175+
asyncio.create_task(set_str(client, gather, data_str)),
176+
asyncio.create_task(set_int(client, gather, data_int)),
177+
asyncio.create_task(hset(client, gather, data_str)),
178+
asyncio.create_task(incr(client, gather)),
179+
asyncio.create_task(lpush(client, gather, data_int)),
180+
):
181+
print(ret)
182+
for ret in await asyncio.gather(
183+
asyncio.create_task(get_str(client, gather)),
184+
asyncio.create_task(get_int(client, gather)),
185+
asyncio.create_task(hget(client, gather)),
186+
asyncio.create_task(lrange_300(client, gather)),
187+
asyncio.create_task(lpop(client, gather)),
188+
):
189+
print(ret)
190+
else:
191+
print(await set_str(client, gather, data_str))
192+
print(await set_int(client, gather, data_int))
193+
print(await hset(client, gather, data_str))
194+
print(await incr(client, gather))
195+
print(await lpush(client, gather, data_int))
196+
197+
print(await get_str(client, gather))
198+
print(await get_int(client, gather))
199+
print(await hget(client, gather))
200+
print(await lrange_300(client, gather))
201+
print(await lpop(client, gather))
202+
203+
204+
async def main(loop, gather=None):
205+
arc = aredis.StrictRedisCluster(
206+
host=host,
207+
port=port,
208+
password=password,
209+
max_connections=2 ** 31,
210+
max_connections_per_node=2 ** 31,
211+
readonly=False,
212+
reinitialize_steps=count,
213+
skip_full_coverage_check=True,
214+
decode_responses=False,
215+
max_idle_time=count,
216+
idle_check_interval=count,
217+
)
218+
print(f"{loop} {gather} {await warmup(arc)} aredis")
219+
print(await run(arc, gather=gather))
220+
arc.connection_pool.disconnect()
221+
222+
aiorc = await aioredis_cluster.create_redis_cluster(
223+
[(host, port)],
224+
password=password,
225+
state_reload_interval=count,
226+
idle_connection_timeout=count,
227+
pool_maxsize=2 ** 31,
228+
)
229+
print(f"{loop} {gather} {await warmup(aiorc)} aioredis-cluster")
230+
print(await run(aiorc, gather=gather))
231+
aiorc.close()
232+
await aiorc.wait_closed()
233+
234+
async with redispy.RedisCluster(
235+
host=host,
236+
port=port,
237+
password=password,
238+
reinitialize_steps=count,
239+
read_from_replicas=False,
240+
decode_responses=False,
241+
max_connections=2 ** 31,
242+
) as rca:
243+
print(f"{loop} {gather} {await warmup(rca)} redispy")
244+
print(await run(rca, gather=gather))
245+
246+
247+
if __name__ == "__main__":
248+
host = "localhost"
249+
port = 16379
250+
password = None
251+
252+
count = 1000
253+
size = 16
254+
255+
asyncio.run(main("asyncio"))
256+
asyncio.run(main("asyncio", gather=False))
257+
asyncio.run(main("asyncio", gather=True))
258+
259+
uvloop.install()
260+
261+
asyncio.run(main("uvloop"))
262+
asyncio.run(main("uvloop", gather=False))
263+
asyncio.run(main("uvloop", gather=True))

docs/conf.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
"nbsphinx",
3131
"sphinx_gallery.load_style",
3232
"sphinx.ext.autodoc",
33+
"sphinx_autodoc_typehints",
3334
"sphinx.ext.doctest",
3435
"sphinx.ext.viewcode",
3536
"sphinx.ext.autosectionlabel",
@@ -41,6 +42,10 @@
4142
autosectionlabel_prefix_document = True
4243
autosectionlabel_maxdepth = 2
4344

45+
# AutodocTypehints settings.
46+
always_document_param_types = True
47+
typehints_defaults = "comma"
48+
4449
# Add any paths that contain templates here, relative to this directory.
4550
templates_path = ["_templates"]
4651

@@ -210,7 +215,7 @@
210215
# (source start file, target name, title, author, documentclass
211216
# [howto/manual]).
212217
latex_documents = [
213-
("index", "redis-py.tex", "redis-py Documentation", "Redis Inc", "manual"),
218+
("index", "redis-py.tex", "redis-py Documentation", "Redis Inc", "manual")
214219
]
215220

216221
# The name of an image file (relative to this directory) to place at the top of
@@ -258,7 +263,7 @@
258263
"redis-py",
259264
"One line description of project.",
260265
"Miscellaneous",
261-
),
266+
)
262267
]
263268

264269
# Documents to append as an appendix to all manuals.

0 commit comments

Comments
 (0)