Skip to content

Commit e21007f

Browse files
committed
Pipeline management requests
1 parent 9bb622c commit e21007f

File tree

2 files changed

+188
-18
lines changed

2 files changed

+188
-18
lines changed

deps/rabbit/src/rabbit_amqp_session.erl

Lines changed: 42 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
%% An even better approach in future would be to dynamically grow (or shrink) the link credit
4949
%% we grant depending on how fast target queue(s) actually confirm messages.
5050
-define(LINK_CREDIT_RCV, 128).
51+
-define(MANAGEMENT_LINK_CREDIT_RCV, 8).
5152
-define(MANAGEMENT_NODE_ADDRESS, <<"/management">>).
5253

5354
-export([start_link/8,
@@ -90,8 +91,6 @@
9091
-record(management_link, {
9192
name :: binary(),
9293
delivery_count :: sequence_no(),
93-
%% Credit on an incoming management link is always 1 since management
94-
%% requests are synchronous and we grant 1 credit when sending a response.
9594
credit :: non_neg_integer(),
9695
max_message_size :: unlimited | pos_integer()
9796
}).
@@ -767,11 +766,10 @@ handle_control(#'v1_0.attach'{
767766
outgoing_half = unattached},
768767
Pairs0)
769768
end,
770-
Credit = 1,
771769
MaxMessageSize = persistent_term:get(max_message_size),
772770
Link = #management_link{name = LinkName,
773771
delivery_count = DeliveryCountInt,
774-
credit = Credit,
772+
credit = ?MANAGEMENT_LINK_CREDIT_RCV,
775773
max_message_size = MaxMessageSize},
776774
State = State0#state{management_link_pairs = Pairs,
777775
incoming_management_links = maps:put(HandleInt, Link, Links)},
@@ -788,7 +786,7 @@ handle_control(#'v1_0.attach'{
788786
properties = Properties},
789787
Flow = #'v1_0.flow'{handle = Handle,
790788
delivery_count = DeliveryCount,
791-
link_credit = ?UINT(Credit)},
789+
link_credit = ?UINT(?MANAGEMENT_LINK_CREDIT_RCV)},
792790
reply0([Reply, Flow], State);
793791

794792
handle_control(#'v1_0.attach'{
@@ -1649,22 +1647,33 @@ incoming_mgmt_link_transfer(
16491647
{ok, Link} ->
16501648
Link;
16511649
error ->
1652-
protocol_error(?V_1_0_AMQP_ERROR_ILLEGAL_STATE,
1653-
"Unknown link handle: ~p", [IncomingHandleInt])
1650+
protocol_error(
1651+
?V_1_0_AMQP_ERROR_ILLEGAL_STATE,
1652+
"Unknown link handle: ~p", [IncomingHandleInt])
16541653
end,
16551654
#management_link{name = Name,
16561655
delivery_count = IncomingDeliveryCount0,
1657-
credit = 1,
1656+
credit = IncomingCredit0,
16581657
max_message_size = IncomingMaxMessageSize
16591658
} = IncomingLink0,
1659+
case IncomingCredit0 > 0 of
1660+
true ->
1661+
ok;
1662+
false ->
1663+
protocol_error(
1664+
?V_1_0_LINK_ERROR_TRANSFER_LIMIT_EXCEEDED,
1665+
"insufficient credit (~b) for management link from client to RabbitMQ",
1666+
[IncomingCredit0])
1667+
end,
16601668
#management_link_pair{
16611669
incoming_half = IncomingHandleInt,
16621670
outgoing_half = OutgoingHandleInt
16631671
} = maps:get(Name, LinkPairs),
16641672
OutgoingLink0 = case OutgoingHandleInt of
16651673
unattached ->
1666-
protocol_error(?V_1_0_AMQP_ERROR_PRECONDITION_FAILED,
1667-
"received transfer on half open management link pair", []);
1674+
protocol_error(
1675+
?V_1_0_AMQP_ERROR_PRECONDITION_FAILED,
1676+
"received transfer on half open management link pair", []);
16681677
_ ->
16691678
maps:get(OutgoingHandleInt, OutgoingLinks)
16701679
end,
@@ -1676,9 +1685,10 @@ incoming_mgmt_link_transfer(
16761685
true ->
16771686
ok;
16781687
false ->
1679-
protocol_error(?V_1_0_AMQP_ERROR_PRECONDITION_FAILED,
1680-
"insufficient credit (~b) for management link from server to client",
1681-
[OutgoingCredit])
1688+
protocol_error(
1689+
?V_1_0_AMQP_ERROR_PRECONDITION_FAILED,
1690+
"insufficient credit (~b) for management link from RabbitMQ to client",
1691+
[OutgoingCredit])
16821692
end,
16831693
Settled = default(MaybeSettled, false),
16841694
validate_transfer_rcv_settle_mode(RcvSettleMode, Settled),
@@ -1698,17 +1708,19 @@ incoming_mgmt_link_transfer(
16981708
Frames = transfer_frames(Transfer, Response, MaxFrameSize),
16991709
PendingTransfer = #pending_management_transfer{frames = Frames},
17001710
IncomingDeliveryCount = add(IncomingDeliveryCount0, 1),
1701-
IncomingLink = IncomingLink0#management_link{delivery_count = IncomingDeliveryCount},
1711+
IncomingCredit1 = IncomingCredit0 - 1,
1712+
{IncomingCredit, Reply} = maybe_grant_mgmt_link_credit(
1713+
IncomingCredit1, IncomingDeliveryCount, IncomingHandle),
1714+
IncomingLink = IncomingLink0#management_link{delivery_count = IncomingDeliveryCount,
1715+
credit = IncomingCredit},
17021716
OutgoingLink = OutgoingLink0#management_link{delivery_count = add(OutgoingDeliveryCount, 1),
17031717
credit = OutgoingCredit - 1},
17041718
State = State0#state{
17051719
outgoing_delivery_id = add(OutgoingDeliveryId, 1),
17061720
outgoing_pending = queue:in(PendingTransfer, Pending),
17071721
incoming_management_links = maps:update(IncomingHandleInt, IncomingLink, IncomingLinks),
17081722
outgoing_management_links = maps:update(OutgoingHandleInt, OutgoingLink, OutgoingLinks)},
1709-
1710-
Flow = flow(IncomingHandle, IncomingDeliveryCount, 1),
1711-
{[Flow], State}.
1723+
{Reply, State}.
17121724

17131725
incoming_link_transfer(
17141726
#'v1_0.transfer'{more = true,
@@ -1908,6 +1920,13 @@ grant_link_credit(Credit, NumUnconfirmed) ->
19081920
Credit =< ?LINK_CREDIT_RCV / 2 andalso
19091921
NumUnconfirmed < ?LINK_CREDIT_RCV.
19101922

1923+
maybe_grant_mgmt_link_credit(Credit, DeliveryCount, Handle)
1924+
when Credit =< ?MANAGEMENT_LINK_CREDIT_RCV / 2 ->
1925+
{?MANAGEMENT_LINK_CREDIT_RCV,
1926+
[flow(Handle, DeliveryCount, ?MANAGEMENT_LINK_CREDIT_RCV)]};
1927+
maybe_grant_mgmt_link_credit(Credit, _, _) ->
1928+
{Credit, []}.
1929+
19111930
%% TODO default-outcome and outcomes, dynamic lifetimes
19121931
ensure_target(#'v1_0.target'{dynamic = true}, _, _) ->
19131932
protocol_error(?V_1_0_AMQP_ERROR_NOT_IMPLEMENTED,
@@ -2660,6 +2679,9 @@ format_status(
26602679
outgoing_delivery_id = OutgoingDeliveryId,
26612680
incoming_links = IncomingLinks,
26622681
outgoing_links = OutgoingLinks,
2682+
management_link_pairs = ManagementLinks,
2683+
incoming_management_links = IncomingManagementLinks,
2684+
outgoing_management_links = OutgoingManagementLinks,
26632685
outgoing_unsettled_map = OutgoingUnsettledMap,
26642686
stashed_rejected = StashedRejected,
26652687
stashed_settled = StashedSettled,
@@ -2676,6 +2698,9 @@ format_status(
26762698
outgoing_delivery_id => OutgoingDeliveryId,
26772699
incoming_links => IncomingLinks,
26782700
outgoing_links => OutgoingLinks,
2701+
management_link_pairs => ManagementLinks,
2702+
incoming_management_links => IncomingManagementLinks,
2703+
outgoing_management_links => OutgoingManagementLinks,
26792704
outgoing_unsettled_map => OutgoingUnsettledMap,
26802705
stashed_rejected => StashedRejected,
26812706
stashed_settled => StashedSettled,

deps/rabbitmq_amqp_client/test/management_SUITE.erl

Lines changed: 146 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,10 @@ groups() ->
7272
bind_destination_line_feed,
7373
bind_missing_queue,
7474
exclusive_queue,
75-
purge_stream
75+
purge_stream,
76+
pipeline,
77+
multiple_link_pairs,
78+
link_attach_order
7679
]},
7780
{cluster_size_3, [shuffle],
7881
[classic_queue_stopped,
@@ -786,6 +789,135 @@ queue_topology(Config) ->
786789
{ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair2, SQName),
787790
ok = cleanup(Init2).
788791

792+
%% Even though RabbitMQ processes management requests synchronously (one at a time),
793+
%% the client should be able to send multiple requests at once before receiving a response.
794+
pipeline(Config) ->
795+
Init = {_, _, LinkPair} = init(Config),
796+
flush(attached),
797+
798+
%% We should be able to send 8 management requests at once
799+
%% because RabbitMQ grants us 8 link credits initially.
800+
Num = 8,
801+
pipeline0(Num, LinkPair, <<"PUT">>, {map, []}),
802+
eventually(?_assertEqual(Num, rpc(Config, rabbit_amqqueue, count, [])), 200, 20),
803+
flush(queues_created),
804+
805+
pipeline0(Num, LinkPair, <<"DELETE">>, null),
806+
eventually(?_assertEqual(0, rpc(Config, rabbit_amqqueue, count, [])), 200, 20),
807+
flush(queues_deleted),
808+
809+
ok = cleanup(Init).
810+
811+
pipeline0(Num,
812+
#link_pair{outgoing_link = OutgoingLink,
813+
incoming_link = IncomingLink},
814+
HttpMethod,
815+
Body) ->
816+
ok = amqp10_client:flow_link_credit(IncomingLink, Num, never),
817+
[begin
818+
Request0 = amqp10_msg:new(<<>>, #'v1_0.amqp_value'{content = Body}, true),
819+
Bin = integer_to_binary(N),
820+
Props = #{subject => HttpMethod,
821+
to => <<"/queues/q-", Bin/binary>>,
822+
message_id => {binary, Bin},
823+
reply_to => <<"$me">>},
824+
Request = amqp10_msg:set_properties(Props, Request0),
825+
ok = amqp10_client:send_msg(OutgoingLink, Request)
826+
end || N <- lists:seq(1, Num)].
827+
828+
%% RabbitMQ allows attaching multiple link pairs.
829+
multiple_link_pairs(Config) ->
830+
OpnConf = connection_config(Config),
831+
{ok, Connection} = amqp10_client:open_connection(OpnConf),
832+
{ok, Session} = amqp10_client:begin_session_sync(Connection),
833+
{ok, LinkPair1} = rabbitmq_amqp_client:attach_management_link_pair_sync(Session, <<"link pair 1">>),
834+
{ok, LinkPair2} = rabbitmq_amqp_client:attach_management_link_pair_sync(Session, <<"link pair 2">>),
835+
836+
[SessionPid] = rpc(Config, rabbit_amqp_session, list_local, []),
837+
#{management_link_pairs := Pairs0,
838+
incoming_management_links := Incoming0,
839+
outgoing_management_links := Outgoing0} = gen_server_state(SessionPid),
840+
?assertEqual(2, maps:size(Pairs0)),
841+
?assertEqual(2, maps:size(Incoming0)),
842+
?assertEqual(2, maps:size(Outgoing0)),
843+
844+
QName = <<"q">>,
845+
{ok, #{}} = rabbitmq_amqp_client:declare_queue(LinkPair1, QName, #{}),
846+
{ok, #{}} = rabbitmq_amqp_client:delete_queue(LinkPair2, QName),
847+
848+
ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair1),
849+
ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair2),
850+
851+
%% Assert that the server cleaned up its state.
852+
#{management_link_pairs := Pairs,
853+
incoming_management_links := Incoming,
854+
outgoing_management_links := Outgoing} = gen_server_state(SessionPid),
855+
?assertEqual(0, maps:size(Pairs)),
856+
?assertEqual(0, maps:size(Incoming)),
857+
?assertEqual(0, maps:size(Outgoing)),
858+
859+
ok = amqp10_client:end_session(Session),
860+
ok = amqp10_client:close_connection(Connection).
861+
862+
%% Attaching (and detaching) either the sender or the receiver link first should both work.
863+
link_attach_order(Config) ->
864+
PairName1 = <<"link pair 1">>,
865+
PairName2 = <<"link pair 2">>,
866+
867+
OpnConf = connection_config(Config),
868+
{ok, Connection} = amqp10_client:open_connection(OpnConf),
869+
{ok, Session} = amqp10_client:begin_session_sync(Connection),
870+
871+
Terminus = #{address => <<"/management">>,
872+
durable => none},
873+
OutgoingAttachArgs1 = #{name => PairName1,
874+
role => {sender, Terminus},
875+
snd_settle_mode => settled,
876+
rcv_settle_mode => first,
877+
properties => #{<<"paired">> => true}},
878+
IncomingAttachArgs1 = OutgoingAttachArgs1#{role := {receiver, Terminus, self()},
879+
filter => #{}},
880+
OutgoingAttachArgs2 = OutgoingAttachArgs1#{name := PairName2},
881+
IncomingAttachArgs2 = IncomingAttachArgs1#{name := PairName2},
882+
883+
%% Attach sender before receiver.
884+
{ok, OutgoingRef1} = amqp10_client:attach_link(Session, OutgoingAttachArgs1),
885+
{ok, IncomingRef1} = amqp10_client:attach_link(Session, IncomingAttachArgs1),
886+
%% Attach receiver before sender.
887+
{ok, IncomingRef2} = amqp10_client:attach_link(Session, IncomingAttachArgs2),
888+
{ok, OutgoingRef2} = amqp10_client:attach_link(Session, OutgoingAttachArgs2),
889+
890+
Refs = [OutgoingRef1,
891+
OutgoingRef2,
892+
IncomingRef1,
893+
IncomingRef2],
894+
895+
[ok = wait_for_event(Ref, attached) || Ref <- Refs],
896+
flush(attached),
897+
898+
LinkPair1 = #link_pair{session = Session,
899+
outgoing_link = OutgoingRef1,
900+
incoming_link = IncomingRef1},
901+
LinkPair2 = #link_pair{session = Session,
902+
outgoing_link = OutgoingRef2,
903+
incoming_link = IncomingRef2},
904+
905+
QName = <<"test queue">>,
906+
{ok, #{}} = rabbitmq_amqp_client:declare_queue(LinkPair1, QName, #{}),
907+
{ok, #{}} = rabbitmq_amqp_client:delete_queue(LinkPair2, QName),
908+
909+
%% Detach sender before receiver.
910+
ok = amqp10_client:detach_link(OutgoingRef1),
911+
ok = amqp10_client:detach_link(IncomingRef1),
912+
%% Detach receiver before sender.
913+
ok = amqp10_client:detach_link(IncomingRef2),
914+
ok = amqp10_client:detach_link(OutgoingRef2),
915+
916+
[ok = wait_for_event(Ref, {detached, normal}) || Ref <- Refs],
917+
flush(detached),
918+
ok = amqp10_client:end_session(Session),
919+
ok = amqp10_client:close_connection(Connection).
920+
789921
init(Config) ->
790922
init(Config, 0).
791923

@@ -846,3 +978,16 @@ wait_for_settlement(Tag, State) ->
846978
flush(Reason),
847979
ct:fail(Reason)
848980
end.
981+
982+
wait_for_event(Ref, Event) ->
983+
receive {amqp10_event, {link, Ref, Event}} -> ok
984+
after 5000 -> ct:fail({missing_event, Ref, Event})
985+
end.
986+
987+
%% Return the formatted state of a gen_server via sys:get_status/1.
988+
%% (sys:get_state/1 is unformatted)
989+
gen_server_state(Pid) ->
990+
{status, _, _, L0} = sys:get_status(Pid, 20_000),
991+
L1 = lists:last(L0),
992+
{data, L2} = lists:last(L1),
993+
proplists:get_value("State", L2).

0 commit comments

Comments
 (0)