Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 1 addition & 25 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -70,34 +70,10 @@ jobs:
- name: Unit Tests
run: dotnet test tests/GreptimeDB.Ingester.Tests --no-build --configuration Release --verbosity normal

compatibility-net6-net7:
name: Compatibility (net6/net7)
runs-on: ubuntu-latest

steps:
- name: Checkout
uses: actions/checkout@v6
with:
submodules: recursive

- name: Setup .NET
uses: actions/setup-dotnet@v5
with:
dotnet-version: 10.0.x

- name: Restore
run: dotnet restore

- name: Build net6.0
run: dotnet build src/GreptimeDB.Ingester/GreptimeDB.Ingester.csproj --no-restore --configuration Release -f net6.0

- name: Build net7.0
run: dotnet build src/GreptimeDB.Ingester/GreptimeDB.Ingester.csproj --no-restore --configuration Release -f net7.0

integration-tests:
name: Integration Tests
runs-on: ubuntu-latest
needs: [build, compatibility-net6-net7]
needs: [build]

steps:
- name: Checkout
Expand Down
36 changes: 36 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
# Changelog

All notable changes to this project are documented in this file.

The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]

### Added

- Multi-endpoint support via `GreptimeClientOptions.Endpoints` (`IList<string>`).
Supplying more than one endpoint enables client-side load balancing with
automatic failover across endpoints. Single-element lists behave as the
previous single-node case. Backed by `Grpc.Net.Client.Balancer`.
- `GreptimeClientOptions.LoadBalancing` (`LoadBalancingStrategy`) selects the
multi-endpoint balancing policy. Supported: `Random` (default — picks a
ready endpoint uniformly at random per call, avoiding the herding pattern
that round-robin can produce when many short-lived clients start at the
same time) and `RoundRobin`.

### Changed

- **BREAKING:** Dropped `net6.0` and `net7.0` target frameworks. Minimum
supported runtime is now `net8.0`. Both removed TFMs are past Microsoft's
end-of-support, and `Grpc.Net.Client.Balancer` (required for the new
multi-endpoint client-side load balancer) is only shipped in the package's
`net8.0+` build, not its `netstandard2.1` build.
Users on `net6.0` / `net7.0` should pin to the `0.1.x` line, which keeps
those TFMs supported.

### Deprecated

- `GreptimeClientOptions.Endpoint` (single-endpoint string). Use
`GreptimeClientOptions.Endpoints` instead. The property is retained for
backward compatibility and will be removed in a future release.
2 changes: 1 addition & 1 deletion Directory.Build.props
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
<Project>
<PropertyGroup>
<TargetFrameworks>net6.0;net7.0;net8.0;net9.0;net10.0</TargetFrameworks>
<TargetFrameworks>net8.0;net9.0;net10.0</TargetFrameworks>
<Nullable>enable</Nullable>
<ImplicitUsings>enable</ImplicitUsings>
<LangVersion>latest</LangVersion>
Expand Down
1 change: 1 addition & 0 deletions Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
<PackageVersion Include="Apache.Arrow.Flight" Version="22.1.0" />

<!-- DI and Configuration -->
<PackageVersion Include="Microsoft.Extensions.DependencyInjection" Version="10.0.5" />
<PackageVersion Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="10.0.5" />
<PackageVersion Include="Microsoft.Extensions.Options" Version="10.0.5" />
<PackageVersion Include="Microsoft.Extensions.Logging.Abstractions" Version="10.0.5" />
Expand Down
59 changes: 54 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
[![NuGet Downloads](https://img.shields.io/nuget/dt/GreptimeDB.Ingester.svg)](https://www.nuget.org/packages/GreptimeDB.Ingester)
[![NuGet Grpc](https://img.shields.io/nuget/v/GreptimeDB.Ingester.Grpc.svg)](https://www.nuget.org/packages/GreptimeDB.Ingester.Grpc)
[![License](https://img.shields.io/badge/license-Apache%202.0-blue.svg)](LICENSE)
![.NET](https://img.shields.io/badge/.NET-6.0%20%7C%207.0%20%7C%208.0%20%7C%209.0%20%7C%2010.0-purple)
![.NET](https://img.shields.io/badge/.NET-8.0%20%7C%209.0%20%7C%2010.0-purple)

.NET SDK for writing data to [GreptimeDB](https://github.com/GreptimeTeam/greptimedb).

Expand All @@ -19,6 +19,13 @@
dotnet add package GreptimeDB.Ingester
```

> **.NET 6.0 / 7.0 users:** the latest version requires .NET 8.0 or newer.
> Stay on the `0.1.x` line for net6.0 / net7.0 support:
>
> ```bash
> dotnet add package GreptimeDB.Ingester --version 0.1.*
> ```

## Quick Start

```csharp
Expand All @@ -29,7 +36,7 @@ using GreptimeDB.Ingester.Types;
// Create client
var client = new GreptimeClient(new GreptimeClientOptions
{
Endpoint = "http://localhost:4001",
Endpoints = new List<string> { "http://localhost:4001" },
Database = "public"
});

Expand All @@ -54,7 +61,7 @@ await client.DisposeAsync();
```csharp
var client = new GreptimeClient(new GreptimeClientOptions
{
Endpoint = "http://localhost:4001",
Endpoints = new List<string> { "http://localhost:4001" },
Database = "public",
ConnectTimeout = TimeSpan.FromSeconds(5),
WriteTimeout = TimeSpan.FromSeconds(30)
Expand All @@ -66,7 +73,7 @@ With basic auth:
```csharp
var client = new GreptimeClient(new GreptimeClientOptions
{
Endpoint = "http://localhost:4001",
Endpoints = new List<string> { "http://localhost:4001" },
Database = "public",
Authentication = new AuthenticationOptions
{
Expand All @@ -76,11 +83,53 @@ var client = new GreptimeClient(new GreptimeClientOptions
});
```

## Multiple Endpoints

Pass more than one endpoint to enable client-side load balancing and failover
across a GreptimeDB cluster:

```csharp
var client = new GreptimeClient(new GreptimeClientOptions
{
Endpoints = new List<string>
{
"http://node-a:4001",
"http://node-b:4001",
"http://node-c:4001",
},
Database = "public",
});
```

A single-element list takes the direct-channel fast path (no balancer); two
or more endpoints route through `Grpc.Net.Client.Balancer` with the configured
strategy. All endpoints must share the same scheme (all `http` or all `https`)
and must be plain `host:port` URIs without a path/query/fragment.

### Load-balancing strategy

`LoadBalancing` selects the policy used in the multi-endpoint case:

```csharp
new GreptimeClientOptions
{
Endpoints = new List<string> { "http://node-a:4001", "http://node-b:4001" },
LoadBalancing = LoadBalancingStrategy.Random, // default
// LoadBalancing = LoadBalancingStrategy.RoundRobin,
};
```

- `Random` (default) — pick a ready endpoint uniformly at random per call.
Avoids the herding pattern that round-robin can produce when many
short-lived clients start simultaneously.
- `RoundRobin` — cycle through ready endpoints in order.

## Features

- **Unary Write** - Simple single-request writes via gRPC
- **Streaming Write** - High-throughput streaming via gRPC for multiple tables
- **Bulk Write** - Maximum throughput via Apache Arrow Flight
- Multi-endpoint client-side load balancing (random / round-robin) with failover
- Type coercion between .NET and GreptimeDB types
- Health check
- DI integration
Expand Down Expand Up @@ -176,7 +225,7 @@ catch (GreptimeDB.Ingester.Exceptions.GreptimeException ex)
```csharp
services.AddGreptimeClient(options =>
{
options.Endpoint = "http://localhost:4001";
options.Endpoints = new List<string> { "http://localhost:4001" };
options.Database = "public";
});
```
Expand Down
2 changes: 1 addition & 1 deletion examples/benchmark.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
// --- Health check ---
var client = new GreptimeClient(new GreptimeClientOptions
{
Endpoint = endpoint,
Endpoints = new List<string> { endpoint },
Database = database,
WriteTimeout = TimeSpan.FromSeconds(120)
});
Expand Down
2 changes: 1 addition & 1 deletion examples/quick-test.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
// --- Write via gRPC ---
var client = new GreptimeClient(new GreptimeClientOptions
{
Endpoint = "http://localhost:4001",
Endpoints = new List<string> { "http://localhost:4001" },
Database = "public"
});

Expand Down
7 changes: 0 additions & 7 deletions src/GreptimeDB.Ingester/Arrow/RecordBatchBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -357,14 +357,7 @@ private static Time64Array BuildTime64Array(Table.Table table, int columnIndex,

private void ThrowIfDisposed()
{
#if NET7_0_OR_GREATER
ObjectDisposedException.ThrowIf(_disposed, this);
#else
if (_disposed)
{
throw new ObjectDisposedException(nameof(RecordBatchBuilder));
}
#endif
}

public void Dispose()
Expand Down
7 changes: 0 additions & 7 deletions src/GreptimeDB.Ingester/Client/BulkWriter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -264,14 +264,7 @@ private async Task RunRecvLoopAsync(IAsyncStreamReader<FlightPutResult> response

private void ThrowIfDisposed()
{
#if NET7_0_OR_GREATER
ObjectDisposedException.ThrowIf(_disposed == 1, this);
#else
if (_disposed == 1)
{
throw new ObjectDisposedException(nameof(BulkWriter));
}
#endif
}

private void ThrowIfCompleted()
Expand Down
81 changes: 68 additions & 13 deletions src/GreptimeDB.Ingester/Client/GreptimeClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@
using GreptimeDB.Ingester.Internal;
using Grpc.Core;
using Grpc.Net.Client;
using Grpc.Net.Client.Balancer;
using Grpc.Net.Client.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;

Expand All @@ -18,6 +21,7 @@ public sealed partial class GreptimeClient : IAsyncDisposable, IDisposable
private readonly ILoggerFactory _loggerFactory;
private readonly ILogger _logger;
private readonly GrpcChannel _channel;
private readonly ServiceProvider? _channelServices;
private readonly GreptimeDatabase.GreptimeDatabaseClient _client;
private readonly HealthCheck.HealthCheckClient _healthClient;
private readonly Lazy<FlightClient> _flightClient;
Expand All @@ -35,14 +39,13 @@ public GreptimeClient(GreptimeClientOptions options, ILoggerFactory? loggerFacto
_loggerFactory = loggerFactory ?? NullLoggerFactory.Instance;
_logger = _loggerFactory.CreateLogger<GreptimeClient>();

var channelOptions = new GrpcChannelOptions();

_channel = GrpcChannel.ForAddress(options.Endpoint, channelOptions);
var endpoints = options.ResolveEndpoints();
(_channel, _channelServices) = BuildChannel(endpoints, options.LoadBalancing);
_client = new GreptimeDatabase.GreptimeDatabaseClient(_channel);
_healthClient = new HealthCheck.HealthCheckClient(_channel);
_flightClient = new Lazy<FlightClient>(() => new FlightClient(_channel));

LogClientCreated(_logger, options.Endpoint);
LogClientCreated(_logger, endpoints.Count, endpoints[0]);
}

/// <summary>
Expand Down Expand Up @@ -265,6 +268,11 @@ public async Task CloseAsync()
_disposed = true;
await DisposeFlightClientAsync().ConfigureAwait(false);
await _channel.ShutdownAsync().ConfigureAwait(false);
Comment thread
killme2008 marked this conversation as resolved.
_channel.Dispose();
if (_channelServices is not null)
{
await _channelServices.DisposeAsync().ConfigureAwait(false);
}
LogClientClosed(_logger);
}

Expand All @@ -285,6 +293,7 @@ public void Dispose()
_disposed = true;
DisposeFlightClient();
_channel.Dispose();
_channelServices?.Dispose();
LogClientDisposed(_logger);
}

Expand Down Expand Up @@ -349,6 +358,59 @@ private CallOptions CreateCallOptions(CancellationToken cancellationToken)
cancellationToken: cancellationToken);
}

private static (GrpcChannel Channel, ServiceProvider? Services) BuildChannel(
IReadOnlyList<string> endpoints,
LoadBalancingStrategy strategy)
{
// Single endpoint: skip the balancer entirely. This preserves the original
// direct-channel behavior, including default TLS authority/SNI handling for
// https endpoints (the balancer path uses a synthetic channel authority,
// which can break certificate hostname validation).
if (endpoints.Count == 1)
{
return (GrpcChannel.ForAddress(endpoints[0]), null);
}

var addresses = new List<BalancerAddress>(endpoints.Count);
string? scheme = null;
foreach (var endpoint in endpoints)
{
var uri = new Uri(endpoint, UriKind.Absolute);
scheme ??= uri.Scheme;
addresses.Add(new BalancerAddress(uri.DnsSafeHost, uri.Port));
}

var services = new ServiceCollection();
services.AddSingleton<ResolverFactory>(new StaticResolverFactory(addresses));
if (strategy == LoadBalancingStrategy.Random)
{
services.AddSingleton<LoadBalancerFactory>(RandomBalancerFactory.Instance);
}
var serviceProvider = services.BuildServiceProvider();

LoadBalancingConfig lbConfig = strategy switch
{
LoadBalancingStrategy.Random => new RandomConfig(),
LoadBalancingStrategy.RoundRobin => new RoundRobinConfig(),
_ => throw new ArgumentOutOfRangeException(nameof(strategy), strategy, "Unsupported load-balancing strategy."),
};

var channelOptions = new GrpcChannelOptions
{
ServiceProvider = serviceProvider,
ServiceConfig = new ServiceConfig
{
LoadBalancingConfigs = { lbConfig }
},
Credentials = scheme == Uri.UriSchemeHttps
? ChannelCredentials.SecureSsl
: ChannelCredentials.Insecure
};

var channel = GrpcChannel.ForAddress("static:///greptime", channelOptions);
return (channel, serviceProvider);
}

private static void CheckResponse(GreptimeResponse response)
{
var header = response.Header;
Expand All @@ -361,20 +423,13 @@ private static void CheckResponse(GreptimeResponse response)

private void ThrowIfDisposed()
{
#if NET7_0_OR_GREATER
ObjectDisposedException.ThrowIf(_disposed, this);
#else
if (_disposed)
{
throw new ObjectDisposedException(nameof(GreptimeClient));
}
#endif
}

#region Logging

[LoggerMessage(Level = LogLevel.Debug, Message = "GreptimeClient created for endpoint {Endpoint}")]
private static partial void LogClientCreated(ILogger logger, string endpoint);
[LoggerMessage(Level = LogLevel.Debug, Message = "GreptimeClient created with {EndpointCount} endpoint(s); first: {FirstEndpoint}")]
private static partial void LogClientCreated(ILogger logger, int endpointCount, string firstEndpoint);

[LoggerMessage(Level = LogLevel.Debug, Message = "Writing {TableCount} tables with {RowCount} total rows")]
private static partial void LogWriteStarted(ILogger logger, int tableCount, int rowCount);
Expand Down
Loading