Skip to content

Enable AMQP 1.0 clients to manage topologies #10559

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Mar 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
!/deps/amqp10_common/
!/deps/oauth2_client/
!/deps/rabbitmq_amqp1_0/
!/deps/rabbitmq_amqp_client/
!/deps/rabbitmq_auth_backend_cache/
!/deps/rabbitmq_auth_backend_http/
!/deps/rabbitmq_auth_backend_ldap/
Expand Down
2 changes: 1 addition & 1 deletion deps/amqp10_client/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ load(

APP_NAME = "amqp10_client"

APP_DESCRIPTION = "AMQP 1.0 client from the RabbitMQ Project"
APP_DESCRIPTION = "AMQP 1.0 client"

APP_MODULE = "amqp10_client_app"

Expand Down
2 changes: 1 addition & 1 deletion deps/amqp10_client/Makefile
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
PROJECT = amqp10_client
PROJECT_DESCRIPTION = AMQP 1.0 client from the RabbitMQ Project
PROJECT_DESCRIPTION = AMQP 1.0 client
PROJECT_MOD = amqp10_client_app

define PROJECT_APP_EXTRA_KEYS
Expand Down
10 changes: 4 additions & 6 deletions deps/amqp10_client/src/amqp10_client.erl
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,6 @@
parse_uri/1
]).

-define(DEFAULT_TIMEOUT, 5000).

-type snd_settle_mode() :: amqp10_client_session:snd_settle_mode().
-type rcv_settle_mode() :: amqp10_client_session:rcv_settle_mode().

Expand Down Expand Up @@ -134,7 +132,7 @@ begin_session(Connection) when is_pid(Connection) ->
-spec begin_session_sync(pid()) ->
supervisor:startchild_ret() | session_timeout.
begin_session_sync(Connection) when is_pid(Connection) ->
begin_session_sync(Connection, ?DEFAULT_TIMEOUT).
begin_session_sync(Connection, ?TIMEOUT).

%% @doc Synchronously begins an amqp10 session using 'Connection'.
%% This is a convenience function that awaits the 'begun' event
Expand Down Expand Up @@ -191,7 +189,7 @@ attach_sender_link_sync(Session, Name, Target, SettleMode, Durability) ->
{ok, Ref};
{amqp10_event, {link, Ref, {detached, Err}}} ->
{error, Err}
after ?DEFAULT_TIMEOUT -> link_timeout
after ?TIMEOUT -> link_timeout
end.

%% @doc Attaches a sender link to a target.
Expand Down Expand Up @@ -357,7 +355,7 @@ stop_receiver_link(#link_ref{role = receiver,
send_msg(#link_ref{role = sender, session = Session,
link_handle = Handle}, Msg0) ->
Msg = amqp10_msg:set_handle(Handle, Msg0),
amqp10_client_session:transfer(Session, Msg, ?DEFAULT_TIMEOUT).
amqp10_client_session:transfer(Session, Msg, ?TIMEOUT).

%% @doc Accept a message on a the link referred to be the 'LinkRef'.
-spec accept_msg(link_ref(), amqp10_msg:amqp10_msg()) -> ok.
Expand All @@ -376,7 +374,7 @@ settle_msg(LinkRef, Msg, Settlement) ->
%% Flows a single link credit then awaits delivery or timeout.
-spec get_msg(link_ref()) -> {ok, amqp10_msg:amqp10_msg()} | {error, timeout}.
get_msg(LinkRef) ->
get_msg(LinkRef, ?DEFAULT_TIMEOUT).
get_msg(LinkRef, ?TIMEOUT).

%% @doc Get a single message from a link.
%% Flows a single link credit then awaits delivery or timeout.
Expand Down
74 changes: 43 additions & 31 deletions deps/amqp10_client/src/amqp10_client_session.erl
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@
diff/2]).

-define(MAX_SESSION_WINDOW_SIZE, 65535).
-define(DEFAULT_TIMEOUT, 5000).
-define(UINT_OUTGOING_WINDOW, {uint, ?UINT_MAX}).
-define(INITIAL_OUTGOING_DELIVERY_ID, ?UINT_MAX).
%% "The next-outgoing-id MAY be initialized to an arbitrary value" [2.5.6]
Expand Down Expand Up @@ -149,7 +148,7 @@
reader :: pid(),
socket :: amqp10_client_connection:amqp10_socket() | undefined,
links = #{} :: #{output_handle() => #link{}},
link_index = #{} :: #{link_name() => output_handle()},
link_index = #{} :: #{{link_role(), link_name()} => output_handle()},
link_handle_index = #{} :: #{input_handle() => output_handle()},
next_link_handle = 0 :: output_handle(),
early_attach_requests :: [term()],
Expand All @@ -172,7 +171,7 @@

-spec begin_sync(pid()) -> supervisor:startchild_ret().
begin_sync(Connection) ->
begin_sync(Connection, ?DEFAULT_TIMEOUT).
begin_sync(Connection, ?TIMEOUT).

-spec begin_sync(pid(), non_neg_integer()) ->
supervisor:startchild_ret() | session_timeout.
Expand Down Expand Up @@ -302,33 +301,37 @@ mapped(cast, #'v1_0.end'{error = Err}, State) ->
mapped(cast, #'v1_0.attach'{name = {utf8, Name},
initial_delivery_count = IDC,
handle = {uint, InHandle},
role = PeerRoleBool,
max_message_size = MaybeMaxMessageSize},
#state{links = Links, link_index = LinkIndex,
link_handle_index = LHI} = State0) ->

#{Name := OutHandle} = LinkIndex,
OurRoleBool = not PeerRoleBool,
OurRole = boolean_to_role(OurRoleBool),
LinkIndexKey = {OurRole, Name},
#{LinkIndexKey := OutHandle} = LinkIndex,
#{OutHandle := Link0} = Links,
ok = notify_link_attached(Link0),

{DeliveryCount, MaxMessageSize} =
case Link0 of
#link{role = sender,
#link{role = sender = OurRole,
delivery_count = DC} ->
MSS = case MaybeMaxMessageSize of
{ulong, S} when S > 0 -> S;
_ -> undefined
end,
{DC, MSS};
#link{role = receiver,
#link{role = receiver = OurRole,
max_message_size = MSS} ->
{unpack(IDC), MSS}
end,
Link = Link0#link{state = attached,
input_handle = InHandle,
delivery_count = DeliveryCount,
max_message_size = MaxMessageSize},
State = State0#state{links = Links#{OutHandle => Link},
link_index = maps:remove(Name, LinkIndex),
State = State0#state{links = Links#{OutHandle := Link},
link_index = maps:remove(LinkIndexKey, LinkIndex),
link_handle_index = LHI#{InHandle => OutHandle}},
{keep_state, State};
mapped(cast, #'v1_0.detach'{handle = {uint, InHandle},
Expand Down Expand Up @@ -648,8 +651,8 @@ build_frames(Channel, Trf, Payload, MaxPayloadSize, Acc) ->

make_source(#{role := {sender, _}}) ->
#'v1_0.source'{};
make_source(#{role := {receiver, #{address := Address} = Target, _Pid}, filter := Filter}) ->
Durable = translate_terminus_durability(maps:get(durable, Target, none)),
make_source(#{role := {receiver, #{address := Address} = Source, _Pid}, filter := Filter}) ->
Durable = translate_terminus_durability(maps:get(durable, Source, none)),
TranslatedFilter = translate_filters(Filter),
#'v1_0.source'{address = {utf8, Address},
durable = {uint, Durable},
Expand Down Expand Up @@ -743,35 +746,34 @@ detach_with_error_cond(Link = #link{output_handle = OutHandle}, State, Cond) ->
ok = send(Detach, State),
Link#link{state = detach_sent}.

send_attach(Send, #{name := Name, role := Role} = Args, {FromPid, _},
#state{next_link_handle = OutHandle0, links = Links,
send_attach(Send, #{name := Name, role := RoleTuple} = Args, {FromPid, _},
#state{next_link_handle = OutHandle0, links = Links,
link_index = LinkIndex} = State) ->

Source = make_source(Args),
Target = make_target(Args),
Properties = amqp10_client_types:make_properties(Args),

{LinkTarget, RoleAsBool, InitialDeliveryCount, MaxMessageSize} =
case Role of
{LinkTarget, InitialDeliveryCount, MaxMessageSize} =
case RoleTuple of
{receiver, _, Pid} ->
{{pid, Pid}, true, undefined, max_message_size(Args)};
{{pid, Pid}, undefined, max_message_size(Args)};
{sender, #{address := TargetAddr}} ->
{TargetAddr, false, uint(?INITIAL_DELIVERY_COUNT), undefined}
end,

{OutHandle, NextLinkHandle} =
case Args of
#{handle := Handle} ->
%% Client app provided link handle.
%% Really only meant for integration tests.
{Handle, OutHandle0};
_ ->
{OutHandle0, OutHandle0 + 1}
{TargetAddr, uint(?INITIAL_DELIVERY_COUNT), undefined}
end,

{OutHandle, NextLinkHandle} = case Args of
#{handle := Handle} ->
%% Client app provided link handle.
%% Really only meant for integration tests.
{Handle, OutHandle0};
_ ->
{OutHandle0, OutHandle0 + 1}
end,
Role = element(1, RoleTuple),
% create attach performative
Attach = #'v1_0.attach'{name = {utf8, Name},
role = RoleAsBool,
role = role_to_boolean(Role),
handle = {uint, OutHandle},
source = Source,
properties = Properties,
Expand All @@ -782,12 +784,12 @@ send_attach(Send, #{name := Name, role := Role} = Args, {FromPid, _},
max_message_size = MaxMessageSize},
ok = Send(Attach, State),

LinkRef = make_link_ref(element(1, Role), self(), OutHandle),
Ref = make_link_ref(Role, self(), OutHandle),
Link = #link{name = Name,
ref = LinkRef,
ref = Ref,
output_handle = OutHandle,
state = attach_sent,
role = element(1, Role),
role = Role,
notify = FromPid,
auto_flow = never,
target = LinkTarget,
Expand All @@ -796,7 +798,7 @@ send_attach(Send, #{name := Name, role := Role} = Args, {FromPid, _},

{State#state{links = Links#{OutHandle => Link},
next_link_handle = NextLinkHandle,
link_index = LinkIndex#{Name => OutHandle}}, LinkRef}.
link_index = LinkIndex#{{Role, Name} => OutHandle}}, Ref}.

-spec handle_session_flow(#'v1_0.flow'{}, #state{}) -> #state{}.
handle_session_flow(#'v1_0.flow'{next_incoming_id = MaybeNII,
Expand Down Expand Up @@ -1090,6 +1092,16 @@ sym(B) when is_atom(B) -> {symbol, atom_to_binary(B, utf8)}.
reason(undefined) -> normal;
reason(Other) -> Other.

role_to_boolean(sender) ->
?AMQP_ROLE_SENDER;
role_to_boolean(receiver) ->
?AMQP_ROLE_RECEIVER.

boolean_to_role(?AMQP_ROLE_SENDER) ->
sender;
boolean_to_role(?AMQP_ROLE_RECEIVER) ->
receiver.

format_status(Status = #{data := Data0}) ->
#state{channel = Channel,
remote_channel = RemoteChannel,
Expand Down
3 changes: 1 addition & 2 deletions deps/amqp10_client/src/amqp10_client_types.erl
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@
link_event_detail()}.
-type amqp10_event() :: {amqp10_event, amqp10_event_detail()}.

-type properties() :: #{binary() => tuple()}.
-type properties() :: #{binary() => amqp10_binary_generator:amqp10_prim()}.

-export_type([amqp10_performative/0, channel/0,
source/0, target/0, amqp10_msg_record/0,
Expand All @@ -73,7 +73,6 @@
properties/0]).


unpack(undefined) -> undefined;
unpack({_, Value}) -> Value;
unpack(Value) -> Value.

Expand Down
15 changes: 11 additions & 4 deletions deps/amqp10_client/src/amqp10_msg.erl
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
%%
-module(amqp10_msg).

-include_lib("amqp10_common/include/amqp10_types.hrl").

-export([from_amqp_records/1,
to_amqp_records/1,
% "read" api
Expand Down Expand Up @@ -256,12 +258,12 @@ body_bin(#amqp10_msg{body = #'v1_0.amqp_value'{} = Body}) ->
new(DeliveryTag, Body, Settled) when is_binary(Body) ->
#amqp10_msg{transfer = #'v1_0.transfer'{delivery_tag = {binary, DeliveryTag},
settled = Settled,
message_format = {uint, 0}},
message_format = {uint, ?MESSAGE_FORMAT}},
body = [#'v1_0.data'{content = Body}]};
new(DeliveryTag, Body, Settled) -> % TODO: constrain to amqp types
#amqp10_msg{transfer = #'v1_0.transfer'{delivery_tag = {binary, DeliveryTag},
settled = Settled,
message_format = {uint, 0}},
message_format = {uint, ?MESSAGE_FORMAT}},
body = Body}.

%% @doc Create a new settled amqp10 message using the specified delivery tag
Expand Down Expand Up @@ -322,8 +324,13 @@ set_properties(Props, #amqp10_msg{properties = undefined} = Msg) ->
set_properties(Props, Msg#amqp10_msg{properties = #'v1_0.properties'{}});
set_properties(Props, #amqp10_msg{properties = Current} = Msg) ->
% TODO many fields are `any` types and we need to try to type tag them
P = maps:fold(fun(message_id, V, Acc) when is_binary(V) ->
% message_id can be any type but we restrict it here
P = maps:fold(fun(message_id, {T, _V} = TypeVal, Acc) when T =:= ulong orelse
T =:= uuid orelse
T =:= binary orelse
T =:= uf8 ->
Acc#'v1_0.properties'{message_id = TypeVal};
(message_id, V, Acc) when is_binary(V) ->
%% backward compat clause
Acc#'v1_0.properties'{message_id = utf8(V)};
(user_id, V, Acc) when is_binary(V) ->
Acc#'v1_0.properties'{user_id = {binary, V}};
Expand Down
7 changes: 7 additions & 0 deletions deps/amqp10_common/include/amqp10_types.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,10 @@
-type transfer_number() :: sequence_no().
% [2.8.10]
-type sequence_no() :: uint().

% [2.8.1]
-define(AMQP_ROLE_SENDER, false).
-define(AMQP_ROLE_RECEIVER, true).

% [3.2.16]
-define(MESSAGE_FORMAT, 0).
1 change: 1 addition & 0 deletions deps/amqp10_common/src/amqp10_binary_generator.erl
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
-export_type([
amqp10_ctor/0,
amqp10_type/0,
amqp10_prim/0,
amqp10_described/0
]).

Expand Down
12 changes: 8 additions & 4 deletions deps/amqp10_common/src/amqp10_framing.erl
Original file line number Diff line number Diff line change
Expand Up @@ -100,14 +100,16 @@ symbolify(FieldName) when is_atom(FieldName) ->

%% A sequence comes as an arbitrary list of values; it's not a
%% composite type.
decode({described, Descriptor, {list, Fields}}) ->
decode({described, Descriptor, {list, Fields} = Type}) ->
case amqp10_framing0:record_for(Descriptor) of
#'v1_0.amqp_sequence'{} ->
#'v1_0.amqp_sequence'{content = [decode(F) || F <- Fields]};
#'v1_0.amqp_value'{} ->
#'v1_0.amqp_value'{content = Type};
Else ->
fill_from_list(Else, Fields)
end;
decode({described, Descriptor, {map, Fields}}) ->
decode({described, Descriptor, {map, Fields} = Type}) ->
case amqp10_framing0:record_for(Descriptor) of
#'v1_0.application_properties'{} ->
#'v1_0.application_properties'{content = decode_map(Fields)};
Expand All @@ -117,13 +119,15 @@ decode({described, Descriptor, {map, Fields}}) ->
#'v1_0.message_annotations'{content = decode_map(Fields)};
#'v1_0.footer'{} ->
#'v1_0.footer'{content = decode_map(Fields)};
#'v1_0.amqp_value'{} ->
#'v1_0.amqp_value'{content = Type};
Else ->
fill_from_map(Else, Fields)
end;
decode({described, Descriptor, {binary, Field}}) ->
decode({described, Descriptor, {binary, Field} = Type}) ->
case amqp10_framing0:record_for(Descriptor) of
#'v1_0.amqp_value'{} ->
#'v1_0.amqp_value'{content = {binary, Field}};
#'v1_0.amqp_value'{content = Type};
#'v1_0.data'{} ->
#'v1_0.data'{content = Field}
end;
Expand Down
4 changes: 2 additions & 2 deletions deps/rabbit/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -1252,7 +1252,7 @@ rabbitmq_integration_suite(
],
shard_count = 3,
runtime_deps = [
"//deps/amqp10_client:erlang_app",
"//deps/rabbitmq_amqp_client:erlang_app",
],
)

Expand All @@ -1279,7 +1279,7 @@ rabbitmq_integration_suite(
":test_event_recorder_beam",
],
runtime_deps = [
"//deps/amqp10_client:erlang_app",
"//deps/rabbitmq_amqp_client:erlang_app",
],
)

Expand Down
2 changes: 1 addition & 1 deletion deps/rabbit/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ LOCAL_DEPS = sasl os_mon inets compiler public_key crypto ssl syntax_tools xmerl

BUILD_DEPS = rabbitmq_cli
DEPS = ranch rabbit_common amqp10_common rabbitmq_prelaunch ra sysmon_handler stdout_formatter recon redbug observer_cli osiris syslog systemd seshat khepri khepri_mnesia_migration
TEST_DEPS = rabbitmq_ct_helpers rabbitmq_ct_client_helpers meck proper amqp_client amqp10_client rabbitmq_amqp1_0
TEST_DEPS = rabbitmq_ct_helpers rabbitmq_ct_client_helpers meck proper amqp_client rabbitmq_amqp_client rabbitmq_amqp1_0

PLT_APPS += mnesia

Expand Down
Loading