Skip to content

Commit 04009b8

Browse files
committed
Add test case amqp_sql_filter
This test case tests that two links filtering from the same stream are processed concurrently by the session if the stream contains uncompressed sub batches.
1 parent 74d34b0 commit 04009b8

File tree

2 files changed

+93
-11
lines changed

2 files changed

+93
-11
lines changed

deps/rabbitmq_ct_helpers/src/stream_test_utils.erl

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -150,11 +150,9 @@ simple_entry(Sequence, Body, AppProps)
150150

151151
%% Here, each AMQP 1.0 encoded message consists of an application-properties section and a data section.
152152
%% All data sections are delivered uncompressed in 1 batch.
153-
sub_batch_entry_uncompressed(Sequence, Bodies) ->
153+
sub_batch_entry_uncompressed(Sequence, AppProps, Bodies) ->
154+
Sect0 = iolist_to_binary(amqp10_framing:encode_bin(AppProps)),
154155
Batch = lists:foldl(fun(Body, Acc) ->
155-
AppProps = #'v1_0.application_properties'{
156-
content = [{{utf8, <<"my key">>}, {utf8, <<"my value">>}}]},
157-
Sect0 = iolist_to_binary(amqp10_framing:encode_bin(AppProps)),
158156
Sect1 = iolist_to_binary(amqp10_framing:encode_bin(#'v1_0.data'{content = Body})),
159157
Sect = <<Sect0/binary, Sect1/binary>>,
160158
<<Acc/binary, 0:1, (byte_size(Sect)):31, Sect/binary>>

deps/rabbitmq_stream/test/protocol_interop_SUITE.erl

Lines changed: 91 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
-include_lib("common_test/include/ct.hrl").
1414
-include_lib("eunit/include/eunit.hrl").
1515
-include_lib("amqp_client/include/amqp_client.hrl").
16+
-include_lib("amqp10_client/include/amqp10_client.hrl").
1617
-include_lib("amqp10_common/include/amqp10_framing.hrl").
1718
-include_lib("amqp10_common/include/amqp10_filter.hrl").
1819

@@ -26,7 +27,8 @@ groups() ->
2627
amqp_credit_multiple_grants,
2728
amqp_credit_single_grant,
2829
amqp_attach_sub_batch,
29-
amqp_filter_expression
30+
amqp_property_filter,
31+
amqp_sql_filter
3032
]
3133
}].
3234

@@ -272,9 +274,9 @@ amqp_attach_sub_batch(Config) ->
272274
ok = amqp10_client:detach_link(Receiver),
273275
ok = amqp10_client:close_connection(Connection).
274276

275-
%% Test that AMQP filter expressions work when messages
277+
%% Test that AMQP property filter works when messages
276278
%% are published via the stream protocol and consumed via AMQP.
277-
amqp_filter_expression(Config) ->
279+
amqp_property_filter(Config) ->
278280
Stream = atom_to_binary(?FUNCTION_NAME),
279281
publish_via_stream_protocol(Stream, Config),
280282

@@ -317,6 +319,87 @@ amqp_filter_expression(Config) ->
317319
ok = amqp10_client:detach_link(Receiver),
318320
ok = amqp10_client:close_connection(Connection).
319321

322+
amqp_sql_filter(Config) ->
323+
Stream = atom_to_binary(?FUNCTION_NAME),
324+
Address = <<"/queue/", Stream/binary>>,
325+
326+
AppProps1 = #'v1_0.application_properties'{content = [{{utf8, <<"key">>}, {byte, 1}}]},
327+
AppProps2 = #'v1_0.application_properties'{content = [{{utf8, <<"key">>}, {byte, 2}}]},
328+
{ok, S, C0} = stream_test_utils:connect(Config, 0),
329+
{ok, C1} = stream_test_utils:create_stream(S, C0, Stream),
330+
PublisherId = 55,
331+
{ok, C2} = stream_test_utils:declare_publisher(S, C1, Stream, PublisherId),
332+
Bodies = lists:duplicate(2000, <<"middle">>),
333+
UncompressedSubbatch1 = stream_test_utils:sub_batch_entry_uncompressed(1, AppProps1, [<<"first">>]),
334+
UncompressedSubbatch2 = stream_test_utils:sub_batch_entry_uncompressed(2, AppProps2, Bodies),
335+
UncompressedSubbatch3 = stream_test_utils:sub_batch_entry_uncompressed(3, AppProps2, Bodies),
336+
UncompressedSubbatch4 = stream_test_utils:sub_batch_entry_uncompressed(4, AppProps1, [<<"last">>]),
337+
{ok, _, C3} = stream_test_utils:publish_entries(S, C2, PublisherId, 1, UncompressedSubbatch1),
338+
{ok, _, C4} = stream_test_utils:publish_entries(S, C3, PublisherId, 1, UncompressedSubbatch2),
339+
{ok, _, C5} = stream_test_utils:publish_entries(S, C4, PublisherId, 1, UncompressedSubbatch3),
340+
{ok, _, C6} = stream_test_utils:publish_entries(S, C5, PublisherId, 1, UncompressedSubbatch4),
341+
{ok, _} = stream_test_utils:close(S, C6),
342+
343+
OpnConf = connection_config(Config),
344+
{ok, Connection} = amqp10_client:open_connection(OpnConf),
345+
{ok, Session} = amqp10_client:begin_session_sync(Connection),
346+
347+
SQL = <<"a.key % 2 = 1">>,
348+
Filter = #{<<"from start">> => #filter{descriptor = <<"rabbitmq:stream-offset-spec">>,
349+
value = {symbol, <<"first">>}},
350+
?FILTER_NAME_SQL => #filter{descriptor = ?DESCRIPTOR_NAME_SQL_FILTER,
351+
value = {utf8, SQL}}},
352+
{ok, Receiver1} = amqp10_client:attach_receiver_link(
353+
Session, <<"receiver 1">>, Address,
354+
settled, configuration, Filter),
355+
{ok, Receiver2} = amqp10_client:attach_receiver_link(
356+
Session, <<"receiver 2">>, Address,
357+
settled, configuration, Filter),
358+
receive {amqp10_event, {link, Receiver1, attached}} -> ok
359+
after 9000 -> ct:fail({missing_msg, ?LINE})
360+
end,
361+
receive {amqp10_event, {link, Receiver2, attached}} -> ok
362+
after 9000 -> ct:fail({missing_msg, ?LINE})
363+
end,
364+
365+
ok = amqp10_client:flow_link_credit(Receiver1, 3, never, true),
366+
ok = amqp10_client:flow_link_credit(Receiver2, 3, never, true),
367+
368+
%% For two links filtering on the same session, we expect that RabbitMQ
369+
%% delivers messages concurrently (instead of scanning the entire stream
370+
%% for the 1st receiver before scanning the entire stream for the 2nd receiver).
371+
receive {amqp10_msg, _, First1} ->
372+
?assertEqual([<<"first">>], amqp10_msg:body(First1))
373+
after 9000 -> ct:fail({missing_msg, ?LINE})
374+
end,
375+
receive {amqp10_msg, _, First2} ->
376+
?assertEqual([<<"first">>], amqp10_msg:body(First2))
377+
after 9000 -> ct:fail({missing_msg, ?LINE})
378+
end,
379+
380+
receive {amqp10_msg, _, Last1} ->
381+
?assertEqual([<<"last">>], amqp10_msg:body(Last1))
382+
after 60_000 -> ct:fail({missing_msg, ?LINE})
383+
end,
384+
receive {amqp10_msg, _, Last2} ->
385+
?assertEqual([<<"last">>], amqp10_msg:body(Last2))
386+
after 60_000 -> ct:fail({missing_msg, ?LINE})
387+
end,
388+
389+
receive {amqp10_event, {link, Receiver1, credit_exhausted}} -> ok
390+
after 9000 -> ct:fail({missing_event, ?LINE})
391+
end,
392+
receive {amqp10_event, {link, Receiver2, credit_exhausted}} -> ok
393+
after 9000 -> ct:fail({missing_event, ?LINE})
394+
end,
395+
396+
ok = amqp10_client:detach_link(Receiver1),
397+
ok = amqp10_client:detach_link(Receiver2),
398+
ok = amqp10_client:close_connection(Connection),
399+
receive {amqp10_event, {connection, Connection, {closed, normal}}} -> ok
400+
after 9000 -> ct:fail({missing_event, ?LINE})
401+
end.
402+
320403
%% -------------------------------------------------------------------
321404
%% Helpers
322405
%% -------------------------------------------------------------------
@@ -330,15 +413,16 @@ publish_via_stream_protocol(Stream, Config) ->
330413
{ok, C2} = stream_test_utils:declare_publisher(S, C1, Stream, PublisherId),
331414

332415
M1 = stream_test_utils:simple_entry(1, <<"m1">>),
333-
M2 = stream_test_utils:simple_entry(2, <<"m2">>, #'v1_0.application_properties'{
334-
content = [{{utf8, <<"my key">>},
335-
{utf8, <<"my value">>}}]}),
416+
AppProps = #'v1_0.application_properties'{content = [{{utf8, <<"my key">>},
417+
{utf8, <<"my value">>}}]},
418+
M2 = stream_test_utils:simple_entry(2, <<"m2">>, AppProps),
336419
M3 = stream_test_utils:simple_entry(3, <<"m3">>),
337420
Messages1 = [M1, M2, M3],
338421

339422
{ok, _, C3} = stream_test_utils:publish_entries(S, C2, PublisherId, length(Messages1), Messages1),
340423

341-
UncompressedSubbatch = stream_test_utils:sub_batch_entry_uncompressed(4, [<<"m4">>, <<"m5">>, <<"m6">>]),
424+
UncompressedSubbatch = stream_test_utils:sub_batch_entry_uncompressed(
425+
4, AppProps, [<<"m4">>, <<"m5">>, <<"m6">>]),
342426
{ok, _, C4} = stream_test_utils:publish_entries(S, C3, PublisherId, 1, UncompressedSubbatch),
343427

344428
CompressedSubbatch = stream_test_utils:sub_batch_entry_compressed(5, [<<"m7">>, <<"m8">>, <<"m9">>]),

0 commit comments

Comments
 (0)