Skip to content

Commit 6777d82

Browse files
committed
Host.Kafka] Support for High-Throughput Publishing to Kafka
Signed-off-by: Tomasz Maruszak <[email protected]>
1 parent 50d91cb commit 6777d82

23 files changed

+288
-220
lines changed

docs/provider_kafka.md

Lines changed: 54 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,14 @@ Please read the [Introduction](intro.md) before reading this provider documentat
66
- [Configuration properties](#configuration-properties)
77
- [Minimizing message latency](#minimizing-message-latency)
88
- [SSL and password authentication](#ssl-and-password-authentication)
9-
- [Selecting message partition for topic producer](#selecting-message-partition-for-topic-producer)
10-
- [Default partitioner with message key](#default-partitioner-with-message-key)
11-
- [Assigning partition explicitly](#assigning-partition-explicitly)
12-
- [Consumer context](#consumer-context)
9+
- [Producers](#producers)
10+
- [High throughput publish](#high-throughput-publish)
11+
- [Selecting message partition for topic producer](#selecting-message-partition-for-topic-producer)
12+
- [Default partitioner with message key](#default-partitioner-with-message-key)
13+
- [Assigning partition explicitly](#assigning-partition-explicitly)
1314
- [Message Headers](#message-headers)
1415
- [Consumers](#consumers)
16+
- [Consumer context](#consumer-context)
1517
- [Offset Commit](#offset-commit)
1618
- [Consumer Error Handling](#consumer-error-handling)
1719
- [Debugging](#debugging)
@@ -29,7 +31,7 @@ When troubleshooting or fine tuning it is worth reading the `librdkafka` and `co
2931

3032
## Configuration properties
3133

32-
Producer, consumer and global configuration properties are described [here](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md).
34+
Producer, consumer and global configuration properties are described [here](https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md).
3335
The configuration on the underlying Kafka client can be adjusted like so:
3436

3537
```cs
@@ -53,7 +55,7 @@ services.AddSlimMessageBus(mbb =>
5355

5456
### Minimizing message latency
5557

56-
There is a good description [here](https://github.com/edenhill/librdkafka/wiki/How-to-decrease-message-latency) on improving the latency by applying producer/consumer settings on librdkafka. Here is how you enter the settings using SlimMessageBus:
58+
There is a good description [here](https://github.com/confluentinc/librdkafka/wiki/How-to-decrease-message-latency) on improving the latency by applying producer/consumer settings on librdkafka. Here is how you enter the settings using SlimMessageBus:
5759

5860
```cs
5961
services.AddSlimMessageBus(mbb =>
@@ -119,12 +121,32 @@ private static void AddSsl(string username, string password, ClientConfig c)
119121

120122
The file `cloudkarafka_2020-12.ca` has to be set to `Copy to Output Directory` as `Copy always`.
121123

122-
## Selecting message partition for topic producer
124+
## Producers
125+
126+
### High throughput publish
127+
128+
By default each [.Publish()](../src/SlimMessageBus/IPublishBus.cs) / [.Send()](../src/SlimMessageBus/RequestResponse/IRequestResponseBus.cs) is producing the message to the Kafka transport and awaiting the response.
129+
This is to ensure the errors in delivery to the Kafka transport are reported as [ProducerMessageBusException](../src/SlimMessageBus/Exceptions/ProducerMessageBusException.cs) and ensuring delivery to the kafka cluster.
130+
131+
However, for scenarios where we want higher throughput with the sacrifice of delivery we can use the `.EnableProduceAwait(false)` on the producer or bus configuration.
132+
When await is disabled the message will be delivered to the Kafka client without awaiting the produce result, and the client's internal buffering will be used more effectively.
133+
134+
```cs
135+
mbb.Produce<PingMessage>(x =>
136+
{
137+
x.DefaultTopic(topic);
138+
// Partition #0 - for even counters, and #1 - for odd counters
139+
x.PartitionProvider((m, t) => m.Counter % 2);
140+
x.EnableProduceAwait(enableProduceAwait);
141+
});
142+
```
143+
144+
### Selecting message partition for topic producer
123145

124146
Kafka topics are broken into partitions. The question is how does SMB Kafka choose the partition to assign the message?
125147
There are two possible options:
126148

127-
### Default partitioner with message key
149+
#### Default partitioner with message key
128150

129151
Currently, [confluent-kafka-dotnet](https://github.com/confluentinc/confluent-kafka-dotnet) does not support custom partitioners (see [here](https://github.com/confluentinc/confluent-kafka-dotnet/issues/343)).
130152
The default partitioner is supported, which works in this way:
@@ -148,7 +170,7 @@ mbb
148170

149171
The key must be a `byte[]`.
150172

151-
### Assigning partition explicitly
173+
#### Assigning partition explicitly
152174

153175
SMB Kafka allows to set a provider (selector) that will assign the partition number for a given message and topic pair. Here is an example:
154176

@@ -167,33 +189,13 @@ mbb
167189

168190
With this approach your provider needs to know the number of partitions for a topic.
169191

170-
## Consumer context
171-
172-
The consumer can implement the `IConsumerWithContext` interface to access the Kafka native message:
173-
174-
```cs
175-
public class PingConsumer : IConsumer<PingMessage>, IConsumerWithContext
176-
{
177-
public IConsumerContext Context { get; set; }
178-
179-
public Task OnHandle(PingMessage message)
180-
{
181-
// SMB Kafka transport specific extension:
182-
var transportMessage = Context.GetTransportMessage();
183-
var partition = transportMessage.TopicPartition.Partition;
184-
}
185-
}
186-
```
187-
188-
This could be useful to extract the message's offset or partition.
189-
190192
## Message Headers
191193

192194
SMB uses headers to pass additional metadata information with the message. This includes the `MessageType` (of type `string`) or in the case of request/response messages the `RequestId` (of type `string`), `ReplyTo` (of type `string`) and `Expires` (of type `long`).
193195

194196
The Kafka message header values are natively binary (`byte[]`) in the underlying .NET client, as a result SMB needs to serialize the header values.
195197
By default the [DefaultKafkaHeaderSerializer](../src/SlimMessageBus.Host.Kafka/DefaultKafkaHeaderSerializer.cs) is used to serialize header values.
196-
If you need to specify a different serializer provide a specfic `IMessageSerializer` implementation (custom or one of the available serialization plugins):
198+
If you need to specify a different serializer provide a specific `IMessageSerializer` implementation (custom or one of the available serialization plugins):
197199

198200
```cs
199201
// MessageBusBuilder mbb;
@@ -209,9 +211,30 @@ mbb
209211
210212
## Consumers
211213

214+
### Consumer context
215+
216+
The consumer can implement the `IConsumerWithContext` interface to access the Kafka native message:
217+
218+
```cs
219+
public class PingConsumer : IConsumer<PingMessage>, IConsumerWithContext
220+
{
221+
public IConsumerContext Context { get; set; }
222+
223+
public Task OnHandle(PingMessage message)
224+
{
225+
// SMB Kafka transport specific extension:
226+
var transportMessage = Context.GetTransportMessage();
227+
var partition = transportMessage.TopicPartition.Partition;
228+
}
229+
}
230+
```
231+
232+
This could be useful to extract the message's offset or partition.
233+
212234
### Offset Commit
213235

214-
In the current Kafka provider implementation, SMB handles the manual commit of topic-partition offsets for the consumer. This configuration is controlled through the following methods on the consumer builder:
236+
In the current Kafka provider implementation, SMB handles the manual commit of topic-partition offsets for the consumer.Th
237+
is configuration is controlled through the following methods on the consumer builder:
215238

216239
- `CheckpointEvery(int)` – Commits the offset after a specified number of processed messages.
217240
- `CheckpointAfter(TimeSpan)` – Commits the offset after a specified time interval.

docs/provider_kafka.t.md

Lines changed: 42 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,14 @@ Please read the [Introduction](intro.md) before reading this provider documentat
66
- [Configuration properties](#configuration-properties)
77
- [Minimizing message latency](#minimizing-message-latency)
88
- [SSL and password authentication](#ssl-and-password-authentication)
9-
- [Selecting message partition for topic producer](#selecting-message-partition-for-topic-producer)
10-
- [Default partitioner with message key](#default-partitioner-with-message-key)
11-
- [Assigning partition explicitly](#assigning-partition-explicitly)
12-
- [Consumer context](#consumer-context)
9+
- [Producers](#producers)
10+
- [High throughput publish](#high-throughput-publish)
11+
- [Selecting message partition for topic producer](#selecting-message-partition-for-topic-producer)
12+
- [Default partitioner with message key](#default-partitioner-with-message-key)
13+
- [Assigning partition explicitly](#assigning-partition-explicitly)
1314
- [Message Headers](#message-headers)
1415
- [Consumers](#consumers)
16+
- [Consumer context](#consumer-context)
1517
- [Offset Commit](#offset-commit)
1618
- [Consumer Error Handling](#consumer-error-handling)
1719
- [Debugging](#debugging)
@@ -119,12 +121,24 @@ private static void AddSsl(string username, string password, ClientConfig c)
119121

120122
The file `cloudkarafka_2020-12.ca` has to be set to `Copy to Output Directory` as `Copy always`.
121123

122-
## Selecting message partition for topic producer
124+
## Producers
125+
126+
### High throughput publish
127+
128+
By default each [.Publish()](../src/SlimMessageBus/IPublishBus.cs) / [.Send()](../src/SlimMessageBus/RequestResponse/IRequestResponseBus.cs) is producing the message to the Kafka transport and awaiting the response.
129+
This is to ensure the errors in delivery to the Kafka transport are reported as [ProducerMessageBusException](../src/SlimMessageBus/Exceptions/ProducerMessageBusException.cs) and ensuring delivery to the kafka cluster.
130+
131+
However, for scenarios where we want higher throughput with the sacrifice of delivery we can use the `.EnableProduceAwait(false)` on the producer or bus configuration.
132+
When await is disabled the message will be delivered to the Kafka client without awaiting the produce result, and the client's internal buffering will be used more effectively.
133+
134+
@[:cs](../src/Tests/SlimMessageBus.Host.Kafka.Test/KafkaMessageBusIt.cs,ExampleEnableProduceAwait)
135+
136+
### Selecting message partition for topic producer
123137

124138
Kafka topics are broken into partitions. The question is how does SMB Kafka choose the partition to assign the message?
125139
There are two possible options:
126140

127-
### Default partitioner with message key
141+
#### Default partitioner with message key
128142

129143
Currently, [confluent-kafka-dotnet](https://github.com/confluentinc/confluent-kafka-dotnet) does not support custom partitioners (see [here](https://github.com/confluentinc/confluent-kafka-dotnet/issues/343)).
130144
The default partitioner is supported, which works in this way:
@@ -148,7 +162,7 @@ mbb
148162

149163
The key must be a `byte[]`.
150164

151-
### Assigning partition explicitly
165+
#### Assigning partition explicitly
152166

153167
SMB Kafka allows to set a provider (selector) that will assign the partition number for a given message and topic pair. Here is an example:
154168

@@ -167,33 +181,13 @@ mbb
167181

168182
With this approach your provider needs to know the number of partitions for a topic.
169183

170-
## Consumer context
171-
172-
The consumer can implement the `IConsumerWithContext` interface to access the Kafka native message:
173-
174-
```cs
175-
public class PingConsumer : IConsumer<PingMessage>, IConsumerWithContext
176-
{
177-
public IConsumerContext Context { get; set; }
178-
179-
public Task OnHandle(PingMessage message)
180-
{
181-
// SMB Kafka transport specific extension:
182-
var transportMessage = Context.GetTransportMessage();
183-
var partition = transportMessage.TopicPartition.Partition;
184-
}
185-
}
186-
```
187-
188-
This could be useful to extract the message's offset or partition.
189-
190184
## Message Headers
191185

192186
SMB uses headers to pass additional metadata information with the message. This includes the `MessageType` (of type `string`) or in the case of request/response messages the `RequestId` (of type `string`), `ReplyTo` (of type `string`) and `Expires` (of type `long`).
193187

194188
The Kafka message header values are natively binary (`byte[]`) in the underlying .NET client, as a result SMB needs to serialize the header values.
195189
By default the [DefaultKafkaHeaderSerializer](../src/SlimMessageBus.Host.Kafka/DefaultKafkaHeaderSerializer.cs) is used to serialize header values.
196-
If you need to specify a different serializer provide a specfic `IMessageSerializer` implementation (custom or one of the available serialization plugins):
190+
If you need to specify a different serializer provide a specific `IMessageSerializer` implementation (custom or one of the available serialization plugins):
197191

198192
```cs
199193
// MessageBusBuilder mbb;
@@ -209,6 +203,26 @@ mbb
209203
210204
## Consumers
211205

206+
### Consumer context
207+
208+
The consumer can implement the `IConsumerWithContext` interface to access the Kafka native message:
209+
210+
```cs
211+
public class PingConsumer : IConsumer<PingMessage>, IConsumerWithContext
212+
{
213+
public IConsumerContext Context { get; set; }
214+
215+
public Task OnHandle(PingMessage message)
216+
{
217+
// SMB Kafka transport specific extension:
218+
var transportMessage = Context.GetTransportMessage();
219+
var partition = transportMessage.TopicPartition.Partition;
220+
}
221+
}
222+
```
223+
224+
This could be useful to extract the message's offset or partition.
225+
212226
### Offset Commit
213227

214228
In the current Kafka provider implementation, SMB handles the manual commit of topic-partition offsets for the consumer.Th

src/Host.Plugin.Properties.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
<Import Project="Common.NuGet.Properties.xml" />
55

66
<PropertyGroup>
7-
<Version>2.5.4-rc2</Version>
7+
<Version>2.6.0-rc1</Version>
88
</PropertyGroup>
99

1010
</Project>

src/SlimMessageBus.Host.Configuration/Builders/AbstractConsumerBuilder.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,15 @@
11
namespace SlimMessageBus.Host;
22

3-
public abstract class AbstractConsumerBuilder : IAbstractConsumerBuilder
3+
public abstract class AbstractConsumerBuilder : IAbstractConsumerBuilder, IConsumerBuilder
44
{
55
public MessageBusSettings Settings { get; }
66

77
public ConsumerSettings ConsumerSettings { get; }
88

99
AbstractConsumerSettings IAbstractConsumerBuilder.ConsumerSettings => ConsumerSettings;
1010

11+
HasProviderExtensions IBuilderWithSettings.Settings => ConsumerSettings;
12+
1113
protected AbstractConsumerBuilder(MessageBusSettings settings, Type messageType, string path = null)
1214
{
1315
Settings = settings ?? throw new ArgumentNullException(nameof(settings));

src/SlimMessageBus.Host.Configuration/Builders/ConsumerBuilder.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
namespace SlimMessageBus.Host;
22

3-
public class ConsumerBuilder<T> : AbstractConsumerBuilder
3+
public class ConsumerBuilder<T> : AbstractConsumerBuilder, IConsumerBuilder
44
{
5+
HasProviderExtensions IBuilderWithSettings.Settings => ConsumerSettings;
6+
57
public ConsumerBuilder(MessageBusSettings settings, Type messageType = null)
68
: base(settings, messageType ?? typeof(T))
79
{
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
namespace SlimMessageBus.Host;
2+
3+
public interface IBuilderWithSettings
4+
{
5+
HasProviderExtensions Settings { get; }
6+
}
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
namespace SlimMessageBus.Host;
2+
3+
public interface IConsumerBuilder : IBuilderWithSettings
4+
{
5+
}
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
namespace SlimMessageBus.Host;
2+
3+
public interface IProducerBuilder : IBuilderWithSettings
4+
{
5+
}

src/SlimMessageBus.Host.Configuration/Builders/MessageBusBuilder.cs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ namespace SlimMessageBus.Host;
22

33
using Microsoft.Extensions.DependencyInjection.Extensions;
44

5-
public class MessageBusBuilder : IHasPostConfigurationActions, ISerializationBuilder
5+
public class MessageBusBuilder : IHasPostConfigurationActions, ISerializationBuilder, IProducerBuilder
66
{
77
/// <summary>
88
/// Parent bus builder.
@@ -17,15 +17,18 @@ public class MessageBusBuilder : IHasPostConfigurationActions, ISerializationBui
1717
/// <summary>
1818
/// The current settings that are being built.
1919
/// </summary>
20-
public MessageBusSettings Settings { get; private set; } = new();
20+
public MessageBusSettings Settings { get; private set; } = new();
21+
22+
HasProviderExtensions IBuilderWithSettings.Settings => Settings;
2123

2224
/// <summary>
2325
/// The bus factory method.
2426
/// </summary>
2527
public Func<MessageBusSettings, IMessageBusProvider> BusFactory { get; private set; }
2628

2729
public IList<Action<IServiceCollection>> PostConfigurationActions { get; } = [];
28-
30+
31+
2932
protected MessageBusBuilder()
3033
{
3134
}

src/SlimMessageBus.Host.Configuration/Builders/ProducerBuilder.cs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
11
namespace SlimMessageBus.Host;
22

3-
public class ProducerBuilder<T>
3+
public class ProducerBuilder<T> : IProducerBuilder
44
{
55
public ProducerSettings Settings { get; }
66

77
public Type MessageType => Settings.MessageType;
8-
8+
9+
HasProviderExtensions IBuilderWithSettings.Settings => Settings;
10+
911
public ProducerBuilder(ProducerSettings settings)
1012
: this(settings, typeof(T))
1113
{

0 commit comments

Comments
 (0)