Skip to content

Super Stream Consumer #174

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 20 commits into from
Oct 24, 2022
Merged

Super Stream Consumer #174

merged 20 commits into from
Oct 24, 2022

Conversation

Gsantomaggio
Copy link
Member

@Gsantomaggio Gsantomaggio commented Oct 7, 2022

The base for Standard Consumer and Super-Stream Consumer

Signed-off-by: Gabriele Santomaggio [email protected]

How to test:

  • Step 1: Create a cluster with 3 nodes ( not mandatory but would be better to have a it to test the different connections)
  • Step 2: create the partitions as you prefer, ex: rabbitmq-streams add_super_stream invoices --partitions 8
  • Step 3: Run this test:
public static class MultiEndPoints
{
    public static async Task Start()
    {
        var config = new StreamSystemConfig()
        {
            UserName = "test",
            Password = "test",
            Endpoints = new List<EndPoint>
            {
                new DnsEndPoint("node0", 5552),
                new DnsEndPoint("node1", 5552),
                new DnsEndPoint("node2", 5552),
            },
            // Ssl = new SslOption()
            // {
            //     Enabled = true,
            //     AcceptablePolicyErrors = SslPolicyErrors.RemoteCertificateNotAvailable |
            //                              SslPolicyErrors.RemoteCertificateChainErrors |
            //                              SslPolicyErrors.RemoteCertificateNameMismatch
            //
            // }
        };

        var system = await StreamSystem.Create(config);
        // var streamName = Guid.NewGuid().ToString();
        var streamName = "invoices";
        // await system.CreateStream(new StreamSpec(streamName));

        var count = 0;

        var producer = await ReliableProducer.CreateReliableProducer(new ReliableProducerConfig()
        {
            Stream = streamName,
            StreamSystem = system,
            SuperStreamConfig = new SuperStreamConfig()
            {
                Routing = message => Encoding.UTF8.GetString(message.Data.Contents)
            },
            ConfirmationHandler = confirmation =>
            {
                if (Interlocked.Increment(ref count) % 500 == 0)
                {
                    Console.WriteLine($"---- Confirmed {count} messages");
                }

                return Task.CompletedTask;
            },
        });


        var t = Task.Run(async () =>
        {
            for (var i = 0; i < 20_000; i++)
            {
                if (!producer.IsOpen())
                {
                    Console.WriteLine("Producer is not connected");
                    return;
                }

                await producer.Send(new Message(Encoding.UTF8.GetBytes($"{Guid.NewGuid().ToString()} {i}")));
                if (i % 1000 == 0)
                {
                    Console.WriteLine($"--Sent {i} messages");
                }

                Thread.Sleep(1);
            }
        });


        var consumed = 0;
        var reference = "consumer_test";
        var consumers = new List<ReliableConsumer>();
        for (var i = 0; i < 3; i++)
        {
            var consumer = await ReliableConsumer.CreateReliableConsumer(new ReliableConsumerConfig()
            {
                Stream = streamName,
                StreamSystem = system,
                IsSuperStream = true,
                Reference = reference, // Reference is mandatory to use the single active consumer
                IsSingleActiveConsumer = true,
                ConsumerUpdateListener = async (consumerRef, stream, _) =>
                {
                    // the event is raised when a consumer is active due of single active consumer
                    // We start form the beginning of the stream
                    ulong offsetStart = 0;
                    try
                    {
                       // if there is some offset stored we start from there
                        offsetStart = await system.QueryOffset(consumerRef, stream) + 1;
                    }
                    catch (OffsetNotFoundException e)
                    {
                        // No offset found, start from the beginning
                        offsetStart = 0;
                    }
                    
                    return new OffsetTypeOffset(offsetStart);
                },
                MessageHandler = async (_, consumer, context, message) =>
                {
                    if (Interlocked.Increment(ref consumed) % 500 == 0)
                    {
                        await consumer.StoreOffset(context.Offset);
                        Console.WriteLine($"---- Consumed {consumed} messages");
                    }
                },
            });
            consumers.Add(consumer);
        }

        t.Wait();

        // Thread.Sleep(2 * 1000);
        // await consumer.Close();
        Console.WriteLine("Press enter to stop");
        Console.ReadKey();
        foreach (var reliableConsumer in consumers)
        {
            await reliableConsumer.Close();
        }

        Console.ReadKey();
        // await system.DeleteStream(streamName);
        await system.Close();
        Console.WriteLine("Closed");
    }
}
  • step 4: Kill Random connections for the management UI
  • step 5: delete random queue
  • step 6: remove some replica

Step 5/6 should be avoided in production environment

The base for Standard Consumer and Super-Stream Consumer

Signed-off-by: Gabriele Santomaggio <[email protected]>
@Gsantomaggio Gsantomaggio marked this pull request as draft October 7, 2022 09:56
Signed-off-by: Gabriele Santomaggio <[email protected]>
Signed-off-by: Gabriele Santomaggio <[email protected]>
@codecov
Copy link

codecov bot commented Oct 7, 2022

Codecov Report

Base: 92.73% // Head: 92.83% // Increases project coverage by +0.09% 🎉

Coverage data is based on head (fe8e67d) compared to base (55f78d8).
Patch coverage: 95.11% of modified lines in pull request are covered.

Additional details and impacted files
@@            Coverage Diff             @@
##             main     #174      +/-   ##
==========================================
+ Coverage   92.73%   92.83%   +0.09%     
==========================================
  Files          90       94       +4     
  Lines        7327     7890     +563     
  Branches      463      590     +127     
==========================================
+ Hits         6795     7325     +530     
- Misses        428      455      +27     
- Partials      104      110       +6     
Impacted Files Coverage Δ
RabbitMQ.Stream.Client/StreamSystem.cs 88.08% <78.57%> (-1.29%) ⬇️
Tests/Utils.cs 76.40% <83.95%> (+3.58%) ⬆️
RabbitMQ.Stream.Client/Reliable/ConsumerFactory.cs 90.69% <90.69%> (ø)
RabbitMQ.Stream.Client/SuperStreamConsumer.cs 97.08% <97.08%> (ø)
Tests/SuperStreamConsumerTests.cs 98.84% <98.84%> (ø)
RabbitMQ.Stream.Client/Consumer.cs 90.72% <100.00%> (+2.97%) ⬆️
RabbitMQ.Stream.Client/IConsumer.cs 100.00% <100.00%> (ø)
...abbitMQ.Stream.Client/Reliable/ReliableConsumer.cs 100.00% <100.00%> (ø)
RabbitMQ.Stream.Client/SuperStreamProducer.cs 100.00% <100.00%> (+0.59%) ⬆️
Tests/ReliableTests.cs 99.27% <100.00%> (+<0.01%) ⬆️
... and 11 more

Help us with your feedback. Take ten seconds to tell us how you rate us. Have a feature suggestion? Share it here.

☔ View full report at Codecov.
📢 Do you have feedback about the report comment? Let us know in this issue.

Signed-off-by: Gabriele Santomaggio <[email protected]>
Signed-off-by: Gabriele Santomaggio <[email protected]>
Signed-off-by: Gabriele Santomaggio <[email protected]>
Signed-off-by: Gabriele Santomaggio <[email protected]>
Signed-off-by: Gabriele Santomaggio <[email protected]>
Signed-off-by: Gabriele Santomaggio <[email protected]>
Signed-off-by: Gabriele Santomaggio <[email protected]>
Signed-off-by: Gabriele Santomaggio <[email protected]>
Signed-off-by: Gabriele Santomaggio <[email protected]>
Signed-off-by: Gabriele Santomaggio <[email protected]>
Signed-off-by: Gabriele Santomaggio <[email protected]>
Signed-off-by: Gabriele Santomaggio <[email protected]>
Signed-off-by: Gabriele Santomaggio <[email protected]>
Signed-off-by: Gabriele Santomaggio <[email protected]>
@Gsantomaggio Gsantomaggio marked this pull request as ready for review October 21, 2022 17:42
@Gsantomaggio Gsantomaggio requested a review from Zerpet October 24, 2022 08:51
Copy link
Member

@Zerpet Zerpet left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tried this locally and it works well for me. I tried killing producer and consumer connections; it recovers well and moves on. Feel free to merge (and squash!)

@Gsantomaggio Gsantomaggio merged commit 3516e97 into main Oct 24, 2022
@Gsantomaggio Gsantomaggio deleted the super_stream_consumer branch October 24, 2022 13:37
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants