Skip to content

Add log in test #14115

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jun 25, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 82 additions & 9 deletions deps/rabbitmq_stream/test/rabbit_stream_partitions_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
%%
%% The Initial Developer of the Original Code is Pivotal Software, Inc.
%% Copyright (c) 2025 Broadcom. All Rights Reserved.
%% The term Broadcom refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
%% The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
%%

-module(rabbit_stream_partitions_SUITE).
Expand Down Expand Up @@ -107,6 +107,8 @@ simple_sac_consumer_should_get_disconnected_on_network_partition(Config) ->
%% another node will be isolated
?assertEqual(L#node.name, coordinator_leader(Config)),

log("Stream leader and coordinator leader are on ~p", [L#node.name]),

{ok, So0, C0_00} = stream_test_utils:connect(Config, 0),
{ok, So1, C1_00} = stream_test_utils:connect(Config, 1),
{ok, So2, C2_00} = stream_test_utils:connect(Config, 2),
Expand Down Expand Up @@ -135,18 +137,24 @@ simple_sac_consumer_should_get_disconnected_on_network_partition(Config) ->
end, Consumers1),
#consumer{subscription_id = DiscSubId} = DisconnectedConsumer,

log("Isolating node ~p", [Isolated]),

rabbit_ct_broker_helpers:block_traffic_between(Isolated, LN),
rabbit_ct_broker_helpers:block_traffic_between(Isolated, F2N),

wait_for_disconnected_consumer(Config, LN, S),
wait_for_presumed_down_consumer(Config, LN, S),

log("Node ~p rejoins cluster", [Isolated]),

rabbit_ct_broker_helpers:allow_traffic_between(Isolated, LN),
rabbit_ct_broker_helpers:allow_traffic_between(Isolated, F2N),

wait_for_all_consumers_connected(Config, LN, S),

Consumers2 = query_consumers(Config, LN, S),
log("Consumers after partition resolution: ~p", [Consumers2]),
log("Disconnected consumer: ~p", [DisconnectedConsumer]),
%% the disconnected, then presumed down consumer is cancelled,
%% because the stream member on its node has been restarted
assertSize(2, Consumers2),
Expand All @@ -157,21 +165,28 @@ simple_sac_consumer_should_get_disconnected_on_network_partition(Config) ->
%% assert the cancelled consumer received a metadata update frame
SubIdToState1 =
maps:fold(fun(K, {S0, C0}, Acc) when K == DiscSubId ->
log("Expecting metadata update for disconnected consumer"),
C1 = receive_metadata_update(S0, C0),
log("Received metadata update"),
Acc#{K => {S0, C1}};
(K, {S0, C0}, Acc) ->
Acc#{K => {S0, C0}}
end, #{}, SubIdToState0),

log("Deleting stream"),
delete_stream(stream_port(Config, 0), S),

%% online consumers should receive a metadata update frame (stream deleted)
%% we unqueue the this frame before closing the connection
%% directly closing the connection of the cancelled consumer
maps:foreach(fun(K, {S0, C0}) when K /= DiscSubId ->
{_, C1} = receive_commands(S0, C0),
log("Expecting frame in consumer ~p", [K]),
{Cmd1, C1} = receive_commands(S0, C0),
log("Received ~p", [Cmd1]),
log("Closing"),
{ok, _} = stream_test_utils:close(S0, C1);
(_, {S0, C0}) ->
(K, {S0, C0}) ->
log("Closing ~p", [K]),
{ok, _} = stream_test_utils:close(S0, C0)
end, SubIdToState1),

Expand All @@ -190,6 +205,8 @@ simple_sac_consumer_should_get_disconnected_on_coord_leader_network_partition(Co
%% the coordinator leader node will be isolated
?assertNotEqual(L#node.name, CL),

log("Stream leader and coordinator leader are on ~p", [L#node.name]),

{ok, So0, C0_00} = stream_test_utils:connect(Config, CL),
{ok, So1, C1_00} = stream_test_utils:connect(Config, CF1),
{ok, So2, C2_00} = stream_test_utils:connect(Config, CF2),
Expand All @@ -216,12 +233,16 @@ simple_sac_consumer_should_get_disconnected_on_coord_leader_network_partition(Co
end, Consumers1),
#consumer{subscription_id = DiscSubId} = DisconnectedConsumer,

log("Isolating node ~p", [Isolated]),

rabbit_ct_broker_helpers:block_traffic_between(Isolated, CF1),
rabbit_ct_broker_helpers:block_traffic_between(Isolated, CF2),

wait_for_disconnected_consumer(Config, NotIsolated, S),
wait_for_presumed_down_consumer(Config, NotIsolated, S),

log("Node ~p rejoins cluster", [Isolated]),

rabbit_ct_broker_helpers:allow_traffic_between(Isolated, CF1),
rabbit_ct_broker_helpers:allow_traffic_between(Isolated, CF2),

Expand All @@ -231,6 +252,8 @@ simple_sac_consumer_should_get_disconnected_on_coord_leader_network_partition(Co

Consumers2 = query_consumers(Config, NotIsolated, S),

log("Consumers after partition resolution ~p", [Consumers2]),
log("Disconnected consumer: ~p", [DisconnectedConsumer]),
%% the disconnected, then presumed down consumer is cancelled,
%% because the stream member on its node has been restarted
assertSize(2, Consumers2),
Expand All @@ -246,26 +269,35 @@ simple_sac_consumer_should_get_disconnected_on_coord_leader_network_partition(Co

SubIdToState1 =
maps:fold(fun(K, {S0, C0}, Acc) when K == DiscSubId ->
log("Expecting metadata update for disconnected consumer"),
%% cancelled consumer received a metadata update
C1 = receive_metadata_update(S0, C0),
log("Received metadata update"),
Acc#{K => {S0, C1}};
(K, {S0, C0}, Acc) when K == ActiveSubId ->
log("Expecting consumer update for promoted consumer"),
%% promoted consumer should have received consumer update
C1 = receive_consumer_update_and_respond(S0, C0),
log("Received consumer update"),
Acc#{K => {S0, C1}};
(K, {S0, C0}, Acc) ->
Acc#{K => {S0, C0}}
end, #{}, SubIdToState0),

log("Deleting stream"),
delete_stream(L#node.stream_port, S),

%% online consumers should receive a metadata update frame (stream deleted)
%% we unqueue this frame before closing the connection
%% directly closing the connection of the cancelled consumer
maps:foreach(fun(K, {S0, C0}) when K /= DiscSubId ->
{_, C1} = receive_commands(S0, C0),
log("Expecting frame in consumer ~p", [K]),
{Cmd1, C1} = receive_commands(S0, C0),
log("Received ~p", [Cmd1]),
log("Closing"),
{ok, _} = stream_test_utils:close(S0, C1);
(_, {S0, C0}) ->
(K, {S0, C0}) ->
log("Closing ~p", [K]),
{ok, _} = stream_test_utils:close(S0, C0)
end, SubIdToState1),

Expand All @@ -286,6 +318,8 @@ super_stream_sac_consumer_should_get_disconnected_on_network_partition(Config) -
%% another node will be isolated
?assertEqual(L#node.name, CL),

log("Stream leader and coordinator leader are on ~p", [L#node.name]),

{ok, So0, C0_00} = stream_test_utils:connect(L#node.stream_port),
{ok, So1, C1_00} = stream_test_utils:connect(F1#node.stream_port),
{ok, So2, C2_00} = stream_test_utils:connect(F2#node.stream_port),
Expand Down Expand Up @@ -315,12 +349,16 @@ super_stream_sac_consumer_should_get_disconnected_on_network_partition(Config) -
end, Consumers1),
#consumer{subscription_id = DiscSubId} = DisconnectedConsumer,

log("Isolating node ~p", [Isolated]),

rabbit_ct_broker_helpers:block_traffic_between(Isolated, LN),
rabbit_ct_broker_helpers:block_traffic_between(Isolated, F2N),

wait_for_disconnected_consumer(Config, NotIsolated, Partition),
wait_for_presumed_down_consumer(Config, NotIsolated, Partition),

log("Node ~p rejoins cluster", [Isolated]),

rabbit_ct_broker_helpers:allow_traffic_between(Isolated, LN),
rabbit_ct_broker_helpers:allow_traffic_between(Isolated, F2N),

Expand All @@ -329,6 +367,8 @@ super_stream_sac_consumer_should_get_disconnected_on_network_partition(Config) -
wait_for_all_consumers_connected(Config, NotIsolated, Partition),

Consumers2 = query_consumers(Config, NotIsolated, Partition),
log("Consumers after partition resolution: ~p", [Consumers2]),
log("Disconnected consumer: ~p", [DisconnectedConsumer]),

%% the disconnected, then presumed down consumer is cancelled,
%% because the stream member on its node has been restarted
Expand All @@ -340,22 +380,29 @@ super_stream_sac_consumer_should_get_disconnected_on_network_partition(Config) -

SubIdToState1 =
maps:fold(fun(K, {S0, C0}, Acc) when K == DiscSubId ->
log("Expecting metadata update for disconnected consumer"),
%% cancelled consumer received a metadata update
C1 = receive_metadata_update(S0, C0),
log("Received metadata update"),
Acc#{K => {S0, C1}};
(K, {S0, C0}, Acc) ->
Acc#{K => {S0, C0}}
end, #{}, SubIdToState0),

log("Deleting super stream"),
delete_super_stream(L#node.stream_port, Ss),

%% online consumers should receive a metadata update frame (stream deleted)
%% we unqueue this frame before closing the connection
%% directly closing the connection of the cancelled consumer
maps:foreach(fun(K, {S0, C0}) when K /= DiscSubId ->
{_, C1} = receive_commands(S0, C0),
log("Expecting frame in consumer ~p", [K]),
{Cmd1, C1} = receive_commands(S0, C0),
log("Received ~p", [Cmd1]),
log("Closing"),
{ok, _} = stream_test_utils:close(S0, C1);
(_, {S0, C0}) ->
(K, {S0, C0}) ->
log("Closing ~p", [K]),
{ok, _} = stream_test_utils:close(S0, C0)
end, SubIdToState1),
ok.
Expand All @@ -374,6 +421,8 @@ super_stream_sac_consumer_should_get_disconnected_on_coord_leader_network_partit
%% the coordinator leader node will be isolated
?assertNotEqual(L#node.name, CL),

log("Stream leader and coordinator leader are on ~p", [L#node.name]),

{ok, So0, C0_00} = stream_test_utils:connect(L#node.stream_port),
{ok, So1, C1_00} = stream_test_utils:connect(F1#node.stream_port),
{ok, So2, C2_00} = stream_test_utils:connect(F2#node.stream_port),
Expand Down Expand Up @@ -410,12 +459,16 @@ super_stream_sac_consumer_should_get_disconnected_on_coord_leader_network_partit
end, Consumers1),
#consumer{subscription_id = DiscSubId} = DisconnectedConsumer,

log("Isolating node ~p", [Isolated]),

rabbit_ct_broker_helpers:block_traffic_between(Isolated, LN),
rabbit_ct_broker_helpers:block_traffic_between(Isolated, F2N),

wait_for_disconnected_consumer(Config, NotIsolated, Partition),
wait_for_presumed_down_consumer(Config, NotIsolated, Partition),

log("Node ~p rejoins cluster", [Isolated]),

rabbit_ct_broker_helpers:allow_traffic_between(Isolated, LN),
rabbit_ct_broker_helpers:allow_traffic_between(Isolated, F2N),

Expand All @@ -424,6 +477,8 @@ super_stream_sac_consumer_should_get_disconnected_on_coord_leader_network_partit
wait_for_all_consumers_connected(Config, NotIsolated, Partition),

Consumers2 = query_consumers(Config, NotIsolated, Partition),
log("Consumers after partition resolution: ~p", [Consumers2]),
log("Disconnected consumer: ~p", [DisconnectedConsumer]),

%% the disconnected, then presumed down consumer is cancelled,
%% because the stream member on its node has been restarted
Expand All @@ -440,27 +495,35 @@ super_stream_sac_consumer_should_get_disconnected_on_coord_leader_network_partit

SubIdToState1 =
maps:fold(fun(K, {S0, C0}, Acc) when K == DiscSubId ->
log("Expecting metadata update for disconnected consumer"),
%% cancelled consumer received a metadata update
C1 = receive_metadata_update(S0, C0),
log("Received metadata update"),
Acc#{K => {S0, C1}};
(K, {S0, C0}, Acc) when K == ActiveSubId ->
log("Expecting consumer update for promoted consumer"),
%% promoted consumer should have received consumer update
C1 = receive_consumer_update_and_respond(S0, C0),
log("Received consumer update"),
Acc#{K => {S0, C1}};
(K, {S0, C0}, Acc) ->
Acc#{K => {S0, C0}}
end, #{}, SubIdToState0),

log("Deleting super stream"),
delete_super_stream(L#node.stream_port, Ss),

%% online consumers should receive a metadata update frame (stream deleted)
%% we unqueue this frame before closing the connection
%% directly closing the connection of the cancelled consumer
maps:foreach(fun(K, {S0, C0}) when K /= DiscSubId ->
log("Expecting frame in consumer ~p", [K]),
{Cmd1, C1} = receive_commands(S0, C0),
ct:pal("Received command: ~p", [Cmd1]),
log("Received ~p", [Cmd1]),
log("Closing"),
{ok, _} = stream_test_utils:close(S0, C1);
(_, {S0, C0}) ->
(K, {S0, C0}) ->
log("Closing ~p", [K]),
{ok, _} = stream_test_utils:close(S0, C0)
end, SubIdToState1),
ok.
Expand Down Expand Up @@ -727,6 +790,7 @@ wait_for_disconnected_consumer(Config, Node, Stream) ->
rabbit_ct_helpers:await_condition(
fun() ->
Cs = query_consumers(Config, Node, Stream),
log("Expecting a disconnected consumer: ~p", [Cs]),
lists:any(fun(#consumer{status = {disconnected, _}}) ->
true;
(_) ->
Expand All @@ -738,6 +802,7 @@ wait_for_presumed_down_consumer(Config, Node, Stream) ->
rabbit_ct_helpers:await_condition(
fun() ->
Cs = query_consumers(Config, Node, Stream),
log("Expecting a presumed-down consumer: ~p", [Cs]),
lists:any(fun(#consumer{status = {presumed_down, _}}) ->
true;
(_) ->
Expand All @@ -749,6 +814,7 @@ wait_for_all_consumers_connected(Config, Node, Stream) ->
rabbit_ct_helpers:await_condition(
fun() ->
Cs = query_consumers(Config, Node, Stream),
log("Expecting connected consumers: ~p", [Cs]),
lists:all(fun(#consumer{status = {connected, _}}) ->
true;
(_) ->
Expand All @@ -761,6 +827,7 @@ wait_for_coordinator_ready(Config) ->
rabbit_ct_helpers:await_condition(
fun() ->
Status = coordinator_status(Config),
log("Coordinator status: ~p", [Status]),
lists:all(fun(St) ->
RS = proplists:get_value(<<"Raft State">>, St,
undefined),
Expand All @@ -785,3 +852,9 @@ assertSize(Expected, List) when is_list(List) ->

assertEmpty(Data) ->
assertSize(0, Data).

log(Format) ->
ct:pal(Format).

log(Format, Args) ->
ct:pal(Format, Args).
Loading