Skip to content

WIP: [DEVEX-250] Add built-in auto-serialization #323

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 27 commits into
base: trunk
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
f5ea2d7
Rename `EventStoreDBClientSettings` to `KurrentDBClientSettings`
w1am Dec 16, 2024
b50cefb
Rename `EventStoreDBClient` to `KurrentDBClient`
w1am Dec 16, 2024
aeebeab
Rename `EventStoreDBConnectionString`, `EventStoreDBPersistentSubscri…
w1am Dec 16, 2024
6aeff7d
Rename links and build.gradle properties that contains eventstore web…
w1am Dec 16, 2024
fa933c8
Rename EventStoreDB in comments and otel tags
w1am Dec 16, 2024
83823f6
Rename `com.eventstore` to `io.kurrent`
w1am Dec 16, 2024
a8fd3be
Rename test package to `io.kurrent`
w1am Dec 16, 2024
9184c3e
Remove `EVENTSTORE` prefix from environment variables and use `KURRENT`
w1am Dec 16, 2024
88f6db5
Rename proto java package
w1am Dec 16, 2024
a9c8f63
Some EventStoreDB renaming in otel and readme
w1am Dec 16, 2024
82d0c02
Rename gradle properties
w1am Dec 16, 2024
0e6c0fa
Add support for kdb and kurrent protocols
w1am Dec 18, 2024
37db418
Sync with upstream
w1am Mar 5, 2025
c90e936
feat: support kurrentdb protocol
w1am Mar 5, 2025
35246f3
fix: broken namespace
w1am Mar 5, 2025
847b7df
chore: set artifact id to `kurrentdb-client`
w1am Mar 5, 2025
53b6eb4
refactor: move ExpectedRevision to StreamState (#313)
w1am Mar 20, 2025
30a2c0f
Sync with upstream
w1am Mar 20, 2025
5104ce3
[DEVEX-250] Plugged the initial code for serialization settings
oskardudycz Mar 14, 2025
316bc7a
[DEVEX-250] Added default structure for message serializer and naming…
oskardudycz Mar 14, 2025
4dc914d
[DEVEX-250] Added MessageData as a future replacement for EventData t…
oskardudycz Mar 14, 2025
3a861b7
[DEVEX-250] Added the initial implementation of Jackson Serializer
oskardudycz Mar 18, 2025
7f05d10
[DEVEX-250] Added JacksonSerializer implementation together with Sche…
oskardudycz Mar 19, 2025
4d730b7
[DEVEX-250] Fixed mishap in mapping metadata instead of data in traci…
oskardudycz Mar 19, 2025
ce0858b
[DEVEX-250] Added Message deserialization
oskardudycz Mar 20, 2025
8c5a66a
[DEVEX-250] Reshaped the API to handle regular and wrapped messages, …
oskardudycz Mar 26, 2025
73163d4
Sync with upstream
oskardudycz Apr 25, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CODE-OF-CONDUCT.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Contributor Covenant Code of Conduct

`KurrentDB-Client-Java` follows the widely-adopted Contributor Covenant Code of Conduct.
`Kurrent-Client-Java` follows the widely-adopted Contributor Covenant Code of Conduct.

## Our Pledge

Expand Down
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ dependencies {
implementation "io.grpc:grpc-stub:${grpcVersion}"
implementation "io.grpc:grpc-protobuf:${grpcVersion}"
implementation "com.fasterxml.jackson.core:jackson-databind:${jacksonVersion}"
implementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310'
implementation "org.slf4j:slf4j-api:2.0.17"
implementation "org.bouncycastle:bcprov-jdk18on:1.80"
implementation "org.bouncycastle:bcpkix-jdk18on:1.80"
Expand All @@ -79,7 +80,6 @@ dependencies {
testImplementation "org.reactivestreams:reactive-streams-tck:${reactiveStreamsApiVersion}"
testImplementation "org.testcontainers:testcontainers:${testcontainersVersion}"
testImplementation platform("com.fasterxml.jackson:jackson-bom:${jacksonVersion}")
testImplementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310'
testImplementation "com.github.javafaker:javafaker:1.0.2"
testImplementation 'org.slf4j:slf4j-simple:2.0.17'
testImplementation "io.opentelemetry:opentelemetry-sdk"
Expand Down
18 changes: 11 additions & 7 deletions src/main/java/io/kurrent/dbclient/AbstractRead.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ abstract class AbstractRead implements Publisher<ReadMessage> {
protected static final StreamsOuterClass.ReadReq.Options.Builder defaultReadOptions;

private final GrpcClient client;
private final OptionsWithBackPressure<?> options;
private final OptionsWithBackPressureAndSerialization<?> options;

protected AbstractRead(GrpcClient client, OptionsWithBackPressure<?> options) {
protected AbstractRead(GrpcClient client, OptionsWithBackPressureAndSerialization<?> options) {
this.client = client;
this.options = options;
}
Expand All @@ -27,13 +27,17 @@ protected AbstractRead(GrpcClient client, OptionsWithBackPressure<?> options) {

@Override
public void subscribe(Subscriber<? super ReadMessage> subscriber) {
ReadResponseObserver observer = new ReadResponseObserver(options, new ReadStreamConsumer(subscriber));
ReadResponseObserver observer = new ReadResponseObserver(
options,
new ReadStreamConsumer(subscriber),
this.client.getSerializer(options.serializationSettings().orElse(null))
);

this.client.getWorkItemArgs().whenComplete((args, error) -> {
if (error != null) {
observer.onError(error);
return;
}
if (error != null) {
observer.onError(error);
return;
}

StreamsOuterClass.ReadReq request = StreamsOuterClass.ReadReq.newBuilder()
.setOptions(createOptions())
Expand Down
17 changes: 7 additions & 10 deletions src/main/java/io/kurrent/dbclient/AbstractRegularSubscription.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,6 @@
import io.kurrent.dbclient.proto.shared.Shared;
import io.kurrent.dbclient.proto.streams.StreamsGrpc;
import io.kurrent.dbclient.proto.streams.StreamsOuterClass;
import io.grpc.Metadata;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.ClientCallStreamObserver;
import io.grpc.stub.ClientResponseObserver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.grpc.ManagedChannel;

import java.util.concurrent.CompletableFuture;
Expand All @@ -21,9 +14,9 @@ abstract class AbstractRegularSubscription {
protected SubscriptionListener listener;
protected Checkpointer checkpointer = null;
private final GrpcClient client;
private final OptionsWithBackPressure<?> options;
private final OptionsWithBackPressureAndSerialization<?> options;

protected AbstractRegularSubscription(GrpcClient client, OptionsWithBackPressure<?> options) {
protected AbstractRegularSubscription(GrpcClient client, OptionsWithBackPressureAndSerialization<?> options) {
this.client = client;
this.options = options;
}
Expand Down Expand Up @@ -77,6 +70,10 @@ private ReadResponseObserver createObserver(ManagedChannel channel, CompletableF
}
);

return new ReadResponseObserver(this.options, consumer);
return new ReadResponseObserver(
this.options,
consumer,
this.client.getSerializer(options.serializationSettings().orElse(null))
);
}
}
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
package io.kurrent.dbclient;

import io.kurrent.dbclient.proto.persistentsubscriptions.Persistent;
import io.kurrent.dbclient.proto.persistentsubscriptions.PersistentSubscriptionsGrpc;
import io.kurrent.dbclient.proto.shared.Shared;
import io.grpc.Metadata;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.ClientCallStreamObserver;
import io.grpc.stub.ClientResponseObserver;
import io.grpc.stub.StreamObserver;
import io.kurrent.dbclient.proto.persistentsubscriptions.Persistent;
import io.kurrent.dbclient.proto.persistentsubscriptions.PersistentSubscriptionsGrpc;
import io.kurrent.dbclient.proto.shared.Shared;
import io.kurrent.dbclient.serialization.MessageSerializer;

import java.util.concurrent.CompletableFuture;

Expand All @@ -18,20 +19,26 @@ abstract class AbstractSubscribePersistentSubscription {
private final String group;
private final PersistentSubscriptionListener listener;
private final SubscribePersistentSubscriptionOptions options;
private final MessageSerializer messageSerializer;

static {
defaultReadOptions = Persistent.ReadReq.Options.newBuilder()
.setUuidOption(Persistent.ReadReq.Options.UUIDOption.newBuilder()
.setStructured(Shared.Empty.getDefaultInstance()));
}

public AbstractSubscribePersistentSubscription(GrpcClient client, String group,
SubscribePersistentSubscriptionOptions options,
PersistentSubscriptionListener listener) {
public AbstractSubscribePersistentSubscription(
GrpcClient client,
String group,
SubscribePersistentSubscriptionOptions options,
PersistentSubscriptionListener listener,
MessageSerializer messageSerializer
) {
this.client = client;
this.group = group;
this.options = options;
this.listener = listener;
this.messageSerializer = messageSerializer;
}

protected abstract Persistent.ReadReq.Options.Builder createOptions();
Expand Down Expand Up @@ -91,7 +98,7 @@ public void onNext(Persistent.ReadResp readResp) {
int retryCount = readResp.getEvent().hasNoRetryCount() ? 0 : readResp.getEvent().getRetryCount();

try {
ResolvedEvent resolvedEvent = ResolvedEvent.fromWire(readResp.getEvent());
ResolvedEvent resolvedEvent = ResolvedEvent.fromWire(readResp.getEvent(), messageSerializer);
ClientTelemetry.traceSubscribe(
() -> listener.onEvent(this._subscription, retryCount, resolvedEvent),
_subscription.getSubscriptionId(),
Expand Down
32 changes: 20 additions & 12 deletions src/main/java/io/kurrent/dbclient/AppendToStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,20 @@
class AppendToStream {
private final GrpcClient client;
private final String streamName;
private final List<EventData> events;
private final StreamState streamState;
private final List<MessageData> events;
private final AppendToStreamOptions options;

public AppendToStream(GrpcClient client, String streamName, Iterator<EventData> events, AppendToStreamOptions options) {
public AppendToStream(
GrpcClient client,
String streamName,
StreamState streamState,
Iterator<MessageData> events,
AppendToStreamOptions options
) {
this.client = client;
this.streamName = streamName;
this.streamState = streamState;
this.events = new ArrayList<>();
while (events.hasNext()) {
this.events.add(events.next());
Expand All @@ -40,9 +48,9 @@ public CompletableFuture<WriteResult> execute() {
this.options.getCredentials()));
}

private CompletableFuture<WriteResult> append(ManagedChannel channel, List<EventData> events) {
private CompletableFuture<WriteResult> append(ManagedChannel channel, List<MessageData> events) {
CompletableFuture<WriteResult> result = new CompletableFuture<>();
StreamsOuterClass.AppendReq.Options.Builder options = this.options.getStreamState().applyOnWire(StreamsOuterClass.AppendReq.Options.newBuilder()
StreamsOuterClass.AppendReq.Options.Builder options = this.streamState.applyOnWire(StreamsOuterClass.AppendReq.Options.newBuilder()
.setStreamIdentifier(Shared.StreamIdentifier.newBuilder()
.setStreamName(ByteString.copyFromUtf8(streamName))
.build()));
Expand Down Expand Up @@ -93,18 +101,18 @@ private CompletableFuture<WriteResult> append(ManagedChannel channel, List<Event
try {
requestStream.onNext(StreamsOuterClass.AppendReq.newBuilder().setOptions(options).build());

for (EventData e : events) {
for (MessageData e : events) {
StreamsOuterClass.AppendReq.ProposedMessage.Builder msgBuilder = StreamsOuterClass.AppendReq.ProposedMessage.newBuilder()
.setId(Shared.UUID.newBuilder()
.setStructured(Shared.UUID.Structured.newBuilder()
.setMostSignificantBits(e.getEventId().getMostSignificantBits())
.setLeastSignificantBits(e.getEventId().getLeastSignificantBits())))
.setData(ByteString.copyFrom(e.getEventData()))
.setMostSignificantBits(e.getMessageId().getMostSignificantBits())
.setLeastSignificantBits(e.getMessageId().getLeastSignificantBits())))
.setData(ByteString.copyFrom(e.getMessageData()))
.putMetadata(SystemMetadataKeys.CONTENT_TYPE, e.getContentType())
.putMetadata(SystemMetadataKeys.TYPE, e.getEventType());
.putMetadata(SystemMetadataKeys.TYPE, e.getMessageType());

if (e.getUserMetadata() != null) {
msgBuilder.setCustomMetadata(ByteString.copyFrom(e.getUserMetadata()));
if (e.getMessageMetadata() != null) {
msgBuilder.setCustomMetadata(ByteString.copyFrom(e.getMessageMetadata()));
}

requestStream.onNext(StreamsOuterClass.AppendReq.newBuilder()
Expand All @@ -117,7 +125,7 @@ private CompletableFuture<WriteResult> append(ManagedChannel channel, List<Event
String leaderPort = e.getTrailers().get(Metadata.Key.of("leader-endpoint-port", Metadata.ASCII_STRING_MARSHALLER));

if (leaderHost != null && leaderPort != null) {
NotLeaderException reason = new NotLeaderException(leaderHost, Integer.valueOf(leaderPort));
NotLeaderException reason = new NotLeaderException(leaderHost, Integer.parseInt(leaderPort));
result.completeExceptionally(reason);
} else {
result.completeExceptionally(e);
Expand Down
24 changes: 24 additions & 0 deletions src/main/java/io/kurrent/dbclient/AppendToStreamOptions.java
Original file line number Diff line number Diff line change
@@ -1,12 +1,36 @@
package io.kurrent.dbclient;

import io.kurrent.dbclient.serialization.OperationSerializationSettings;

import java.util.Optional;

/**
* Options of the append stream request.
*/
public class AppendToStreamOptions extends OptionsWithStreamStateBase<AppendToStreamOptions> {
private OperationSerializationSettings serializationSettings = null;

private AppendToStreamOptions() {
}

/**
* Returns optional serialization settings
*/
public Optional<OperationSerializationSettings> serializationSettings() {
return Optional.ofNullable(this.serializationSettings);
}

/**
* Allows to customize or disable the automatic deserialization
*
* @param serializationSettings - expected revision.
* @return updated options.
*/
public AppendToStreamOptions serializationSettings(OperationSerializationSettings serializationSettings) {
this.serializationSettings = serializationSettings;
return this;
}

/**
* Returns options with default values.
*/
Expand Down
18 changes: 9 additions & 9 deletions src/main/java/io/kurrent/dbclient/ClientTelemetry.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,15 @@ private static Tracer getTracer() {
ClientTelemetry.class.getPackage().getImplementationVersion());
}

private static List<EventData> tryInjectTracingContext(Span span, List<EventData> events) {
List<EventData> injectedEvents = new ArrayList<>();
for (EventData event : events) {
private static List<MessageData> tryInjectTracingContext(Span span, List<MessageData> events) {
List<MessageData> injectedEvents = new ArrayList<>();
for (MessageData event : events) {
boolean isJsonEvent = Objects.equals(event.getContentType(), ContentType.JSON);

injectedEvents.add(EventDataBuilder
.binary(event.getEventId(), event.getEventType(), event.getEventData(), isJsonEvent)
.metadataAsBytes(tryInjectTracingContext(span, event.getUserMetadata()))
.build());
injectedEvents.add(
MessageDataBuilder
.with(event.getMessageType(), event.getMessageData(), tryInjectTracingContext(span, event.getMessageMetadata()), event.getMessageId(), isJsonEvent)
.build());
}
return injectedEvents;
}
Expand Down Expand Up @@ -85,9 +85,9 @@ private static SpanContext tryExtractTracingContext(byte[] userMetadataBytes) {
}

static CompletableFuture<WriteResult> traceAppend(
BiFunction<ManagedChannel, List<EventData>, CompletableFuture<WriteResult>> appendOperation,
BiFunction<ManagedChannel, List<MessageData>, CompletableFuture<WriteResult>> appendOperation,
ManagedChannel channel,
List<EventData> events, String streamId, KurrentDBClientSettings settings,
List<MessageData> events, String streamId, KurrentDBClientSettings settings,
UserCredentials optionalCallCredentials) {
Span span = createSpan(
ClientTelemetryConstants.Operations.APPEND,
Expand Down
15 changes: 14 additions & 1 deletion src/main/java/io/kurrent/dbclient/ConnectionSettingsBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@


import io.grpc.ClientInterceptor;
import io.kurrent.dbclient.serialization.KurrentDBClientSerializationSettings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -36,6 +37,7 @@ public class ConnectionSettingsBuilder {
private List<ClientInterceptor> _interceptors = new ArrayList<>();
private String _tlsCaFile = null;
private Set<String> _features = new HashSet<>();
private KurrentDBClientSerializationSettings _serializationSettings;

ConnectionSettingsBuilder() {}

Expand All @@ -60,7 +62,9 @@ public KurrentDBClientSettings buildConnectionSettings() {
_defaultDeadline,
_interceptors,
_tlsCaFile,
_features);
_features,
_serializationSettings
);
}

/**
Expand Down Expand Up @@ -241,6 +245,15 @@ public ConnectionSettingsBuilder feature(String feature) {
return this;
}

/**
* Provides configuration options for messages serialization and deserialization in the KurrentDB client.
* If null, default settings are used.
*/
public ConnectionSettingsBuilder serialization(KurrentDBClientSerializationSettings serializationSettings) {
this._serializationSettings = serializationSettings;
return this;
}

void parseGossipSeed(String host) {
String[] hostParts = host.split(":");

Expand Down
4 changes: 4 additions & 0 deletions src/main/java/io/kurrent/dbclient/EventData.java
Original file line number Diff line number Diff line change
Expand Up @@ -96,5 +96,9 @@ public static EventDataBuilder builderAsBinary(String eventType, byte[] eventDat
public static EventDataBuilder builderAsBinary(UUID eventId, String eventType, byte[] eventData) {
return EventDataBuilder.binary(eventId, eventType, eventData);
}

public MessageData toMessageData() {
return new MessageData(eventType, eventData, userMetadata, eventId, contentType);
}
}

12 changes: 11 additions & 1 deletion src/main/java/io/kurrent/dbclient/GrpcClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
import io.grpc.ManagedChannel;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.kurrent.dbclient.serialization.MessageSerializer;
import io.kurrent.dbclient.serialization.MessageSerializerBuilder;
import io.kurrent.dbclient.serialization.OperationSerializationSettings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -19,11 +22,14 @@ class GrpcClient {
private final AtomicBoolean closed;
private final LinkedBlockingQueue<Msg> queue;
private final KurrentDBClientSettings settings;
private final MessageSerializer serializer;

GrpcClient(KurrentDBClientSettings settings, AtomicBoolean closed, LinkedBlockingQueue<Msg> queue) {
this.settings = settings;
this.closed = closed;
this.queue = queue;

this.serializer = MessageSerializerBuilder.get(settings.getSerializationSettings());
}

public boolean isShutdown() {
Expand Down Expand Up @@ -101,7 +107,7 @@ public <A> CompletableFuture<A> runWithArgs(Function<WorkItemArgs, CompletableFu
logger.debug("RunWorkItem[{}] completed exceptionally: {}", args.getId(), e.toString());

if (e instanceof RuntimeException)
throw (RuntimeException)e;
throw (RuntimeException) e;
else
throw new RuntimeException(e);
}
Expand All @@ -120,4 +126,8 @@ public CompletableFuture<Void> shutdown() {
public KurrentDBClientSettings getSettings() {
return this.settings;
}

public MessageSerializer getSerializer(OperationSerializationSettings serializationSettings) {
return this.serializer.with(serializationSettings);
}
}
Loading
Loading