Support concurrent links with stream filtering #14255

Merged: 2 commits, Jul 23, 2025

@ansd ansd commented Jul 21, 2025

## What?

If a receiver performs stream filtering with AMQP property filters or AMQP SQL filter expressions, the following downsides can occur:

  1. While the stream is being filtered, other links on the same session are blocked
  2. RabbitMQ sends messages late to the receiver

As an example, assume a receiver attaches to the start of a multi-GB stream with a link credit of 2, and only the very first message matches the filter. In this case, RabbitMQ scans the entire stream without processing other links on the same session, and sends the matched message only once the scan has completed (after many seconds or even minutes).

Instead, other links should be processed concurrently, and the receiver should be able to start processing the first matched message while RabbitMQ continues filtering the stream.

This commit fixes these two downsides.

## How?

After a threshold of consecutively unmatched messages, the session "pauses" filtering on that link temporarily by:

  1. sending an Erlang message `resume_filtering` to itself, and
  2. sending any matched messages to the receiver

Any other Erlang messages then have a chance to be processed by the session before the filtering on that link is resumed by the `resume_filtering` Erlang message.
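
The pause-and-resume behaviour described above can be modelled with a small, single-threaded sketch. This is not RabbitMQ's Erlang implementation: `Session`, `Link`, the `deque` standing in for the process mailbox, and the threshold value `UNMATCHED_THRESHOLD = 3` are all hypothetical names and values chosen for illustration. It only aims to show how sending a `resume_filtering` message to oneself lets other links make progress and lets matched messages go out early.

```python
from collections import deque

# Hypothetical value; the real threshold is not stated in this PR.
UNMATCHED_THRESHOLD = 3


class Link:
    def __init__(self, name, stream, matches, credit):
        self.name = name
        self.stream = stream    # a list standing in for a stream
        self.matches = matches  # filter predicate
        self.credit = credit    # remaining link credit
        self.offset = 0         # next stream position to scan


class Session:
    def __init__(self):
        self.mailbox = deque()  # stands in for the Erlang process mailbox
        self.log = []           # ordered record of what the session did

    def send(self, msg):
        self.mailbox.append(msg)

    def run(self):
        while self.mailbox:
            kind, link = self.mailbox.popleft()
            if kind in ("credit", "resume_filtering"):
                self.filter(link)

    def filter(self, link):
        matched, unmatched_run = [], 0
        while link.credit > 0 and link.offset < len(link.stream):
            msg = link.stream[link.offset]
            link.offset += 1
            if link.matches(msg):
                matched.append(msg)
                link.credit -= 1
                unmatched_run = 0
            else:
                unmatched_run += 1
                if unmatched_run >= UNMATCHED_THRESHOLD:
                    # Pause: flush matches so far, then queue a resume
                    # message behind whatever is already in the mailbox.
                    self.deliver(link, matched)
                    self.send(("resume_filtering", link))
                    return
        # End of stream reached or link credit exhausted.
        self.deliver(link, matched)
        self.log.append(("credit_reply", link.name))

    def deliver(self, link, matched):
        for msg in matched:
            self.log.append(("deliver", link.name, msg))


session = Session()
a = Link("a", list(range(10)), lambda m: m == 0, credit=2)
b = Link("b", list(range(10)), lambda m: m == 1, credit=1)
session.send(("credit", a))
session.send(("credit", b))
session.run()
for entry in session.log:
    print(entry)
```

In this model, link `a` delivers its only match after scanning four messages instead of the whole stream, and link `b` is fully served (delivery plus `credit_reply`) while `a` is still paused. Link `a` receives its `credit_reply` only once it reaches the end of the stream.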

Once the end of the stream is reached or link credit is exhausted, the `credit_reply` is returned from `rabbit_stream_queue` to `rabbit_amqp_session`.

An alternative solution would be to use separate Erlang processes for filtering links, because filtering can be CPU-bound and can also block on disk I/O.

@ansd ansd added this to the 4.2.0 milestone Jul 21, 2025
@ansd ansd self-assigned this Jul 21, 2025
@ansd ansd force-pushed the stream-filtering-yield branch from 8b875bd to f1df370 Compare July 21, 2025 15:51
ansd added 2 commits July 22, 2025 10:05
This test case tests that two links filtering from the same stream
are processed concurrently by the session if the stream contains
uncompressed sub batches.
@ansd ansd force-pushed the stream-filtering-yield branch from f1df370 to 04009b8 Compare July 22, 2025 10:05
@ansd ansd marked this pull request as ready for review July 22, 2025 10:39
@ansd ansd merged commit d0c3b3a into main Jul 23, 2025
560 of 561 checks passed
@ansd ansd deleted the stream-filtering-yield branch July 23, 2025 07:14