Skip to content

Commit 778bb40

Browse files
authored
Merge pull request #46 from GreptimeTeam/feat/multi-endpoints
feat!: supports multi endpoints
2 parents ef9c243 + 6a6ba83 commit 778bb40

19 files changed

Lines changed: 573 additions & 73 deletions

.github/workflows/ci.yml

Lines changed: 1 addition & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -70,34 +70,10 @@ jobs:
7070
- name: Unit Tests
7171
run: dotnet test tests/GreptimeDB.Ingester.Tests --no-build --configuration Release --verbosity normal
7272

73-
compatibility-net6-net7:
74-
name: Compatibility (net6/net7)
75-
runs-on: ubuntu-latest
76-
77-
steps:
78-
- name: Checkout
79-
uses: actions/checkout@v6
80-
with:
81-
submodules: recursive
82-
83-
- name: Setup .NET
84-
uses: actions/setup-dotnet@v5
85-
with:
86-
dotnet-version: 10.0.x
87-
88-
- name: Restore
89-
run: dotnet restore
90-
91-
- name: Build net6.0
92-
run: dotnet build src/GreptimeDB.Ingester/GreptimeDB.Ingester.csproj --no-restore --configuration Release -f net6.0
93-
94-
- name: Build net7.0
95-
run: dotnet build src/GreptimeDB.Ingester/GreptimeDB.Ingester.csproj --no-restore --configuration Release -f net7.0
96-
9773
integration-tests:
9874
name: Integration Tests
9975
runs-on: ubuntu-latest
100-
needs: [build, compatibility-net6-net7]
76+
needs: [build]
10177

10278
steps:
10379
- name: Checkout

CHANGELOG.md

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
# Changelog
2+
3+
All notable changes to this project are documented in this file.
4+
5+
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/),
6+
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
7+
8+
## [Unreleased]
9+
10+
### Added
11+
12+
- Multi-endpoint support via `GreptimeClientOptions.Endpoints` (`IList<string>`).
13+
Supplying more than one endpoint enables client-side load balancing with
14+
automatic failover across endpoints. Single-element lists behave as the
15+
previous single-node case. Backed by `Grpc.Net.Client.Balancer`.
16+
- `GreptimeClientOptions.LoadBalancing` (`LoadBalancingStrategy`) selects the
17+
multi-endpoint balancing policy. Supported: `Random` (default — picks a
18+
ready endpoint uniformly at random per call, avoiding the herding pattern
19+
that round-robin can produce when many short-lived clients start at the
20+
same time) and `RoundRobin`.
21+
22+
### Changed
23+
24+
- **BREAKING:** Dropped `net6.0` and `net7.0` target frameworks. Minimum
25+
supported runtime is now `net8.0`. Both removed TFMs are past Microsoft's
26+
end-of-support, and `Grpc.Net.Client.Balancer` (required for the new
27+
multi-endpoint client-side load balancer) is only shipped in the package's
28+
`net8.0+` build, not its `netstandard2.1` build.
29+
Users on `net6.0` / `net7.0` should pin to the `0.1.x` line, which keeps
30+
those TFMs supported.
31+
32+
### Deprecated
33+
34+
- `GreptimeClientOptions.Endpoint` (single-endpoint string). Use
35+
`GreptimeClientOptions.Endpoints` instead. The property is retained for
36+
backward compatibility and will be removed in a future release.

Directory.Build.props

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
<Project>
22
<PropertyGroup>
3-
<TargetFrameworks>net6.0;net7.0;net8.0;net9.0;net10.0</TargetFrameworks>
3+
<TargetFrameworks>net8.0;net9.0;net10.0</TargetFrameworks>
44
<Nullable>enable</Nullable>
55
<ImplicitUsings>enable</ImplicitUsings>
66
<LangVersion>latest</LangVersion>

Directory.Packages.props

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
<PackageVersion Include="Apache.Arrow.Flight" Version="22.1.0" />
1515

1616
<!-- DI and Configuration -->
17+
<PackageVersion Include="Microsoft.Extensions.DependencyInjection" Version="10.0.5" />
1718
<PackageVersion Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="10.0.5" />
1819
<PackageVersion Include="Microsoft.Extensions.Options" Version="10.0.5" />
1920
<PackageVersion Include="Microsoft.Extensions.Logging.Abstractions" Version="10.0.5" />

README.md

Lines changed: 54 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
[![NuGet Downloads](https://img.shields.io/nuget/dt/GreptimeDB.Ingester.svg)](https://www.nuget.org/packages/GreptimeDB.Ingester)
66
[![NuGet Grpc](https://img.shields.io/nuget/v/GreptimeDB.Ingester.Grpc.svg)](https://www.nuget.org/packages/GreptimeDB.Ingester.Grpc)
77
[![License](https://img.shields.io/badge/license-Apache%202.0-blue.svg)](LICENSE)
8-
![.NET](https://img.shields.io/badge/.NET-6.0%20%7C%207.0%20%7C%208.0%20%7C%209.0%20%7C%2010.0-purple)
8+
![.NET](https://img.shields.io/badge/.NET-8.0%20%7C%209.0%20%7C%2010.0-purple)
99

1010
.NET SDK for writing data to [GreptimeDB](https://github.com/GreptimeTeam/greptimedb).
1111

@@ -19,6 +19,13 @@
1919
dotnet add package GreptimeDB.Ingester
2020
```
2121

22+
> **.NET 6.0 / 7.0 users:** the latest version requires .NET 8.0 or newer.
23+
> Stay on the `0.1.x` line for net6.0 / net7.0 support:
24+
>
25+
> ```bash
26+
> dotnet add package GreptimeDB.Ingester --version 0.1.*
27+
> ```
28+
2229
## Quick Start
2330
2431
```csharp
@@ -29,7 +36,7 @@ using GreptimeDB.Ingester.Types;
2936
// Create client
3037
var client = new GreptimeClient(new GreptimeClientOptions
3138
{
32-
Endpoint = "http://localhost:4001",
39+
Endpoints = new List<string> { "http://localhost:4001" },
3340
Database = "public"
3441
});
3542
@@ -54,7 +61,7 @@ await client.DisposeAsync();
5461
```csharp
5562
var client = new GreptimeClient(new GreptimeClientOptions
5663
{
57-
Endpoint = "http://localhost:4001",
64+
Endpoints = new List<string> { "http://localhost:4001" },
5865
Database = "public",
5966
ConnectTimeout = TimeSpan.FromSeconds(5),
6067
WriteTimeout = TimeSpan.FromSeconds(30)
@@ -66,7 +73,7 @@ With basic auth:
6673
```csharp
6774
var client = new GreptimeClient(new GreptimeClientOptions
6875
{
69-
Endpoint = "http://localhost:4001",
76+
Endpoints = new List<string> { "http://localhost:4001" },
7077
Database = "public",
7178
Authentication = new AuthenticationOptions
7279
{
@@ -76,11 +83,53 @@ var client = new GreptimeClient(new GreptimeClientOptions
7683
});
7784
```
7885
86+
## Multiple Endpoints
87+
88+
Pass more than one endpoint to enable client-side load balancing and failover
89+
across a GreptimeDB cluster:
90+
91+
```csharp
92+
var client = new GreptimeClient(new GreptimeClientOptions
93+
{
94+
Endpoints = new List<string>
95+
{
96+
"http://node-a:4001",
97+
"http://node-b:4001",
98+
"http://node-c:4001",
99+
},
100+
Database = "public",
101+
});
102+
```
103+
104+
A single-element list takes the direct-channel fast path (no balancer); two
105+
or more endpoints route through `Grpc.Net.Client.Balancer` with the configured
106+
strategy. All endpoints must share the same scheme (all `http` or all `https`)
107+
and must be plain `host:port` URIs without a path/query/fragment.
108+
109+
### Load-balancing strategy
110+
111+
`LoadBalancing` selects the policy used in the multi-endpoint case:
112+
113+
```csharp
114+
new GreptimeClientOptions
115+
{
116+
Endpoints = new List<string> { "http://node-a:4001", "http://node-b:4001" },
117+
LoadBalancing = LoadBalancingStrategy.Random, // default
118+
// LoadBalancing = LoadBalancingStrategy.RoundRobin,
119+
};
120+
```
121+
122+
- `Random` (default) — pick a ready endpoint uniformly at random per call.
123+
Avoids the herding pattern that round-robin can produce when many
124+
short-lived clients start simultaneously.
125+
- `RoundRobin` — cycle through ready endpoints in order.
126+
79127
## Features
80128
81129
- **Unary Write** - Simple single-request writes via gRPC
82130
- **Streaming Write** - High-throughput streaming via gRPC for multiple tables
83131
- **Bulk Write** - Maximum throughput via Apache Arrow Flight
132+
- Multi-endpoint client-side load balancing (random / round-robin) with failover
84133
- Type coercion between .NET and GreptimeDB types
85134
- Health check
86135
- DI integration
@@ -176,7 +225,7 @@ catch (GreptimeDB.Ingester.Exceptions.GreptimeException ex)
176225
```csharp
177226
services.AddGreptimeClient(options =>
178227
{
179-
options.Endpoint = "http://localhost:4001";
228+
options.Endpoints = new List<string> { "http://localhost:4001" };
180229
options.Database = "public";
181230
});
182231
```

examples/benchmark.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
// --- Health check ---
2626
var client = new GreptimeClient(new GreptimeClientOptions
2727
{
28-
Endpoint = endpoint,
28+
Endpoints = new List<string> { endpoint },
2929
Database = database,
3030
WriteTimeout = TimeSpan.FromSeconds(120)
3131
});

examples/quick-test.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
// --- Write via gRPC ---
1010
var client = new GreptimeClient(new GreptimeClientOptions
1111
{
12-
Endpoint = "http://localhost:4001",
12+
Endpoints = new List<string> { "http://localhost:4001" },
1313
Database = "public"
1414
});
1515

src/GreptimeDB.Ingester/Arrow/RecordBatchBuilder.cs

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -357,14 +357,7 @@ private static Time64Array BuildTime64Array(Table.Table table, int columnIndex,
357357

358358
private void ThrowIfDisposed()
359359
{
360-
#if NET7_0_OR_GREATER
361360
ObjectDisposedException.ThrowIf(_disposed, this);
362-
#else
363-
if (_disposed)
364-
{
365-
throw new ObjectDisposedException(nameof(RecordBatchBuilder));
366-
}
367-
#endif
368361
}
369362

370363
public void Dispose()

src/GreptimeDB.Ingester/Client/BulkWriter.cs

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -264,14 +264,7 @@ private async Task RunRecvLoopAsync(IAsyncStreamReader<FlightPutResult> response
264264

265265
private void ThrowIfDisposed()
266266
{
267-
#if NET7_0_OR_GREATER
268267
ObjectDisposedException.ThrowIf(_disposed == 1, this);
269-
#else
270-
if (_disposed == 1)
271-
{
272-
throw new ObjectDisposedException(nameof(BulkWriter));
273-
}
274-
#endif
275268
}
276269

277270
private void ThrowIfCompleted()

src/GreptimeDB.Ingester/Client/GreptimeClient.cs

Lines changed: 68 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,9 @@
44
using GreptimeDB.Ingester.Internal;
55
using Grpc.Core;
66
using Grpc.Net.Client;
7+
using Grpc.Net.Client.Balancer;
8+
using Grpc.Net.Client.Configuration;
9+
using Microsoft.Extensions.DependencyInjection;
710
using Microsoft.Extensions.Logging;
811
using Microsoft.Extensions.Logging.Abstractions;
912

@@ -18,6 +21,7 @@ public sealed partial class GreptimeClient : IAsyncDisposable, IDisposable
1821
private readonly ILoggerFactory _loggerFactory;
1922
private readonly ILogger _logger;
2023
private readonly GrpcChannel _channel;
24+
private readonly ServiceProvider? _channelServices;
2125
private readonly GreptimeDatabase.GreptimeDatabaseClient _client;
2226
private readonly HealthCheck.HealthCheckClient _healthClient;
2327
private readonly Lazy<FlightClient> _flightClient;
@@ -35,14 +39,13 @@ public GreptimeClient(GreptimeClientOptions options, ILoggerFactory? loggerFacto
3539
_loggerFactory = loggerFactory ?? NullLoggerFactory.Instance;
3640
_logger = _loggerFactory.CreateLogger<GreptimeClient>();
3741

38-
var channelOptions = new GrpcChannelOptions();
39-
40-
_channel = GrpcChannel.ForAddress(options.Endpoint, channelOptions);
42+
var endpoints = options.ResolveEndpoints();
43+
(_channel, _channelServices) = BuildChannel(endpoints, options.LoadBalancing);
4144
_client = new GreptimeDatabase.GreptimeDatabaseClient(_channel);
4245
_healthClient = new HealthCheck.HealthCheckClient(_channel);
4346
_flightClient = new Lazy<FlightClient>(() => new FlightClient(_channel));
4447

45-
LogClientCreated(_logger, options.Endpoint);
48+
LogClientCreated(_logger, endpoints.Count, endpoints[0]);
4649
}
4750

4851
/// <summary>
@@ -265,6 +268,11 @@ public async Task CloseAsync()
265268
_disposed = true;
266269
await DisposeFlightClientAsync().ConfigureAwait(false);
267270
await _channel.ShutdownAsync().ConfigureAwait(false);
271+
_channel.Dispose();
272+
if (_channelServices is not null)
273+
{
274+
await _channelServices.DisposeAsync().ConfigureAwait(false);
275+
}
268276
LogClientClosed(_logger);
269277
}
270278

@@ -285,6 +293,7 @@ public void Dispose()
285293
_disposed = true;
286294
DisposeFlightClient();
287295
_channel.Dispose();
296+
_channelServices?.Dispose();
288297
LogClientDisposed(_logger);
289298
}
290299

@@ -349,6 +358,59 @@ private CallOptions CreateCallOptions(CancellationToken cancellationToken)
349358
cancellationToken: cancellationToken);
350359
}
351360

361+
private static (GrpcChannel Channel, ServiceProvider? Services) BuildChannel(
362+
IReadOnlyList<string> endpoints,
363+
LoadBalancingStrategy strategy)
364+
{
365+
// Single endpoint: skip the balancer entirely. This preserves the original
366+
// direct-channel behavior, including default TLS authority/SNI handling for
367+
// https endpoints (the balancer path uses a synthetic channel authority,
368+
// which can break certificate hostname validation).
369+
if (endpoints.Count == 1)
370+
{
371+
return (GrpcChannel.ForAddress(endpoints[0]), null);
372+
}
373+
374+
var addresses = new List<BalancerAddress>(endpoints.Count);
375+
string? scheme = null;
376+
foreach (var endpoint in endpoints)
377+
{
378+
var uri = new Uri(endpoint, UriKind.Absolute);
379+
scheme ??= uri.Scheme;
380+
addresses.Add(new BalancerAddress(uri.DnsSafeHost, uri.Port));
381+
}
382+
383+
var services = new ServiceCollection();
384+
services.AddSingleton<ResolverFactory>(new StaticResolverFactory(addresses));
385+
if (strategy == LoadBalancingStrategy.Random)
386+
{
387+
services.AddSingleton<LoadBalancerFactory>(RandomBalancerFactory.Instance);
388+
}
389+
var serviceProvider = services.BuildServiceProvider();
390+
391+
LoadBalancingConfig lbConfig = strategy switch
392+
{
393+
LoadBalancingStrategy.Random => new RandomConfig(),
394+
LoadBalancingStrategy.RoundRobin => new RoundRobinConfig(),
395+
_ => throw new ArgumentOutOfRangeException(nameof(strategy), strategy, "Unsupported load-balancing strategy."),
396+
};
397+
398+
var channelOptions = new GrpcChannelOptions
399+
{
400+
ServiceProvider = serviceProvider,
401+
ServiceConfig = new ServiceConfig
402+
{
403+
LoadBalancingConfigs = { lbConfig }
404+
},
405+
Credentials = scheme == Uri.UriSchemeHttps
406+
? ChannelCredentials.SecureSsl
407+
: ChannelCredentials.Insecure
408+
};
409+
410+
var channel = GrpcChannel.ForAddress("static:///greptime", channelOptions);
411+
return (channel, serviceProvider);
412+
}
413+
352414
private static void CheckResponse(GreptimeResponse response)
353415
{
354416
var header = response.Header;
@@ -361,20 +423,13 @@ private static void CheckResponse(GreptimeResponse response)
361423

362424
private void ThrowIfDisposed()
363425
{
364-
#if NET7_0_OR_GREATER
365426
ObjectDisposedException.ThrowIf(_disposed, this);
366-
#else
367-
if (_disposed)
368-
{
369-
throw new ObjectDisposedException(nameof(GreptimeClient));
370-
}
371-
#endif
372427
}
373428

374429
#region Logging
375430

376-
[LoggerMessage(Level = LogLevel.Debug, Message = "GreptimeClient created for endpoint {Endpoint}")]
377-
private static partial void LogClientCreated(ILogger logger, string endpoint);
431+
[LoggerMessage(Level = LogLevel.Debug, Message = "GreptimeClient created with {EndpointCount} endpoint(s); first: {FirstEndpoint}")]
432+
private static partial void LogClientCreated(ILogger logger, int endpointCount, string firstEndpoint);
378433

379434
[LoggerMessage(Level = LogLevel.Debug, Message = "Writing {TableCount} tables with {RowCount} total rows")]
380435
private static partial void LogWriteStarted(ILogger logger, int tableCount, int rowCount);

0 commit comments

Comments
 (0)