Skip to content

Commit 7f50cc4

Browse files
committed
Add log in test
1 parent e019a4e commit 7f50cc4

File tree

1 file changed

+82
-9
lines changed

1 file changed

+82
-9
lines changed

deps/rabbitmq_stream/test/rabbit_stream_partitions_SUITE.erl

Lines changed: 82 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
%%
1313
%% The Initial Developer of the Original Code is Pivotal Software, Inc.
1414
%% Copyright (c) 2025 Broadcom. All Rights Reserved.
15-
%% The term Broadcom refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
15+
%% The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
1616
%%
1717

1818
-module(rabbit_stream_partitions_SUITE).
@@ -107,6 +107,8 @@ simple_sac_consumer_should_get_disconnected_on_network_partition(Config) ->
107107
%% another node will be isolated
108108
?assertEqual(L#node.name, coordinator_leader(Config)),
109109

110+
log("Stream leader and coordinator leader are on ~p", [L#node.name]),
111+
110112
{ok, So0, C0_00} = stream_test_utils:connect(Config, 0),
111113
{ok, So1, C1_00} = stream_test_utils:connect(Config, 1),
112114
{ok, So2, C2_00} = stream_test_utils:connect(Config, 2),
@@ -135,18 +137,24 @@ simple_sac_consumer_should_get_disconnected_on_network_partition(Config) ->
135137
end, Consumers1),
136138
#consumer{subscription_id = DiscSubId} = DisconnectedConsumer,
137139

140+
log("Isolating node ~p", [Isolated]),
141+
138142
rabbit_ct_broker_helpers:block_traffic_between(Isolated, LN),
139143
rabbit_ct_broker_helpers:block_traffic_between(Isolated, F2N),
140144

141145
wait_for_disconnected_consumer(Config, LN, S),
142146
wait_for_presumed_down_consumer(Config, LN, S),
143147

148+
log("Node ~p rejoins cluster", [Isolated]),
149+
144150
rabbit_ct_broker_helpers:allow_traffic_between(Isolated, LN),
145151
rabbit_ct_broker_helpers:allow_traffic_between(Isolated, F2N),
146152

147153
wait_for_all_consumers_connected(Config, LN, S),
148154

149155
Consumers2 = query_consumers(Config, LN, S),
156+
log("Consumers after partition resolution: ~p", [Consumers2]),
157+
log("Disconnected consumer: ~p", [DisconnectedConsumer]),
150158
%% the disconnected, then presumed down consumer is cancelled,
151159
%% because the stream member on its node has been restarted
152160
assertSize(2, Consumers2),
@@ -157,21 +165,28 @@ simple_sac_consumer_should_get_disconnected_on_network_partition(Config) ->
157165
%% assert the cancelled consumer received a metadata update frame
158166
SubIdToState1 =
159167
maps:fold(fun(K, {S0, C0}, Acc) when K == DiscSubId ->
168+
log("Expecting metadata update for disconnected consumer"),
160169
C1 = receive_metadata_update(S0, C0),
170+
log("Received metadata update"),
161171
Acc#{K => {S0, C1}};
162172
(K, {S0, C0}, Acc) ->
163173
Acc#{K => {S0, C0}}
164174
end, #{}, SubIdToState0),
165175

176+
log("Deleting stream"),
166177
delete_stream(stream_port(Config, 0), S),
167178

168179
%% online consumers should receive a metadata update frame (stream deleted)
169180
%% we unqueue the this frame before closing the connection
170181
%% directly closing the connection of the cancelled consumer
171182
maps:foreach(fun(K, {S0, C0}) when K /= DiscSubId ->
172-
{_, C1} = receive_commands(S0, C0),
183+
log("Expecting frame in consumer ~p", [K]),
184+
{Cmd1, C1} = receive_commands(S0, C0),
185+
log("Received ~p", [Cmd1]),
186+
log("Closing"),
173187
{ok, _} = stream_test_utils:close(S0, C1);
174-
(_, {S0, C0}) ->
188+
(K, {S0, C0}) ->
189+
log("Closing ~p", [K]),
175190
{ok, _} = stream_test_utils:close(S0, C0)
176191
end, SubIdToState1),
177192

@@ -190,6 +205,8 @@ simple_sac_consumer_should_get_disconnected_on_coord_leader_network_partition(Co
190205
%% the coordinator leader node will be isolated
191206
?assertNotEqual(L#node.name, CL),
192207

208+
log("Stream leader and coordinator leader are on ~p", [L#node.name]),
209+
193210
{ok, So0, C0_00} = stream_test_utils:connect(Config, CL),
194211
{ok, So1, C1_00} = stream_test_utils:connect(Config, CF1),
195212
{ok, So2, C2_00} = stream_test_utils:connect(Config, CF2),
@@ -216,12 +233,16 @@ simple_sac_consumer_should_get_disconnected_on_coord_leader_network_partition(Co
216233
end, Consumers1),
217234
#consumer{subscription_id = DiscSubId} = DisconnectedConsumer,
218235

236+
log("Isolating node ~p", [Isolated]),
237+
219238
rabbit_ct_broker_helpers:block_traffic_between(Isolated, CF1),
220239
rabbit_ct_broker_helpers:block_traffic_between(Isolated, CF2),
221240

222241
wait_for_disconnected_consumer(Config, NotIsolated, S),
223242
wait_for_presumed_down_consumer(Config, NotIsolated, S),
224243

244+
log("Node ~p rejoins cluster", [Isolated]),
245+
225246
rabbit_ct_broker_helpers:allow_traffic_between(Isolated, CF1),
226247
rabbit_ct_broker_helpers:allow_traffic_between(Isolated, CF2),
227248

@@ -231,6 +252,8 @@ simple_sac_consumer_should_get_disconnected_on_coord_leader_network_partition(Co
231252

232253
Consumers2 = query_consumers(Config, NotIsolated, S),
233254

255+
log("Consumers after partition resolution ~p", [Consumers2]),
256+
log("Disconnected consumer: ~p", [DisconnectedConsumer]),
234257
%% the disconnected, then presumed down consumer is cancelled,
235258
%% because the stream member on its node has been restarted
236259
assertSize(2, Consumers2),
@@ -246,26 +269,35 @@ simple_sac_consumer_should_get_disconnected_on_coord_leader_network_partition(Co
246269

247270
SubIdToState1 =
248271
maps:fold(fun(K, {S0, C0}, Acc) when K == DiscSubId ->
272+
log("Expecting metadata update for disconnected consumer"),
249273
%% cancelled consumer received a metadata update
250274
C1 = receive_metadata_update(S0, C0),
275+
log("Received metadata update"),
251276
Acc#{K => {S0, C1}};
252277
(K, {S0, C0}, Acc) when K == ActiveSubId ->
278+
log("Expecting consumer update for promoted consumer"),
253279
%% promoted consumer should have received consumer update
254280
C1 = receive_consumer_update_and_respond(S0, C0),
281+
log("Received consumer update"),
255282
Acc#{K => {S0, C1}};
256283
(K, {S0, C0}, Acc) ->
257284
Acc#{K => {S0, C0}}
258285
end, #{}, SubIdToState0),
259286

287+
log("Deleting stream"),
260288
delete_stream(L#node.stream_port, S),
261289

262290
%% online consumers should receive a metadata update frame (stream deleted)
263291
%% we unqueue this frame before closing the connection
264292
%% directly closing the connection of the cancelled consumer
265293
maps:foreach(fun(K, {S0, C0}) when K /= DiscSubId ->
266-
{_, C1} = receive_commands(S0, C0),
294+
log("Expecting frame in consumer ~p", [K]),
295+
{Cmd1, C1} = receive_commands(S0, C0),
296+
log("Received ~p", [Cmd1]),
297+
log("Closing"),
267298
{ok, _} = stream_test_utils:close(S0, C1);
268-
(_, {S0, C0}) ->
299+
(K, {S0, C0}) ->
300+
log("Closing ~p", [K]),
269301
{ok, _} = stream_test_utils:close(S0, C0)
270302
end, SubIdToState1),
271303

@@ -286,6 +318,8 @@ super_stream_sac_consumer_should_get_disconnected_on_network_partition(Config) -
286318
%% another node will be isolated
287319
?assertEqual(L#node.name, CL),
288320

321+
log("Stream leader and coordinator leader are on ~p", [L#node.name]),
322+
289323
{ok, So0, C0_00} = stream_test_utils:connect(L#node.stream_port),
290324
{ok, So1, C1_00} = stream_test_utils:connect(F1#node.stream_port),
291325
{ok, So2, C2_00} = stream_test_utils:connect(F2#node.stream_port),
@@ -315,12 +349,16 @@ super_stream_sac_consumer_should_get_disconnected_on_network_partition(Config) -
315349
end, Consumers1),
316350
#consumer{subscription_id = DiscSubId} = DisconnectedConsumer,
317351

352+
log("Isolating node ~p", [Isolated]),
353+
318354
rabbit_ct_broker_helpers:block_traffic_between(Isolated, LN),
319355
rabbit_ct_broker_helpers:block_traffic_between(Isolated, F2N),
320356

321357
wait_for_disconnected_consumer(Config, NotIsolated, Partition),
322358
wait_for_presumed_down_consumer(Config, NotIsolated, Partition),
323359

360+
log("Node ~p rejoins cluster", [Isolated]),
361+
324362
rabbit_ct_broker_helpers:allow_traffic_between(Isolated, LN),
325363
rabbit_ct_broker_helpers:allow_traffic_between(Isolated, F2N),
326364

@@ -329,6 +367,8 @@ super_stream_sac_consumer_should_get_disconnected_on_network_partition(Config) -
329367
wait_for_all_consumers_connected(Config, NotIsolated, Partition),
330368

331369
Consumers2 = query_consumers(Config, NotIsolated, Partition),
370+
log("Consumers after partition resolution: ~p", [Consumers2]),
371+
log("Disconnected consumer: ~p", [DisconnectedConsumer]),
332372

333373
%% the disconnected, then presumed down consumer is cancelled,
334374
%% because the stream member on its node has been restarted
@@ -340,22 +380,29 @@ super_stream_sac_consumer_should_get_disconnected_on_network_partition(Config) -
340380

341381
SubIdToState1 =
342382
maps:fold(fun(K, {S0, C0}, Acc) when K == DiscSubId ->
383+
log("Expecting metadata update for disconnected consumer"),
343384
%% cancelled consumer received a metadata update
344385
C1 = receive_metadata_update(S0, C0),
386+
log("Received metadata update"),
345387
Acc#{K => {S0, C1}};
346388
(K, {S0, C0}, Acc) ->
347389
Acc#{K => {S0, C0}}
348390
end, #{}, SubIdToState0),
349391

392+
log("Deleting super stream"),
350393
delete_super_stream(L#node.stream_port, Ss),
351394

352395
%% online consumers should receive a metadata update frame (stream deleted)
353396
%% we unqueue this frame before closing the connection
354397
%% directly closing the connection of the cancelled consumer
355398
maps:foreach(fun(K, {S0, C0}) when K /= DiscSubId ->
356-
{_, C1} = receive_commands(S0, C0),
399+
log("Expecting frame in consumer ~p", [K]),
400+
{Cmd1, C1} = receive_commands(S0, C0),
401+
log("Received ~p", [Cmd1]),
402+
log("Closing"),
357403
{ok, _} = stream_test_utils:close(S0, C1);
358-
(_, {S0, C0}) ->
404+
(K, {S0, C0}) ->
405+
log("Closing ~p", [K]),
359406
{ok, _} = stream_test_utils:close(S0, C0)
360407
end, SubIdToState1),
361408
ok.
@@ -374,6 +421,8 @@ super_stream_sac_consumer_should_get_disconnected_on_coord_leader_network_partit
374421
%% the coordinator leader node will be isolated
375422
?assertNotEqual(L#node.name, CL),
376423

424+
log("Stream leader and coordinator leader are on ~p", [L#node.name]),
425+
377426
{ok, So0, C0_00} = stream_test_utils:connect(L#node.stream_port),
378427
{ok, So1, C1_00} = stream_test_utils:connect(F1#node.stream_port),
379428
{ok, So2, C2_00} = stream_test_utils:connect(F2#node.stream_port),
@@ -410,12 +459,16 @@ super_stream_sac_consumer_should_get_disconnected_on_coord_leader_network_partit
410459
end, Consumers1),
411460
#consumer{subscription_id = DiscSubId} = DisconnectedConsumer,
412461

462+
log("Isolating node ~p", [Isolated]),
463+
413464
rabbit_ct_broker_helpers:block_traffic_between(Isolated, LN),
414465
rabbit_ct_broker_helpers:block_traffic_between(Isolated, F2N),
415466

416467
wait_for_disconnected_consumer(Config, NotIsolated, Partition),
417468
wait_for_presumed_down_consumer(Config, NotIsolated, Partition),
418469

470+
log("Node ~p rejoins cluster", [Isolated]),
471+
419472
rabbit_ct_broker_helpers:allow_traffic_between(Isolated, LN),
420473
rabbit_ct_broker_helpers:allow_traffic_between(Isolated, F2N),
421474

@@ -424,6 +477,8 @@ super_stream_sac_consumer_should_get_disconnected_on_coord_leader_network_partit
424477
wait_for_all_consumers_connected(Config, NotIsolated, Partition),
425478

426479
Consumers2 = query_consumers(Config, NotIsolated, Partition),
480+
log("Consumers after partition resolution: ~p", [Consumers2]),
481+
log("Disconnected consumer: ~p", [DisconnectedConsumer]),
427482

428483
%% the disconnected, then presumed down consumer is cancelled,
429484
%% because the stream member on its node has been restarted
@@ -440,27 +495,35 @@ super_stream_sac_consumer_should_get_disconnected_on_coord_leader_network_partit
440495

441496
SubIdToState1 =
442497
maps:fold(fun(K, {S0, C0}, Acc) when K == DiscSubId ->
498+
log("Expecting metadata update for disconnected consumer"),
443499
%% cancelled consumer received a metadata update
444500
C1 = receive_metadata_update(S0, C0),
501+
log("Received metadata update"),
445502
Acc#{K => {S0, C1}};
446503
(K, {S0, C0}, Acc) when K == ActiveSubId ->
504+
log("Expecting consumer update for promoted consumer"),
447505
%% promoted consumer should have received consumer update
448506
C1 = receive_consumer_update_and_respond(S0, C0),
507+
log("Received consumer update"),
449508
Acc#{K => {S0, C1}};
450509
(K, {S0, C0}, Acc) ->
451510
Acc#{K => {S0, C0}}
452511
end, #{}, SubIdToState0),
453512

513+
log("Deleting super stream"),
454514
delete_super_stream(L#node.stream_port, Ss),
455515

456516
%% online consumers should receive a metadata update frame (stream deleted)
457517
%% we unqueue this frame before closing the connection
458518
%% directly closing the connection of the cancelled consumer
459519
maps:foreach(fun(K, {S0, C0}) when K /= DiscSubId ->
520+
log("Expecting frame in consumer ~p", [K]),
460521
{Cmd1, C1} = receive_commands(S0, C0),
461-
ct:pal("Received command: ~p", [Cmd1]),
522+
log("Received ~p", [Cmd1]),
523+
log("Closing"),
462524
{ok, _} = stream_test_utils:close(S0, C1);
463-
(_, {S0, C0}) ->
525+
(K, {S0, C0}) ->
526+
log("Closing ~p", [K]),
464527
{ok, _} = stream_test_utils:close(S0, C0)
465528
end, SubIdToState1),
466529
ok.
@@ -727,6 +790,7 @@ wait_for_disconnected_consumer(Config, Node, Stream) ->
727790
rabbit_ct_helpers:await_condition(
728791
fun() ->
729792
Cs = query_consumers(Config, Node, Stream),
793+
log("Expecting a disconnected consumer: ~p", [Cs]),
730794
lists:any(fun(#consumer{status = {disconnected, _}}) ->
731795
true;
732796
(_) ->
@@ -738,6 +802,7 @@ wait_for_presumed_down_consumer(Config, Node, Stream) ->
738802
rabbit_ct_helpers:await_condition(
739803
fun() ->
740804
Cs = query_consumers(Config, Node, Stream),
805+
log("Expecting a presumed-down consumer: ~p", [Cs]),
741806
lists:any(fun(#consumer{status = {presumed_down, _}}) ->
742807
true;
743808
(_) ->
@@ -749,6 +814,7 @@ wait_for_all_consumers_connected(Config, Node, Stream) ->
749814
rabbit_ct_helpers:await_condition(
750815
fun() ->
751816
Cs = query_consumers(Config, Node, Stream),
817+
log("Expecting connected consumers: ~p", [Cs]),
752818
lists:all(fun(#consumer{status = {connected, _}}) ->
753819
true;
754820
(_) ->
@@ -761,6 +827,7 @@ wait_for_coordinator_ready(Config) ->
761827
rabbit_ct_helpers:await_condition(
762828
fun() ->
763829
Status = coordinator_status(Config),
830+
log("Coordinator status: ~p", [Status]),
764831
lists:all(fun(St) ->
765832
RS = proplists:get_value(<<"Raft State">>, St,
766833
undefined),
@@ -785,3 +852,9 @@ assertSize(Expected, List) when is_list(List) ->
785852

786853
assertEmpty(Data) ->
787854
assertSize(0, Data).
855+
856+
log(Format) ->
857+
ct:pal(Format).
858+
859+
log(Format, Args) ->
860+
ct:pal(Format, Args).

0 commit comments

Comments
 (0)