Skip to content

Commit 8c7d67a

Browse files
committed
[DEVEX-227] Moved code responsible for GettingState to a dedicated file
1 parent f983649 commit 8c7d67a

File tree

3 files changed

+177
-175
lines changed

3 files changed

+177
-175
lines changed

src/Kurrent.Client/Streams/DecisionMaking/AggregateStore.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,9 @@ Task<IWriteResult> HandleAsync(
3333
);
3434
}
3535

36+
public interface IAggregateStore<TAggregate> : IAggregateStore<TAggregate, object>
37+
where TAggregate : IAggregate<object>;
38+
3639
public static class AggregateStoreExtensions {
3740
public static Task<IWriteResult> AddAsync<TAggregate, TEvent>(
3841
this IAggregateStore<TAggregate, TEvent> aggregateStore,
@@ -94,9 +97,6 @@ public static Task<IWriteResult> HandleAsync<TAggregate, TEvent>(
9497
);
9598
}
9699

97-
public interface IAggregateStore<TAggregate> : IAggregateStore<TAggregate, object>
98-
where TAggregate : IAggregate<object>;
99-
100100
public class AggregateStoreOptions<TState> where TState : notnull {
101101
#if NET48
102102
public IStateBuilder<TState> StateBuilder { get; set; } = null!;
Lines changed: 174 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,174 @@
1+
using EventStore.Client;
2+
3+
namespace Kurrent.Client.Streams.GettingState;
4+
5+
public record GetStateOptions<TState> where TState : notnull {
6+
public StateAtPointInTime<TState>? CurrentState { get; set; }
7+
}
8+
9+
public class GetStreamStateOptions<TState> : ReadStreamOptions where TState : notnull {
10+
public GetSnapshot<TState>? GetSnapshot { get; set; }
11+
}
12+
13+
public delegate ValueTask<StateAtPointInTime<TState>> GetSnapshot<TState>(
14+
GetSnapshotOptions options,
15+
CancellationToken ct = default
16+
) where TState : notnull;
17+
18+
public record GetSnapshotOptions {
19+
public string? StreamName { get; set; }
20+
21+
public string? SnapshotVersion { get; set; }
22+
23+
public static GetSnapshotOptions ForStream(string streamName) =>
24+
new GetSnapshotOptions { StreamName = streamName };
25+
26+
public static GetSnapshotOptions ForAll() =>
27+
new GetSnapshotOptions();
28+
}
29+
30+
public static class KurrentClientGettingStateClientExtensions {
31+
public static async Task<StateAtPointInTime<TState>> GetStateAsync<TState>(
32+
this KurrentClient eventStore,
33+
string streamName,
34+
IStateBuilder<TState> stateBuilder,
35+
GetStreamStateOptions<TState>? options,
36+
CancellationToken ct = default
37+
) where TState : notnull {
38+
StateAtPointInTime<TState>? stateAtPointInTime = null;
39+
40+
options ??= new GetStreamStateOptions<TState>();
41+
42+
if (options.GetSnapshot != null)
43+
stateAtPointInTime = await options.GetSnapshot(
44+
GetSnapshotOptions.ForStream(streamName),
45+
ct
46+
);
47+
48+
options.StreamPosition = stateAtPointInTime?.LastStreamPosition ?? StreamPosition.Start;
49+
50+
return await eventStore.ReadStreamAsync(streamName, options, ct)
51+
.GetStateAsync(stateBuilder, ct);
52+
}
53+
54+
public static async Task<StateAtPointInTime<TState>> GetStateAsync<TState>(
55+
this IAsyncEnumerable<ResolvedEvent> messages,
56+
TState initialState,
57+
Func<TState, ResolvedEvent, TState> evolve,
58+
CancellationToken ct
59+
) where TState : notnull {
60+
var state = initialState;
61+
62+
if (messages is KurrentClient.ReadStreamResult readStreamResult) {
63+
if (await readStreamResult.ReadState.ConfigureAwait(false) == ReadState.StreamNotFound)
64+
return new StateAtPointInTime<TState>(state);
65+
}
66+
67+
ResolvedEvent? lastEvent = null;
68+
69+
await foreach (var resolvedEvent in messages.WithCancellation(ct)) {
70+
lastEvent = resolvedEvent;
71+
72+
state = evolve(state, resolvedEvent);
73+
}
74+
75+
return new StateAtPointInTime<TState>(state, lastEvent?.Event.EventNumber, lastEvent?.Event.Position);
76+
}
77+
78+
public static Task<StateAtPointInTime<TState>> GetStateAsync<TState>(
79+
this KurrentClient eventStore,
80+
string streamName,
81+
IStateBuilder<TState> streamStateBuilder,
82+
CancellationToken ct = default
83+
) where TState : notnull =>
84+
eventStore.GetStateAsync(streamName, streamStateBuilder, new GetStreamStateOptions<TState>(), ct);
85+
86+
public static Task<StateAtPointInTime<TState>> GetStateAsync<TState, TEvent>(
87+
this KurrentClient eventStore,
88+
string streamName,
89+
GetStreamStateOptions<TState> options,
90+
CancellationToken ct = default
91+
) where TState : IState<TEvent>, new() =>
92+
eventStore.GetStateAsync(
93+
streamName,
94+
StateBuilder.For<TState, TEvent>(),
95+
options,
96+
ct
97+
);
98+
99+
public static Task<StateAtPointInTime<TState>> GetStateAsync<TState, TEvent>(
100+
this KurrentClient eventStore,
101+
string streamName,
102+
CancellationToken ct = default
103+
) where TState : IState<TEvent>, new() =>
104+
eventStore.GetStateAsync<TState, TEvent>(streamName, new GetStreamStateOptions<TState>(), ct);
105+
106+
public static Task<StateAtPointInTime<TState>> GetStateAsync<TState>(
107+
this KurrentClient eventStore,
108+
string streamName,
109+
CancellationToken ct = default
110+
) where TState : IState<object>, new() =>
111+
eventStore.GetStateAsync<TState, object>(streamName, new GetStreamStateOptions<TState>(), ct);
112+
}
113+
114+
public static class KurrentClientGettingStateReadAndSubscribeExtensions {
115+
public static Task<StateAtPointInTime<TState>> GetStateAsync<TState>(
116+
this KurrentClient.ReadStreamResult readStreamResult,
117+
IStateBuilder<TState> stateBuilder,
118+
GetStateOptions<TState> options,
119+
CancellationToken ct = default
120+
) where TState : notnull =>
121+
stateBuilder.GetAsync(readStreamResult, options, ct);
122+
123+
public static Task<StateAtPointInTime<TState>> GetStateAsync<TState>(
124+
this KurrentClient.ReadStreamResult readStreamResult,
125+
IStateBuilder<TState> stateBuilder,
126+
CancellationToken ct = default
127+
) where TState : notnull =>
128+
stateBuilder.GetAsync(readStreamResult, new GetStateOptions<TState>(), ct);
129+
130+
public static Task<StateAtPointInTime<TState>> GetStateAsync<TState>(
131+
this KurrentClient.ReadAllStreamResult readAllStreamResult,
132+
IStateBuilder<TState> stateBuilder,
133+
GetStateOptions<TState> options,
134+
CancellationToken ct = default
135+
) where TState : notnull =>
136+
stateBuilder.GetAsync(readAllStreamResult, options, ct);
137+
138+
public static Task<StateAtPointInTime<TState>> GetStateAsync<TState>(
139+
this KurrentClient.ReadAllStreamResult readAllStreamResult,
140+
IStateBuilder<TState> stateBuilder,
141+
CancellationToken ct = default
142+
) where TState : notnull =>
143+
stateBuilder.GetAsync(readAllStreamResult, new GetStateOptions<TState>(), ct);
144+
145+
public static Task<StateAtPointInTime<TState>> GetStateAsync<TState>(
146+
this KurrentClient.StreamSubscriptionResult subscriptionResult,
147+
IStateBuilder<TState> stateBuilder,
148+
GetStateOptions<TState> options,
149+
CancellationToken ct = default
150+
) where TState : notnull =>
151+
stateBuilder.GetAsync(subscriptionResult, options, ct);
152+
153+
public static Task<StateAtPointInTime<TState>> GetStateAsync<TState>(
154+
this KurrentClient.StreamSubscriptionResult subscriptionResult,
155+
IStateBuilder<TState> stateBuilder,
156+
CancellationToken ct = default
157+
) where TState : notnull =>
158+
stateBuilder.GetAsync(subscriptionResult, new GetStateOptions<TState>(), ct);
159+
160+
public static Task<StateAtPointInTime<TState>> GetStateAsync<TState>(
161+
this KurrentPersistentSubscriptionsClient.PersistentSubscriptionResult subscriptionResult,
162+
IStateBuilder<TState> stateBuilder,
163+
GetStateOptions<TState> options,
164+
CancellationToken ct = default
165+
) where TState : notnull =>
166+
stateBuilder.GetAsync(subscriptionResult, options, ct);
167+
168+
public static Task<StateAtPointInTime<TState>> GetStateAsync<TState>(
169+
this KurrentPersistentSubscriptionsClient.PersistentSubscriptionResult subscriptionResult,
170+
IStateBuilder<TState> stateBuilder,
171+
CancellationToken ct = default
172+
) where TState : notnull =>
173+
stateBuilder.GetAsync(subscriptionResult, new GetStateOptions<TState>(), ct);
174+
}
Lines changed: 0 additions & 172 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
using System.Runtime.CompilerServices;
21
using EventStore.Client;
32
using Kurrent.Client.Core.Serialization;
43

@@ -10,10 +9,6 @@ public record StateAtPointInTime<TState>(
109
Position? LastPosition = null
1110
) where TState : notnull;
1211

13-
public record GetStateOptions<TState> where TState : notnull {
14-
public StateAtPointInTime<TState>? CurrentState { get; set; }
15-
}
16-
1712
public interface IStateBuilder<TState> where TState : notnull {
1813
public Task<StateAtPointInTime<TState>> GetAsync(
1914
IAsyncEnumerable<ResolvedEvent> messages,
@@ -28,23 +23,6 @@ public interface IState<in TEvent> {
2823
public void Apply(TEvent @event);
2924
}
3025

31-
public record GetSnapshotOptions {
32-
public string? StreamName { get; set; }
33-
34-
public string? SnapshotVersion { get; set; }
35-
36-
public static GetSnapshotOptions ForStream(string streamName) =>
37-
new GetSnapshotOptions { StreamName = streamName };
38-
39-
public static GetSnapshotOptions ForAll() =>
40-
new GetSnapshotOptions();
41-
}
42-
43-
public delegate ValueTask<StateAtPointInTime<TState>> GetSnapshot<TState>(
44-
GetSnapshotOptions options,
45-
CancellationToken ct = default
46-
) where TState : notnull;
47-
4826
public record StateBuilder<TState>(
4927
Func<TState, ResolvedEvent, TState> Evolve,
5028
Func<TState> GetInitialState
@@ -144,153 +122,3 @@ public static StateBuilder<TState> For<TState>(Func<TState> getInitialState)
144122
getInitialState
145123
);
146124
}
147-
148-
public class GetStreamStateOptions<TState> : ReadStreamOptions where TState : notnull {
149-
public GetSnapshot<TState>? GetSnapshot { get; set; }
150-
}
151-
152-
public static class KurrentClientGettingStateClientExtensions {
153-
public static async Task<StateAtPointInTime<TState>> GetStateAsync<TState>(
154-
this KurrentClient eventStore,
155-
string streamName,
156-
IStateBuilder<TState> stateBuilder,
157-
GetStreamStateOptions<TState>? options,
158-
CancellationToken ct = default
159-
) where TState : notnull {
160-
StateAtPointInTime<TState>? stateAtPointInTime = null;
161-
162-
options ??= new GetStreamStateOptions<TState>();
163-
164-
if (options.GetSnapshot != null)
165-
stateAtPointInTime = await options.GetSnapshot(
166-
GetSnapshotOptions.ForStream(streamName),
167-
ct
168-
);
169-
170-
options.StreamPosition = stateAtPointInTime?.LastStreamPosition ?? StreamPosition.Start;
171-
172-
return await eventStore.ReadStreamAsync(streamName, options, ct)
173-
.GetStateAsync(stateBuilder, ct);
174-
}
175-
176-
public static async Task<StateAtPointInTime<TState>> GetStateAsync<TState>(
177-
this IAsyncEnumerable<ResolvedEvent> messages,
178-
TState initialState,
179-
Func<TState, ResolvedEvent, TState> evolve,
180-
CancellationToken ct
181-
) where TState : notnull {
182-
var state = initialState;
183-
184-
if (messages is KurrentClient.ReadStreamResult readStreamResult) {
185-
if (await readStreamResult.ReadState.ConfigureAwait(false) == ReadState.StreamNotFound)
186-
return new StateAtPointInTime<TState>(state);
187-
}
188-
189-
ResolvedEvent? lastEvent = null;
190-
191-
await foreach (var resolvedEvent in messages.WithCancellation(ct)) {
192-
lastEvent = resolvedEvent;
193-
194-
state = evolve(state, resolvedEvent);
195-
}
196-
197-
return new StateAtPointInTime<TState>(state, lastEvent?.Event.EventNumber, lastEvent?.Event.Position);
198-
}
199-
200-
public static Task<StateAtPointInTime<TState>> GetStateAsync<TState>(
201-
this KurrentClient eventStore,
202-
string streamName,
203-
IStateBuilder<TState> streamStateBuilder,
204-
CancellationToken ct = default
205-
) where TState : notnull =>
206-
eventStore.GetStateAsync(streamName, streamStateBuilder, new GetStreamStateOptions<TState>(), ct);
207-
208-
public static Task<StateAtPointInTime<TState>> GetStateAsync<TState, TEvent>(
209-
this KurrentClient eventStore,
210-
string streamName,
211-
GetStreamStateOptions<TState> options,
212-
CancellationToken ct = default
213-
) where TState : IState<TEvent>, new() =>
214-
eventStore.GetStateAsync(
215-
streamName,
216-
StateBuilder.For<TState, TEvent>(),
217-
options,
218-
ct
219-
);
220-
221-
public static Task<StateAtPointInTime<TState>> GetStateAsync<TState, TEvent>(
222-
this KurrentClient eventStore,
223-
string streamName,
224-
CancellationToken ct = default
225-
) where TState : IState<TEvent>, new() =>
226-
eventStore.GetStateAsync<TState, TEvent>(streamName, new GetStreamStateOptions<TState>(), ct);
227-
228-
public static Task<StateAtPointInTime<TState>> GetStateAsync<TState>(
229-
this KurrentClient eventStore,
230-
string streamName,
231-
CancellationToken ct = default
232-
) where TState : IState<object>, new() =>
233-
eventStore.GetStateAsync<TState, object>(streamName, new GetStreamStateOptions<TState>(), ct);
234-
}
235-
236-
public static class KurrentClientGettingStateReadAndSubscribeExtensions {
237-
public static Task<StateAtPointInTime<TState>> GetStateAsync<TState>(
238-
this KurrentClient.ReadStreamResult readStreamResult,
239-
IStateBuilder<TState> stateBuilder,
240-
GetStateOptions<TState> options,
241-
CancellationToken ct = default
242-
) where TState : notnull =>
243-
stateBuilder.GetAsync(readStreamResult, options, ct);
244-
245-
public static Task<StateAtPointInTime<TState>> GetStateAsync<TState>(
246-
this KurrentClient.ReadStreamResult readStreamResult,
247-
IStateBuilder<TState> stateBuilder,
248-
CancellationToken ct = default
249-
) where TState : notnull =>
250-
stateBuilder.GetAsync(readStreamResult, new GetStateOptions<TState>(), ct);
251-
252-
public static Task<StateAtPointInTime<TState>> GetStateAsync<TState>(
253-
this KurrentClient.ReadAllStreamResult readAllStreamResult,
254-
IStateBuilder<TState> stateBuilder,
255-
GetStateOptions<TState> options,
256-
CancellationToken ct = default
257-
) where TState : notnull =>
258-
stateBuilder.GetAsync(readAllStreamResult, options, ct);
259-
260-
public static Task<StateAtPointInTime<TState>> GetStateAsync<TState>(
261-
this KurrentClient.ReadAllStreamResult readAllStreamResult,
262-
IStateBuilder<TState> stateBuilder,
263-
CancellationToken ct = default
264-
) where TState : notnull =>
265-
stateBuilder.GetAsync(readAllStreamResult, new GetStateOptions<TState>(), ct);
266-
267-
public static Task<StateAtPointInTime<TState>> GetStateAsync<TState>(
268-
this KurrentClient.StreamSubscriptionResult subscriptionResult,
269-
IStateBuilder<TState> stateBuilder,
270-
GetStateOptions<TState> options,
271-
CancellationToken ct = default
272-
) where TState : notnull =>
273-
stateBuilder.GetAsync(subscriptionResult, options, ct);
274-
275-
public static Task<StateAtPointInTime<TState>> GetStateAsync<TState>(
276-
this KurrentClient.StreamSubscriptionResult subscriptionResult,
277-
IStateBuilder<TState> stateBuilder,
278-
CancellationToken ct = default
279-
) where TState : notnull =>
280-
stateBuilder.GetAsync(subscriptionResult, new GetStateOptions<TState>(), ct);
281-
282-
public static Task<StateAtPointInTime<TState>> GetStateAsync<TState>(
283-
this KurrentPersistentSubscriptionsClient.PersistentSubscriptionResult subscriptionResult,
284-
IStateBuilder<TState> stateBuilder,
285-
GetStateOptions<TState> options,
286-
CancellationToken ct = default
287-
) where TState : notnull =>
288-
stateBuilder.GetAsync(subscriptionResult, options, ct);
289-
290-
public static Task<StateAtPointInTime<TState>> GetStateAsync<TState>(
291-
this KurrentPersistentSubscriptionsClient.PersistentSubscriptionResult subscriptionResult,
292-
IStateBuilder<TState> stateBuilder,
293-
CancellationToken ct = default
294-
) where TState : notnull =>
295-
stateBuilder.GetAsync(subscriptionResult, new GetStateOptions<TState>(), ct);
296-
}

0 commit comments

Comments
 (0)