Skip to content

Commit 23aee07

Browse files
committed
Handle creation of stream with different parameters
1 parent 3644ed5 commit 23aee07

File tree

3 files changed

+66
-51
lines changed

3 files changed

+66
-51
lines changed

deps/rabbitmq_stream/src/rabbit_stream.erl

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -153,12 +153,19 @@ emit_publisher_info_local(VHost, Items, Ref, AggregatorPid) ->
153153
list(VHost)).
154154

155155
list(VHost) ->
156-
[Client ||
157-
{_, ListSup, _, _} <- supervisor2:which_children(rabbit_stream_sup),
158-
{_, RanchEmbeddedSup, supervisor, _} <- supervisor2:which_children(ListSup),
159-
{{ranch_listener_sup, _}, RanchListSup, _, _} <- supervisor:which_children(RanchEmbeddedSup),
160-
{ranch_conns_sup_sup, RanchConnsSup, supervisor, _} <- supervisor2:which_children(RanchListSup),
161-
{_, RanchConnSup, supervisor, _} <- supervisor2:which_children(RanchConnsSup),
162-
{_, StreamClientSup, supervisor, _} <- supervisor2:which_children(RanchConnSup),
163-
{rabbit_stream_reader, Client, _, _} <- supervisor:which_children(StreamClientSup),
156+
[Client
157+
|| {_, ListSup, _, _}
158+
<- supervisor2:which_children(rabbit_stream_sup),
159+
{_, RanchEmbeddedSup, supervisor, _}
160+
<- supervisor2:which_children(ListSup),
161+
{{ranch_listener_sup, _}, RanchListSup, _, _}
162+
<- supervisor:which_children(RanchEmbeddedSup),
163+
{ranch_conns_sup_sup, RanchConnsSup, supervisor, _}
164+
<- supervisor2:which_children(RanchListSup),
165+
{_, RanchConnSup, supervisor, _}
166+
<- supervisor2:which_children(RanchConnsSup),
167+
{_, StreamClientSup, supervisor, _}
168+
<- supervisor2:which_children(RanchConnSup),
169+
{rabbit_stream_reader, Client, _, _}
170+
<- supervisor:which_children(StreamClientSup),
164171
rabbit_stream_reader:in_vhost(Client, VHost)].

deps/rabbitmq_stream/src/rabbit_stream_manager.erl

Lines changed: 47 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -163,42 +163,52 @@ handle_call({create, VirtualHost, Reference, Arguments, Username},
163163
VirtualHost,
164164
#{user => Username},
165165
rabbit_stream_queue),
166-
case rabbit_amqqueue:with(Name,
167-
fun(Q) ->
168-
ok =
169-
rabbit_amqqueue:assert_equivalence(Q,
170-
true,
171-
false,
172-
StreamQueueArguments,
173-
none)
174-
end)
175-
of
176-
ok ->
177-
{reply, {error, reference_already_exists}, State};
178-
{error, not_found} ->
179-
try
180-
case rabbit_stream_queue:declare(Q0, node()) of
181-
{new, Q} ->
182-
{reply, {ok, amqqueue:get_type_state(Q)},
183-
State};
184-
{existing, _} ->
185-
{reply, {error, reference_already_exists},
186-
State};
187-
{error, Err} ->
188-
rabbit_log:warning("Error while creating ~p stream, ~p",
189-
[Reference, Err]),
166+
try
167+
QueueLookup =
168+
rabbit_amqqueue:with(Name,
169+
fun(Q) ->
170+
ok =
171+
rabbit_amqqueue:assert_equivalence(Q,
172+
true,
173+
false,
174+
StreamQueueArguments,
175+
none)
176+
end),
177+
178+
case QueueLookup of
179+
ok ->
180+
{reply, {error, reference_already_exists}, State};
181+
{error, not_found} ->
182+
try
183+
case rabbit_stream_queue:declare(Q0, node()) of
184+
{new, Q} ->
185+
{reply, {ok, amqqueue:get_type_state(Q)},
186+
State};
187+
{existing, _} ->
188+
{reply, {error, reference_already_exists},
189+
State};
190+
{error, Err} ->
191+
rabbit_log:warning("Error while creating ~p stream, ~p",
192+
[Reference, Err]),
193+
{reply, {error, internal_error}, State}
194+
end
195+
catch
196+
exit:Error ->
197+
rabbit_log:error("Error while creating ~p stream, ~p",
198+
[Reference, Error]),
190199
{reply, {error, internal_error}, State}
191-
end
192-
catch
193-
exit:Error ->
194-
rabbit_log:error("Error while creating ~p stream, ~p",
195-
[Reference, Error]),
196-
{reply, {error, internal_error}, State}
197-
end;
198-
{error, {absent, _, Reason}} ->
199-
rabbit_log:error("Error while creating ~p stream, ~p",
200-
[Reference, Reason]),
201-
{reply, {error, internal_error}, State}
200+
end;
201+
{error, {absent, _, Reason}} ->
202+
rabbit_log:error("Error while creating ~p stream, ~p",
203+
[Reference, Reason]),
204+
{reply, {error, internal_error}, State}
205+
end
206+
catch
207+
exit:ExitError ->
208+
% likely to be a problem of inequivalent args on an existing stream
209+
rabbit_log:error("Error while creating ~p stream: ~p",
210+
[Reference, ExitError]),
211+
{reply, {error, validation_failed}, State}
202212
end;
203213
error ->
204214
{reply, {error, validation_failed}, State}
@@ -218,7 +228,8 @@ handle_call({delete, VirtualHost, Reference, Username}, _From,
218228
true ->
219229
rabbit_log:debug("Queue record ~p is a stream, trying to delete it",
220230
[Reference]),
221-
{ok, _} = rabbit_stream_queue:delete(Q, false, false, Username),
231+
{ok, _} =
232+
rabbit_stream_queue:delete(Q, false, false, Username),
222233
rabbit_log:debug("Stream ~p deleted", [Reference]),
223234
{reply, {ok, deleted}, State};
224235
_ ->

deps/rabbitmq_stream/src/rabbit_stream_reader.erl

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -277,7 +277,7 @@ listen_loop_pre_auth(Transport,
277277
#configuration{frame_max = FrameMax,
278278
heartbeat = Heartbeat} =
279279
Configuration) ->
280-
{OK, Closed, Error, _Passive} = Transport:messages(),
280+
{OK, Closed, Error, _Passive} = Transport:messages(),
281281
%% FIXME introduce timeout to complete the connection opening (after block should be enough)
282282
receive
283283
{OK, S, Data} ->
@@ -584,8 +584,7 @@ listen_loop_post_auth(Transport,
584584
{'$gen_cast',
585585
{queue_event, #resource{name = StreamName},
586586
{osiris_offset, _QueueResource, -1}}} ->
587-
rabbit_log:info("received osiris offset event for ~p with offset "
588-
"~p",
587+
rabbit_log:info("received osiris offset event for ~p with offset ~p",
589588
[StreamName, -1]),
590589
listen_loop_post_auth(Transport, Connection, State, Configuration);
591590
{'$gen_cast',
@@ -1542,8 +1541,7 @@ handle_frame_post_auth(Transport,
15421541
?COMMAND_SUBSCRIBE,
15431542
CorrelationId),
15441543

1545-
rabbit_log:info("Distributing existing messages to subscription "
1546-
"~p",
1544+
rabbit_log:info("Distributing existing messages to subscription ~p",
15471545
[SubscriptionId]),
15481546
{{segment, Segment1}, {credit, Credit1}} =
15491547
send_chunks(Transport, ConsumerState,
@@ -1673,8 +1671,7 @@ handle_frame_post_auth(_Transport,
16731671
end;
16741672
error ->
16751673
%% FIXME commit offset is fire-and-forget, so no response even if error, change this?
1676-
rabbit_log:info("Not authorized to commit offset on ~p",
1677-
[Stream]),
1674+
rabbit_log:info("Not authorized to commit offset on ~p", [Stream]),
16781675
{Connection, State, Rest}
16791676
end;
16801677
handle_frame_post_auth(Transport,

0 commit comments

Comments
 (0)