Skip to content

Handle infinite looping when encountering a misconfigured load balancer #121

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
May 17, 2022
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 21 additions & 8 deletions RabbitMQ.Stream.Client/RoutingClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ public interface IRouting

public class Routing : IRouting
{

public bool ValidateDns { get; set; } = true;

public IClient CreateClient(ClientParameters clientParameters)
Expand All @@ -39,8 +38,11 @@ public IClient CreateClient(ClientParameters clientParameters)
/// </summary>
public static class RoutingHelper<T> where T : IRouting, new()
{
private static async Task<IClient> LookupConnection(ClientParameters clientParameters,
Broker broker)
private static async Task<IClient> LookupConnection(
ClientParameters clientParameters,
Broker broker,
int maxAttempts
)
{
var routing = new T();

Expand Down Expand Up @@ -80,15 +82,22 @@ string GetPropertyValue(IDictionary<string, string> connectionProperties, string
var advertisedHost = GetPropertyValue(client.ConnectionProperties, "advertised_host");
var advertisedPort = GetPropertyValue(client.ConnectionProperties, "advertised_port");

while (broker.Host != advertisedHost ||
broker.Port != uint.Parse(advertisedPort))
var attemptNo = 0;
while (broker.Host != advertisedHost || broker.Port != uint.Parse(advertisedPort))
{
await client.Close("advertised_host or advertised_port doesn't mach");
attemptNo++;
await client.Close("advertised_host or advertised_port doesn't match");

client = routing.CreateClient(clientParameters with { Endpoint = endPoint });

advertisedHost = GetPropertyValue(client.ConnectionProperties, "advertised_host");
advertisedPort = GetPropertyValue(client.ConnectionProperties, "advertised_port");
if (attemptNo > maxAttempts)
{
throw new RoutingClientException(
$"Could not find broker ({broker.Host}:{broker.Port}) after {maxAttempts} attempts");
}

// TODO: Maybe an external parameter
Thread.Sleep(TimeSpan.FromMilliseconds(200));
}
Expand All @@ -102,7 +111,9 @@ string GetPropertyValue(IDictionary<string, string> connectionProperties, string
public static async Task<IClient> LookupLeaderConnection(ClientParameters clientParameters,
StreamInfo metaDataInfo)
{
return await LookupConnection(clientParameters, metaDataInfo.Leader);
var maxAttempts = (int)Math.Pow(2 + metaDataInfo.Replicas.Count, 2);

return await LookupConnection(clientParameters, metaDataInfo.Leader, maxAttempts);
}

/// <summary>
Expand All @@ -121,7 +132,9 @@ public static async Task<IClient> LookupRandomConnection(ClientParameters client
var rnd = new Random();
var brokerId = rnd.Next(0, brokers.Count);
var broker = brokers[brokerId];
return await LookupConnection(clientParameters, broker);
var maxAttempts = (int)Math.Pow(2 + brokers.Count, 2);

return await LookupConnection(clientParameters, broker, maxAttempts);
}
}

Expand Down
35 changes: 35 additions & 0 deletions Tests/UnitTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,26 @@ public IClient CreateClient(ClientParameters clientParameters)
public bool ValidateDns { get; set; } = false;
}

public class MisconfiguredLoadBalancerRouting : IRouting
{

public IClient CreateClient(ClientParameters clientParameters)
{

var fake = new FakeClient(clientParameters)
{
ConnectionProperties = new Dictionary<string, string>()
{
["advertised_host"] = "node4",
["advertised_port"] = "5552"
}
};
return fake;
}

public bool ValidateDns { get; set; } = false;
}

//advertised_host is is missed
public class MissingFieldsRouting : IRouting
{
Expand Down Expand Up @@ -132,6 +152,21 @@ public void AddressResolverLoadBalancerSimulate()
}
}

[Fact]
public void RoutingHelperShouldThrowIfLoadBalancerIsMisconfigured()
{
var addressResolver = new AddressResolver(new IPEndPoint(IPAddress.Parse("192.168.10.99"), 5552));
var clientParameters = new ClientParameters()
{
AddressResolver = addressResolver,
};
var metaDataInfo = new StreamInfo("stream", ResponseCode.Ok, new Broker("node2", 5552),
new List<Broker>() { new Broker("replica", 5552) });

Assert.ThrowsAsync<RoutingClientException>(
() => RoutingHelper<MisconfiguredLoadBalancerRouting>.LookupLeaderConnection(clientParameters, metaDataInfo));
}

[Fact]
public void RandomReplicaLeader()
{
Expand Down