12
12
% %
13
13
% % The Initial Developer of the Original Code is Pivotal Software, Inc.
14
14
% % Copyright (c) 2025 Broadcom. All Rights Reserved.
15
- % % The term “ Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
15
+ % % The term " Broadcom" refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
16
16
% %
17
17
18
18
-module (rabbit_stream_partitions_SUITE ).
@@ -107,6 +107,8 @@ simple_sac_consumer_should_get_disconnected_on_network_partition(Config) ->
107
107
% % another node will be isolated
108
108
? assertEqual (L # node .name , coordinator_leader (Config )),
109
109
110
+ log (" Stream leader and coordinator leader are on ~p " , [L # node .name ]),
111
+
110
112
{ok , So0 , C0_00 } = stream_test_utils :connect (Config , 0 ),
111
113
{ok , So1 , C1_00 } = stream_test_utils :connect (Config , 1 ),
112
114
{ok , So2 , C2_00 } = stream_test_utils :connect (Config , 2 ),
@@ -135,18 +137,24 @@ simple_sac_consumer_should_get_disconnected_on_network_partition(Config) ->
135
137
end , Consumers1 ),
136
138
# consumer {subscription_id = DiscSubId } = DisconnectedConsumer ,
137
139
140
+ log (" Isolating node ~p " , [Isolated ]),
141
+
138
142
rabbit_ct_broker_helpers :block_traffic_between (Isolated , LN ),
139
143
rabbit_ct_broker_helpers :block_traffic_between (Isolated , F2N ),
140
144
141
145
wait_for_disconnected_consumer (Config , LN , S ),
142
146
wait_for_presumed_down_consumer (Config , LN , S ),
143
147
148
+ log (" Node ~p rejoins cluster" , [Isolated ]),
149
+
144
150
rabbit_ct_broker_helpers :allow_traffic_between (Isolated , LN ),
145
151
rabbit_ct_broker_helpers :allow_traffic_between (Isolated , F2N ),
146
152
147
153
wait_for_all_consumers_connected (Config , LN , S ),
148
154
149
155
Consumers2 = query_consumers (Config , LN , S ),
156
+ log (" Consumers after partition resolution: ~p " , [Consumers2 ]),
157
+ log (" Disconnected consumer: ~p " , [DisconnectedConsumer ]),
150
158
% % the disconnected, then presumed down consumer is cancelled,
151
159
% % because the stream member on its node has been restarted
152
160
assertSize (2 , Consumers2 ),
@@ -157,21 +165,28 @@ simple_sac_consumer_should_get_disconnected_on_network_partition(Config) ->
157
165
% % assert the cancelled consumer received a metadata update frame
158
166
SubIdToState1 =
159
167
maps :fold (fun (K , {S0 , C0 }, Acc ) when K == DiscSubId ->
168
+ log (" Expecting metadata update for disconnected consumer" ),
160
169
C1 = receive_metadata_update (S0 , C0 ),
170
+ log (" Received metadata update" ),
161
171
Acc #{K => {S0 , C1 }};
162
172
(K , {S0 , C0 }, Acc ) ->
163
173
Acc #{K => {S0 , C0 }}
164
174
end , #{}, SubIdToState0 ),
165
175
176
+ log (" Deleting stream" ),
166
177
delete_stream (stream_port (Config , 0 ), S ),
167
178
168
179
% % online consumers should receive a metadata update frame (stream deleted)
169
180
% % we unqueue the this frame before closing the connection
170
181
% % directly closing the connection of the cancelled consumer
171
182
maps :foreach (fun (K , {S0 , C0 }) when K /= DiscSubId ->
172
- {_ , C1 } = receive_commands (S0 , C0 ),
183
+ log (" Expecting frame in consumer ~p " , [K ]),
184
+ {Cmd1 , C1 } = receive_commands (S0 , C0 ),
185
+ log (" Received ~p " , [Cmd1 ]),
186
+ log (" Closing" ),
173
187
{ok , _ } = stream_test_utils :close (S0 , C1 );
174
- (_ , {S0 , C0 }) ->
188
+ (K , {S0 , C0 }) ->
189
+ log (" Closing ~p " , [K ]),
175
190
{ok , _ } = stream_test_utils :close (S0 , C0 )
176
191
end , SubIdToState1 ),
177
192
@@ -190,6 +205,8 @@ simple_sac_consumer_should_get_disconnected_on_coord_leader_network_partition(Co
190
205
% % the coordinator leader node will be isolated
191
206
? assertNotEqual (L # node .name , CL ),
192
207
208
+ log (" Stream leader and coordinator leader are on ~p " , [L # node .name ]),
209
+
193
210
{ok , So0 , C0_00 } = stream_test_utils :connect (Config , CL ),
194
211
{ok , So1 , C1_00 } = stream_test_utils :connect (Config , CF1 ),
195
212
{ok , So2 , C2_00 } = stream_test_utils :connect (Config , CF2 ),
@@ -216,12 +233,16 @@ simple_sac_consumer_should_get_disconnected_on_coord_leader_network_partition(Co
216
233
end , Consumers1 ),
217
234
# consumer {subscription_id = DiscSubId } = DisconnectedConsumer ,
218
235
236
+ log (" Isolating node ~p " , [Isolated ]),
237
+
219
238
rabbit_ct_broker_helpers :block_traffic_between (Isolated , CF1 ),
220
239
rabbit_ct_broker_helpers :block_traffic_between (Isolated , CF2 ),
221
240
222
241
wait_for_disconnected_consumer (Config , NotIsolated , S ),
223
242
wait_for_presumed_down_consumer (Config , NotIsolated , S ),
224
243
244
+ log (" Node ~p rejoins cluster" , [Isolated ]),
245
+
225
246
rabbit_ct_broker_helpers :allow_traffic_between (Isolated , CF1 ),
226
247
rabbit_ct_broker_helpers :allow_traffic_between (Isolated , CF2 ),
227
248
@@ -231,6 +252,8 @@ simple_sac_consumer_should_get_disconnected_on_coord_leader_network_partition(Co
231
252
232
253
Consumers2 = query_consumers (Config , NotIsolated , S ),
233
254
255
+ log (" Consumers after partition resolution ~p " , [Consumers2 ]),
256
+ log (" Disconnected consumer: ~p " , [DisconnectedConsumer ]),
234
257
% % the disconnected, then presumed down consumer is cancelled,
235
258
% % because the stream member on its node has been restarted
236
259
assertSize (2 , Consumers2 ),
@@ -246,26 +269,35 @@ simple_sac_consumer_should_get_disconnected_on_coord_leader_network_partition(Co
246
269
247
270
SubIdToState1 =
248
271
maps :fold (fun (K , {S0 , C0 }, Acc ) when K == DiscSubId ->
272
+ log (" Expecting metadata update for disconnected consumer" ),
249
273
% % cancelled consumer received a metadata update
250
274
C1 = receive_metadata_update (S0 , C0 ),
275
+ log (" Received metadata update" ),
251
276
Acc #{K => {S0 , C1 }};
252
277
(K , {S0 , C0 }, Acc ) when K == ActiveSubId ->
278
+ log (" Expecting consumer update for promoted consumer" ),
253
279
% % promoted consumer should have received consumer update
254
280
C1 = receive_consumer_update_and_respond (S0 , C0 ),
281
+ log (" Received consumer update" ),
255
282
Acc #{K => {S0 , C1 }};
256
283
(K , {S0 , C0 }, Acc ) ->
257
284
Acc #{K => {S0 , C0 }}
258
285
end , #{}, SubIdToState0 ),
259
286
287
+ log (" Deleting stream" ),
260
288
delete_stream (L # node .stream_port , S ),
261
289
262
290
% % online consumers should receive a metadata update frame (stream deleted)
263
291
% % we unqueue this frame before closing the connection
264
292
% % directly closing the connection of the cancelled consumer
265
293
maps :foreach (fun (K , {S0 , C0 }) when K /= DiscSubId ->
266
- {_ , C1 } = receive_commands (S0 , C0 ),
294
+ log (" Expecting frame in consumer ~p " , [K ]),
295
+ {Cmd1 , C1 } = receive_commands (S0 , C0 ),
296
+ log (" Received ~p " , [Cmd1 ]),
297
+ log (" Closing" ),
267
298
{ok , _ } = stream_test_utils :close (S0 , C1 );
268
- (_ , {S0 , C0 }) ->
299
+ (K , {S0 , C0 }) ->
300
+ log (" Closing ~p " , [K ]),
269
301
{ok , _ } = stream_test_utils :close (S0 , C0 )
270
302
end , SubIdToState1 ),
271
303
@@ -286,6 +318,8 @@ super_stream_sac_consumer_should_get_disconnected_on_network_partition(Config) -
286
318
% % another node will be isolated
287
319
? assertEqual (L # node .name , CL ),
288
320
321
+ log (" Stream leader and coordinator leader are on ~p " , [L # node .name ]),
322
+
289
323
{ok , So0 , C0_00 } = stream_test_utils :connect (L # node .stream_port ),
290
324
{ok , So1 , C1_00 } = stream_test_utils :connect (F1 # node .stream_port ),
291
325
{ok , So2 , C2_00 } = stream_test_utils :connect (F2 # node .stream_port ),
@@ -315,12 +349,16 @@ super_stream_sac_consumer_should_get_disconnected_on_network_partition(Config) -
315
349
end , Consumers1 ),
316
350
# consumer {subscription_id = DiscSubId } = DisconnectedConsumer ,
317
351
352
+ log (" Isolating node ~p " , [Isolated ]),
353
+
318
354
rabbit_ct_broker_helpers :block_traffic_between (Isolated , LN ),
319
355
rabbit_ct_broker_helpers :block_traffic_between (Isolated , F2N ),
320
356
321
357
wait_for_disconnected_consumer (Config , NotIsolated , Partition ),
322
358
wait_for_presumed_down_consumer (Config , NotIsolated , Partition ),
323
359
360
+ log (" Node ~p rejoins cluster" , [Isolated ]),
361
+
324
362
rabbit_ct_broker_helpers :allow_traffic_between (Isolated , LN ),
325
363
rabbit_ct_broker_helpers :allow_traffic_between (Isolated , F2N ),
326
364
@@ -329,6 +367,8 @@ super_stream_sac_consumer_should_get_disconnected_on_network_partition(Config) -
329
367
wait_for_all_consumers_connected (Config , NotIsolated , Partition ),
330
368
331
369
Consumers2 = query_consumers (Config , NotIsolated , Partition ),
370
+ log (" Consumers after partition resolution: ~p " , [Consumers2 ]),
371
+ log (" Disconnected consumer: ~p " , [DisconnectedConsumer ]),
332
372
333
373
% % the disconnected, then presumed down consumer is cancelled,
334
374
% % because the stream member on its node has been restarted
@@ -340,22 +380,29 @@ super_stream_sac_consumer_should_get_disconnected_on_network_partition(Config) -
340
380
341
381
SubIdToState1 =
342
382
maps :fold (fun (K , {S0 , C0 }, Acc ) when K == DiscSubId ->
383
+ log (" Expecting metadata update for disconnected consumer" ),
343
384
% % cancelled consumer received a metadata update
344
385
C1 = receive_metadata_update (S0 , C0 ),
386
+ log (" Received metadata update" ),
345
387
Acc #{K => {S0 , C1 }};
346
388
(K , {S0 , C0 }, Acc ) ->
347
389
Acc #{K => {S0 , C0 }}
348
390
end , #{}, SubIdToState0 ),
349
391
392
+ log (" Deleting super stream" ),
350
393
delete_super_stream (L # node .stream_port , Ss ),
351
394
352
395
% % online consumers should receive a metadata update frame (stream deleted)
353
396
% % we unqueue this frame before closing the connection
354
397
% % directly closing the connection of the cancelled consumer
355
398
maps :foreach (fun (K , {S0 , C0 }) when K /= DiscSubId ->
356
- {_ , C1 } = receive_commands (S0 , C0 ),
399
+ log (" Expecting frame in consumer ~p " , [K ]),
400
+ {Cmd1 , C1 } = receive_commands (S0 , C0 ),
401
+ log (" Received ~p " , [Cmd1 ]),
402
+ log (" Closing" ),
357
403
{ok , _ } = stream_test_utils :close (S0 , C1 );
358
- (_ , {S0 , C0 }) ->
404
+ (K , {S0 , C0 }) ->
405
+ log (" Closing ~p " , [K ]),
359
406
{ok , _ } = stream_test_utils :close (S0 , C0 )
360
407
end , SubIdToState1 ),
361
408
ok .
@@ -374,6 +421,8 @@ super_stream_sac_consumer_should_get_disconnected_on_coord_leader_network_partit
374
421
% % the coordinator leader node will be isolated
375
422
? assertNotEqual (L # node .name , CL ),
376
423
424
+ log (" Stream leader and coordinator leader are on ~p " , [L # node .name ]),
425
+
377
426
{ok , So0 , C0_00 } = stream_test_utils :connect (L # node .stream_port ),
378
427
{ok , So1 , C1_00 } = stream_test_utils :connect (F1 # node .stream_port ),
379
428
{ok , So2 , C2_00 } = stream_test_utils :connect (F2 # node .stream_port ),
@@ -410,12 +459,16 @@ super_stream_sac_consumer_should_get_disconnected_on_coord_leader_network_partit
410
459
end , Consumers1 ),
411
460
# consumer {subscription_id = DiscSubId } = DisconnectedConsumer ,
412
461
462
+ log (" Isolating node ~p " , [Isolated ]),
463
+
413
464
rabbit_ct_broker_helpers :block_traffic_between (Isolated , LN ),
414
465
rabbit_ct_broker_helpers :block_traffic_between (Isolated , F2N ),
415
466
416
467
wait_for_disconnected_consumer (Config , NotIsolated , Partition ),
417
468
wait_for_presumed_down_consumer (Config , NotIsolated , Partition ),
418
469
470
+ log (" Node ~p rejoins cluster" , [Isolated ]),
471
+
419
472
rabbit_ct_broker_helpers :allow_traffic_between (Isolated , LN ),
420
473
rabbit_ct_broker_helpers :allow_traffic_between (Isolated , F2N ),
421
474
@@ -424,6 +477,8 @@ super_stream_sac_consumer_should_get_disconnected_on_coord_leader_network_partit
424
477
wait_for_all_consumers_connected (Config , NotIsolated , Partition ),
425
478
426
479
Consumers2 = query_consumers (Config , NotIsolated , Partition ),
480
+ log (" Consumers after partition resolution: ~p " , [Consumers2 ]),
481
+ log (" Disconnected consumer: ~p " , [DisconnectedConsumer ]),
427
482
428
483
% % the disconnected, then presumed down consumer is cancelled,
429
484
% % because the stream member on its node has been restarted
@@ -440,27 +495,35 @@ super_stream_sac_consumer_should_get_disconnected_on_coord_leader_network_partit
440
495
441
496
SubIdToState1 =
442
497
maps :fold (fun (K , {S0 , C0 }, Acc ) when K == DiscSubId ->
498
+ log (" Expecting metadata update for disconnected consumer" ),
443
499
% % cancelled consumer received a metadata update
444
500
C1 = receive_metadata_update (S0 , C0 ),
501
+ log (" Received metadata update" ),
445
502
Acc #{K => {S0 , C1 }};
446
503
(K , {S0 , C0 }, Acc ) when K == ActiveSubId ->
504
+ log (" Expecting consumer update for promoted consumer" ),
447
505
% % promoted consumer should have received consumer update
448
506
C1 = receive_consumer_update_and_respond (S0 , C0 ),
507
+ log (" Received consumer update" ),
449
508
Acc #{K => {S0 , C1 }};
450
509
(K , {S0 , C0 }, Acc ) ->
451
510
Acc #{K => {S0 , C0 }}
452
511
end , #{}, SubIdToState0 ),
453
512
513
+ log (" Deleting super stream" ),
454
514
delete_super_stream (L # node .stream_port , Ss ),
455
515
456
516
% % online consumers should receive a metadata update frame (stream deleted)
457
517
% % we unqueue this frame before closing the connection
458
518
% % directly closing the connection of the cancelled consumer
459
519
maps :foreach (fun (K , {S0 , C0 }) when K /= DiscSubId ->
520
+ log (" Expecting frame in consumer ~p " , [K ]),
460
521
{Cmd1 , C1 } = receive_commands (S0 , C0 ),
461
- ct :pal (" Received command: ~p " , [Cmd1 ]),
522
+ log (" Received ~p " , [Cmd1 ]),
523
+ log (" Closing" ),
462
524
{ok , _ } = stream_test_utils :close (S0 , C1 );
463
- (_ , {S0 , C0 }) ->
525
+ (K , {S0 , C0 }) ->
526
+ log (" Closing ~p " , [K ]),
464
527
{ok , _ } = stream_test_utils :close (S0 , C0 )
465
528
end , SubIdToState1 ),
466
529
ok .
@@ -727,6 +790,7 @@ wait_for_disconnected_consumer(Config, Node, Stream) ->
727
790
rabbit_ct_helpers :await_condition (
728
791
fun () ->
729
792
Cs = query_consumers (Config , Node , Stream ),
793
+ log (" Expecting a disconnected consumer: ~p " , [Cs ]),
730
794
lists :any (fun (# consumer {status = {disconnected , _ }}) ->
731
795
true ;
732
796
(_ ) ->
@@ -738,6 +802,7 @@ wait_for_presumed_down_consumer(Config, Node, Stream) ->
738
802
rabbit_ct_helpers :await_condition (
739
803
fun () ->
740
804
Cs = query_consumers (Config , Node , Stream ),
805
+ log (" Expecting a presumed-down consumer: ~p " , [Cs ]),
741
806
lists :any (fun (# consumer {status = {presumed_down , _ }}) ->
742
807
true ;
743
808
(_ ) ->
@@ -749,6 +814,7 @@ wait_for_all_consumers_connected(Config, Node, Stream) ->
749
814
rabbit_ct_helpers :await_condition (
750
815
fun () ->
751
816
Cs = query_consumers (Config , Node , Stream ),
817
+ log (" Expecting connected consumers: ~p " , [Cs ]),
752
818
lists :all (fun (# consumer {status = {connected , _ }}) ->
753
819
true ;
754
820
(_ ) ->
@@ -761,6 +827,7 @@ wait_for_coordinator_ready(Config) ->
761
827
rabbit_ct_helpers :await_condition (
762
828
fun () ->
763
829
Status = coordinator_status (Config ),
830
+ log (" Coordinator status: ~p " , [Status ]),
764
831
lists :all (fun (St ) ->
765
832
RS = proplists :get_value (<<" Raft State" >>, St ,
766
833
undefined ),
@@ -785,3 +852,9 @@ assertSize(Expected, List) when is_list(List) ->
785
852
786
853
assertEmpty (Data ) ->
787
854
assertSize (0 , Data ).
855
+
856
+ log (Format ) ->
857
+ ct :pal (Format ).
858
+
859
+ log (Format , Args ) ->
860
+ ct :pal (Format , Args ).
0 commit comments