@@ -182,37 +182,67 @@ async def op(pipe):
182
182
@pytest .mark .onlycluster
183
183
async def test_cluster (request , redis_addr ):
184
184
185
- # TODO: This test actually doesn't work. Once the RedisCluster initializes,
186
- # it will re-connect to the nodes as advertised by the cluster, bypassing
187
- # the single DelayProxy we set up.
188
- # to work around this, we really would nedd a port-remapper for the RedisCluster
189
-
190
- redis_addr = redis_addr [0 ], 6372 # use the cluster port
191
185
delay = 0.1
192
- dp = DelayProxy (addr = ("127.0.0.1" , 5381 ), redis_addr = redis_addr )
193
- await dp .start ()
186
+ cluster_port = 6372
187
+ remap_base = 7372
188
+ n_nodes = 6
189
+
190
+ def remap (host , port ):
191
+ return host , remap_base + port - cluster_port
192
+
193
+ proxies = []
194
+ for i in range (n_nodes ):
195
+ port = cluster_port + i
196
+ remapped = remap_base + i
197
+ forward_addr = redis_addr [0 ], port
198
+ proxy = DelayProxy (addr = ("127.0.0.1" , remapped ), redis_addr = forward_addr )
199
+ proxies .append (proxy )
200
+
201
+ # start proxies
202
+ await asyncio .gather (* [p .start () for p in proxies ])
203
+
204
+ def all_clear ():
205
+ for p in proxies :
206
+ p .send_event .clear ()
207
+
208
+ async def wait_for_send ():
209
+ asyncio .wait (
210
+ [p .send_event .wait () for p in proxies ], return_when = asyncio .FIRST_COMPLETED
211
+ )
212
+
213
+ @contextlib .contextmanager
214
+ def set_delay (delay : float ):
215
+ with contextlib .ExitStack () as stack :
216
+ for p in proxies :
217
+ stack .enter_context (p .set_delay (delay ))
218
+ yield
194
219
195
- with contextlib .closing (RedisCluster .from_url ("redis://127.0.0.1:5381" )) as r :
220
+ with contextlib .closing (
221
+ RedisCluster .from_url (f"redis://127.0.0.1:{ remap_base } " , host_port_remap = remap )
222
+ ) as r :
196
223
await r .initialize ()
197
224
await r .set ("foo" , "foo" )
198
225
await r .set ("bar" , "bar" )
199
226
200
227
async def op (r ):
201
- with dp . set_delay (delay ):
228
+ with set_delay (delay ):
202
229
return await r .get ("foo" )
203
230
204
- dp . send_event . clear ()
231
+ all_clear ()
205
232
t = asyncio .create_task (op (r ))
206
- # await dp.send_event.wait() # won"t work, because DelayProxy is by-passed
233
+ # Wait for whichever DelayProxy gets the request first
234
+ await wait_for_send ()
207
235
await asyncio .sleep (0.01 )
208
236
t .cancel ()
209
- try :
237
+ with pytest . raises ( asyncio . CancelledError ) :
210
238
await t
211
- except asyncio .CancelledError :
212
- pass
213
239
214
- assert await r .get ("bar" ) == b"bar"
215
- assert await r .ping ()
216
- assert await r .get ("foo" ) == b"foo"
240
+ # try a number of requests to excercise all the connections
241
+ async def doit ():
242
+ assert await r .get ("bar" ) == b"bar"
243
+ assert await r .ping ()
244
+ assert await r .get ("foo" ) == b"foo"
217
245
218
- await dp .stop ()
246
+ await asyncio .gather (* [doit () for _ in range (10 )])
247
+
248
+ await asyncio .gather (* (p .stop () for p in proxies ))
0 commit comments