Skip to content

[DEVEX-222] Add built-in auto-serialization #333

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 49 commits into from
Feb 27, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
49 commits
Select commit Hold shift + click to select a range
f0e35bd
[DEVEX-222] Added configuration options for KurrentClientSettings Cre…
oskardudycz Jan 23, 2025
f8b77d2
[DEVEX-222] Added overloads of Append methods that take regular events
oskardudycz Jan 23, 2025
8e27549
[DEVEX-222] Added TryDeserialize method to ResolvedEvent
oskardudycz Jan 23, 2025
001f1a0
[DEVEX-222] Added first working, but not yet complete JSON built-in s…
oskardudycz Jan 23, 2025
9ea2329
[DEVEX-222] Added autoserialization settings and removed serializer f…
oskardudycz Jan 27, 2025
8ce4f42
[DEVEX-222] Made serialization settings to create specific serializers
oskardudycz Jan 28, 2025
cea88dc
[DEVEX-222] Added override of the serialization settings to Persisten…
oskardudycz Jan 28, 2025
4992546
[DEVEX-222] Added serialization type and merged serialization into Se…
oskardudycz Jan 29, 2025
1caba87
[DEVEX-222] Added message type name resolution strategy
oskardudycz Jan 29, 2025
ef604ff
[DEVEX-222] Refactored Event Type Mapper to not be responsible for re…
oskardudycz Jan 29, 2025
64c79e7
[DEVEX-222] Added automatic message clr type resolution
oskardudycz Jan 29, 2025
6db199d
[DEVEX-222] Added metadata extensions to allow injecting type informa…
oskardudycz Jan 29, 2025
e07b02a
[DEVEX-222] Made CLR type be resolved based on the message type metad…
oskardudycz Jan 29, 2025
786caf3
[DEVEX-222] Added the MessageTypeResolutionStrategyWrapper for cachin…
oskardudycz Jan 29, 2025
6446ea2
[DEVEX-222] Added MessageSerializationContext to pass additional info…
oskardudycz Jan 29, 2025
1e65b62
[DEVEX-222] Refactored SerializationContext into MessageSerializerWra…
oskardudycz Jan 29, 2025
b395583
[DEVEX-222] Added message type maps registration
oskardudycz Jan 29, 2025
85bf401
[DEVEX-222] Added helpers for custom type resolutions strategy
oskardudycz Jan 29, 2025
edde73e
[DEVEX-222] Replaced MessageSerializerWrapper usage with more generic…
oskardudycz Jan 29, 2025
2bbb89b
[DEVEX-222] Made Message to be record instead of Struct
oskardudycz Feb 6, 2025
c9f9305
[DEVEX-222] Refactored new AppendToStreamAsync method to have simplif…
oskardudycz Feb 6, 2025
d555cd0
[DEVEX-222] Refactored Reading events signatures to use options inste…
oskardudycz Feb 6, 2025
1c1cb09
[DEVEX-222] Renamed MessageTypeResolutionStrategy to MessageTypeNamin…
oskardudycz Feb 6, 2025
772de07
[DEVEX-222] Made old Read methods be wrapper of the new one, instead …
oskardudycz Feb 7, 2025
852e367
[DEVEX-222] Added serialization configuration overrides
oskardudycz Feb 7, 2025
d414d06
[DEVEX-222] Removed storing CLR type in metadata and moved it to mess…
oskardudycz Feb 7, 2025
861145e
[DEVEX-222] Added possibility to register types for category name
oskardudycz Feb 10, 2025
391d212
[DEVEX-222] Added SubscribeToAllOptions and Subscription Listener
oskardudycz Feb 10, 2025
e4f117c
[DEVEX-222] Addes SubscribeToStreamOptions and applied it accordingly…
oskardudycz Feb 10, 2025
0f2e42e
[DEVEX-222] Renamed DeserializedEvent to Message in ResolvedEvent
oskardudycz Feb 10, 2025
29034b0
[DEVEX-222] Added persistent subscription litener and capability to c…
oskardudycz Feb 11, 2025
723a2f6
[DEVEX-222] Extended Serialization tests for Appends and Reads
oskardudycz Feb 11, 2025
51fe4b6
[DEVEX-222] Added capability to automatically deserialize message met…
oskardudycz Feb 12, 2025
518499b
[DEVEX-222] For unknown reason the reverse Enumerable.Reverse wasn't …
oskardudycz Feb 12, 2025
35c822d
[DEVEX-222] Fixed type resolution to correclty find types by assembly
oskardudycz Feb 12, 2025
a9cfa84
[DEVEX-222] Added tests for serialization compatibility between manua…
oskardudycz Feb 12, 2025
04e0685
[DEVEX-222] Added tests for appends with autoserialization
oskardudycz Feb 12, 2025
4c73df6
Revert "[DEVEX-222] Added tests for appends with autoserialization"
oskardudycz Feb 13, 2025
8dca94a
[DEVEX-222] Extended serialization test suite
oskardudycz Feb 13, 2025
68582da
[DEVEX-222] Added tests for auto-serialization in Catch-up subscriptions
oskardudycz Feb 13, 2025
5e4f2a5
[DEVEX-222] Added tests for auto-serialization in persistent subscrip…
oskardudycz Feb 13, 2025
78d0f22
[DEVEX-222] Added Expected prefix to StreamState and StreamRevision i…
oskardudycz Feb 13, 2025
e8385b3
[DEVEX-222] Merged namespaces
oskardudycz Feb 21, 2025
f0a86fc
[DEVEX-222] Renamed NulloMessageSerializer to NullMesageSerializer
oskardudycz Feb 21, 2025
95f8546
[DEVEX-222] Added AppendToStream methods that has expected stream rev…
oskardudycz Feb 21, 2025
62cebca
[DEVEX-222] Added XML documentation for Serialization settings
oskardudycz Feb 25, 2025
e938afe
[DEVEX-222] Added Unit tests
oskardudycz Feb 25, 2025
9acba54
[DEVEX-222] Added tests for type resolution
oskardudycz Feb 26, 2025
1af1def
[DEVEX-222] Added SchemaRegistry tests
oskardudycz Feb 26, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions Kurrent.Client.sln
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Kurrent.Client", "src\Kurre
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Kurrent.Client.Tests.Common", "test\Kurrent.Client.Tests.Common\Kurrent.Client.Tests.Common.csproj", "{47BF715B-A0BF-4044-B335-717E56422550}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Kurrent.Client.Tests.NeverLoadedAssembly", "test\Kurrent.Client.Tests.NeverLoadedAssembly\Kurrent.Client.Tests.NeverLoadedAssembly.csproj", "{0AC8A7E9-6839-4B4C-B299-950C376DF71F}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Kurrent.Client.Tests.ExternalAssembly", "test\Kurrent.Client.Tests.ExternalAssembly\Kurrent.Client.Tests.ExternalAssembly.csproj", "{829AF806-1144-408A-85FE-763835775086}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|x64 = Debug|x64
Expand All @@ -34,10 +38,20 @@ Global
{47BF715B-A0BF-4044-B335-717E56422550}.Debug|x64.Build.0 = Debug|Any CPU
{47BF715B-A0BF-4044-B335-717E56422550}.Release|x64.ActiveCfg = Release|Any CPU
{47BF715B-A0BF-4044-B335-717E56422550}.Release|x64.Build.0 = Release|Any CPU
{0AC8A7E9-6839-4B4C-B299-950C376DF71F}.Debug|x64.ActiveCfg = Debug|Any CPU
{0AC8A7E9-6839-4B4C-B299-950C376DF71F}.Debug|x64.Build.0 = Debug|Any CPU
{0AC8A7E9-6839-4B4C-B299-950C376DF71F}.Release|x64.ActiveCfg = Release|Any CPU
{0AC8A7E9-6839-4B4C-B299-950C376DF71F}.Release|x64.Build.0 = Release|Any CPU
{829AF806-1144-408A-85FE-763835775086}.Debug|x64.ActiveCfg = Debug|Any CPU
{829AF806-1144-408A-85FE-763835775086}.Debug|x64.Build.0 = Debug|Any CPU
{829AF806-1144-408A-85FE-763835775086}.Release|x64.ActiveCfg = Release|Any CPU
{829AF806-1144-408A-85FE-763835775086}.Release|x64.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(NestedProjects) = preSolution
{FC829F1B-43AD-4C96-9002-23D04BBA3AF3} = {C51F2C69-45A9-4D0D-A708-4FC319D5D340}
{762EECAA-122E-4B0C-BC50-5AA4F72CA4E0} = {EA59C1CB-16DA-4F68-AF8A-642A969B4CF8}
{47BF715B-A0BF-4044-B335-717E56422550} = {C51F2C69-45A9-4D0D-A708-4FC319D5D340}
{0AC8A7E9-6839-4B4C-B299-950C376DF71F} = {C51F2C69-45A9-4D0D-A708-4FC319D5D340}
{829AF806-1144-408A-85FE-763835775086} = {C51F2C69-45A9-4D0D-A708-4FC319D5D340}
EndGlobalSection
EndGlobal
360 changes: 360 additions & 0 deletions src/Kurrent.Client/Core/KurrentClientSerializationSettings.cs

Large diffs are not rendered by default.

14 changes: 14 additions & 0 deletions src/Kurrent.Client/Core/KurrentClientSettings.ConnectionString.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,20 @@ public partial class KurrentClientSettings {
public static KurrentClientSettings Create(string connectionString) =>
ConnectionStringParser.Parse(connectionString);

/// <summary>
/// Creates client settings from a connection string with additional configuration
/// </summary>
/// <param name="connectionString"></param>
/// <param name="configure">allows you to make additional customization of client settings</param>
/// <returns></returns>
public static KurrentClientSettings Create(string connectionString, Action<KurrentClientSettings> configure) {
var settings = ConnectionStringParser.Parse(connectionString);

configure(settings);

return settings;
}

private static class ConnectionStringParser {
private const string SchemeSeparator = "://";
private const string UserInfoSeparator = "@";
Expand Down
6 changes: 6 additions & 0 deletions src/Kurrent.Client/Core/KurrentClientSettings.cs
Original file line number Diff line number Diff line change
Expand Up @@ -57,5 +57,11 @@ public partial class KurrentClientSettings {
/// The default deadline for calls. Will not be applied to reads or subscriptions.
/// </summary>
public TimeSpan? DefaultDeadline { get; set; } = TimeSpan.FromSeconds(10);

/// <summary>
/// Provides configuration options for messages serialization and deserialization in the KurrentDB client.
/// If null, default settings are used.
/// </summary>
public KurrentClientSerializationSettings Serialization { get; set; } = KurrentClientSerializationSettings.Default();
}
}
57 changes: 54 additions & 3 deletions src/Kurrent.Client/Core/ResolvedEvent.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
using Kurrent.Client.Core.Serialization;

namespace EventStore.Client {
/// <summary>
/// A structure representing a single event or a resolved link event.
Expand All @@ -22,6 +24,23 @@ public readonly struct ResolvedEvent {
/// </summary>
public EventRecord OriginalEvent => Link ?? Event;

/// <summary>
/// Returns the deserialized message
/// It will be provided or equal to null, depending on the automatic deserialization settings you choose.
/// If it's null, you can use <see cref="OriginalEvent"/> to deserialize it manually.
/// </summary>
public readonly Message? Message;

/// <summary>
/// Returns the deserialized message data.
/// </summary>
public object? DeserializedData => Message?.Data;

/// <summary>
/// Returns the deserialized message metadata.
/// </summary>
public object? DeserializedMetadata => Message?.Metadata;

/// <summary>
/// Position of the <see cref="OriginalEvent"/> if available.
/// </summary>
Expand Down Expand Up @@ -49,12 +68,44 @@ public readonly struct ResolvedEvent {
/// <param name="event"></param>
/// <param name="link"></param>
/// <param name="commitPosition"></param>
public ResolvedEvent(EventRecord @event, EventRecord? link, ulong? commitPosition) {
Event = @event;
Link = link;
public ResolvedEvent(EventRecord @event, EventRecord? link, ulong? commitPosition) : this(
@event,
link,
null,
commitPosition
) { }

/// <summary>
/// Constructs a new <see cref="ResolvedEvent"/>.
/// </summary>
/// <param name="event"></param>
/// <param name="link"></param>
/// <param name="message"></param>
/// <param name="commitPosition"></param>
ResolvedEvent(
EventRecord @event,
EventRecord? link,
Message? message,
ulong? commitPosition
) {
Event = @event;
Link = link;
Message = message;
OriginalPosition = commitPosition.HasValue
? new Position(commitPosition.Value, (link ?? @event).Position.PreparePosition)
: new Position?();
}

internal static ResolvedEvent From(
EventRecord @event,
EventRecord? link,
ulong? commitPosition,
IMessageSerializer messageSerializer
) {
var originalEvent = link ?? @event;
return messageSerializer.TryDeserialize(originalEvent, out var message)
? new ResolvedEvent(@event, link, message, commitPosition)
: new ResolvedEvent(@event, link, commitPosition);
}
}
}
30 changes: 30 additions & 0 deletions src/Kurrent.Client/Core/Serialization/ISerializer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
namespace Kurrent.Client.Core.Serialization;

/// <summary>
/// Defines the core serialization capabilities required by the KurrentDB client.
/// Implementations of this interface handle the conversion between .NET objects and their
/// binary representation for storage in and retrieval from the event store.
/// <br />
/// The client ships default System.Text.Json implementation, but custom implementations can be provided or other formats.
/// </summary>
public interface ISerializer {
/// <summary>
/// Converts a .NET object to its binary representation for storage in the event store.
/// </summary>
/// <param name="value">The object to serialize. This could be an event, command, or metadata object.</param>
/// <returns>
/// A binary representation of the object that can be stored in KurrentDB.
/// </returns>
public ReadOnlyMemory<byte> Serialize(object value);

/// <summary>
/// Reconstructs a .NET object from its binary representation retrieved from the event store.
/// </summary>
/// <param name="data">The binary data to deserialize, typically retrieved from a KurrentDB event.</param>
/// <param name="type">The target .NET type to deserialize the data into, determined from message type mappings.</param>
/// <returns>
/// The deserialized object cast to the specified type, or null if the data cannot be deserialized.
/// The returned object will be an instance of the specified type or a compatible subtype.
/// </returns>
public object? Deserialize(ReadOnlyMemory<byte> data, Type type);
}
62 changes: 62 additions & 0 deletions src/Kurrent.Client/Core/Serialization/Message.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
using EventStore.Client;

namespace Kurrent.Client.Core.Serialization;

/// <summary>
/// Represents a message wrapper in the KurrentDB system, containing both domain data and optional metadata.
/// Messages can represent events, commands, or other domain objects along with their associated metadata.
/// </summary>
/// <param name="Data">The message domain data.</param>
/// <param name="Metadata">Optional metadata providing additional context about the message, such as correlation IDs, timestamps, or user information.</param>
/// <param name="MessageId">Unique identifier for this specific message instance. When null, the system will auto-generate an ID.</param>
public record Message(object Data, object? Metadata, Uuid? MessageId = null) {
/// <summary>
/// Creates a new Message with the specified domain data and message ID, but without metadata.
/// This factory method is a convenient shorthand when working with systems that don't require metadata.
/// </summary>
/// <param name="data">The message domain data.</param>
/// <param name="messageId">Unique identifier for this message instance. Must not be Uuid.Empty.</param>
/// <returns>A new immutable Message instance containing the provided data and ID with null metadata.</returns>
/// <example>
/// <code>
/// // Create a message with a specific ID
/// var userCreated = new UserCreated { Id = "123", Name = "Alice" };
/// var messageId = Uuid.NewUuid();
/// var message = Message.From(userCreated, messageId);
/// </code>
/// </example>
public static Message From(object data, Uuid messageId) =>
From(data, null, messageId);

/// <summary>
/// Creates a new Message with the specified domain data and message ID and metadata.
/// </summary>
/// <param name="data">The message domain data.</param>
/// <param name="metadata">Optional metadata providing additional context about the message, such as correlation IDs, timestamps, or user information.</param>
/// <param name="messageId">Unique identifier for this specific message instance. </param>
/// <returns>A new immutable Message instance with the specified properties.</returns>
/// <exception cref="ArgumentOutOfRangeException">Thrown when messageId is explicitly set to Uuid.Empty, which is an invalid identifier.</exception>
/// <example>
/// <code>
/// // Create a message with data and metadata
/// var orderPlaced = new OrderPlaced { OrderId = "ORD-123", Amount = 99.99m };
/// var metadata = new EventMetadata {
/// UserId = "user-456",
/// Timestamp = DateTimeOffset.UtcNow,
/// CorrelationId = correlationId
/// };
///
/// // Let the system assign an ID automatically
/// var message = Message.From(orderPlaced, metadata);
///
/// // Or specify a custom ID
/// var messageWithId = Message.From(orderPlaced, metadata, Uuid.NewUuid());
/// </code>
/// </example>
public static Message From(object data, object? metadata = null, Uuid? messageId = null) {
if (messageId == Uuid.Empty)
throw new ArgumentOutOfRangeException(nameof(messageId), "Message ID cannot be an empty UUID.");

return new Message(data, metadata, messageId);
}
}
148 changes: 148 additions & 0 deletions src/Kurrent.Client/Core/Serialization/MessageSerializer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
using System.Diagnostics.CodeAnalysis;
using EventStore.Client;

namespace Kurrent.Client.Core.Serialization;

using static ContentTypeExtensions;

interface IMessageSerializer {
public EventData Serialize(Message value, MessageSerializationContext context);

#if NET48
public bool TryDeserialize(EventRecord record, out Message? deserialized);
#else
public bool TryDeserialize(EventRecord record, [NotNullWhen(true)] out Message? deserialized);
#endif
}

record MessageSerializationContext(
string StreamName,
ContentType ContentType
) {
public string CategoryName =>
StreamName.Split('-').FirstOrDefault() ?? "no_stream_category";
}

static class MessageSerializerExtensions {
public static EventData[] Serialize(
this IMessageSerializer serializer,
IEnumerable<Message> messages,
MessageSerializationContext context
) {
return messages.Select(m => serializer.Serialize(m, context)).ToArray();
}

public static IMessageSerializer With(
this IMessageSerializer defaultMessageSerializer,
KurrentClientSerializationSettings defaultSettings,
OperationSerializationSettings? operationSettings
) {
if (operationSettings == null)
return defaultMessageSerializer;

if (operationSettings.AutomaticDeserialization == AutomaticDeserialization.Disabled)
return NullMessageSerializer.Instance;

if (operationSettings.ConfigureSettings == null)
return defaultMessageSerializer;

var settings = defaultSettings.Clone();
operationSettings.ConfigureSettings.Invoke(settings);

return new MessageSerializer(SchemaRegistry.From(settings));
}
}

class MessageSerializer(SchemaRegistry schemaRegistry) : IMessageSerializer {
readonly ISerializer _jsonSerializer =
schemaRegistry.GetSerializer(ContentType.Json);

readonly IMessageTypeNamingStrategy _messageTypeNamingStrategy =
schemaRegistry.MessageTypeNamingStrategy;

public EventData Serialize(Message message, MessageSerializationContext serializationContext) {
var (data, metadata, eventId) = message;

var eventType = _messageTypeNamingStrategy
.ResolveTypeName(
message.Data.GetType(),
new MessageTypeNamingResolutionContext(serializationContext.CategoryName)
);

var serializedData = schemaRegistry
.GetSerializer(serializationContext.ContentType)
.Serialize(data);

var serializedMetadata = metadata != null
? _jsonSerializer.Serialize(metadata)
: ReadOnlyMemory<byte>.Empty;

return new EventData(
eventId ?? Uuid.NewUuid(),
eventType,
serializedData,
serializedMetadata,
serializationContext.ContentType.ToMessageContentType()
);
}

#if NET48
public bool TryDeserialize(EventRecord record, out Message? deserialized) {
#else
public bool TryDeserialize(EventRecord record, [NotNullWhen(true)] out Message? deserialized) {
#endif
if (!TryResolveClrType(record, out var clrType)) {
deserialized = null;
return false;
}

var data = schemaRegistry
.GetSerializer(FromMessageContentType(record.ContentType))
.Deserialize(record.Data, clrType!);

if (data == null) {
deserialized = null;
return false;
}

object? metadata = record.Metadata.Length > 0 && TryResolveClrMetadataType(record, out var clrMetadataType)
? _jsonSerializer.Deserialize(record.Metadata, clrMetadataType!)
: null;

deserialized = Message.From(data, metadata, record.EventId);
return true;
}

public static MessageSerializer From(KurrentClientSerializationSettings? settings = null) {
settings ??= KurrentClientSerializationSettings.Default();

return new MessageSerializer(SchemaRegistry.From(settings));
}

bool TryResolveClrType(EventRecord record, out Type? clrType) =>
schemaRegistry
.MessageTypeNamingStrategy
.TryResolveClrType(record.EventType, out clrType);

bool TryResolveClrMetadataType(EventRecord record, out Type? clrMetadataType) =>
schemaRegistry
.MessageTypeNamingStrategy
.TryResolveClrMetadataType(record.EventType, out clrMetadataType);
}

class NullMessageSerializer : IMessageSerializer {
public static readonly NullMessageSerializer Instance = new NullMessageSerializer();

public EventData Serialize(Message value, MessageSerializationContext context) {
throw new InvalidOperationException("Cannot serialize, automatic deserialization is disabled");
}

#if NET48
public bool TryDeserialize(EventRecord record, out Message? deserialized) {
#else
public bool TryDeserialize(EventRecord eventRecord, [NotNullWhen(true)] out Message? deserialized) {
#endif
deserialized = null;
return false;
}
}
Loading