Skip to content

Commit a8a4f4a

Browse files
committed
[DEVEX-227] Added example of Decider
1 parent 1f6f829 commit a8a4f4a

File tree

3 files changed

+265
-169
lines changed

3 files changed

+265
-169
lines changed

src/Kurrent.Client/Streams/DecisionMaking/Decide.cs

Lines changed: 40 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,10 @@ public static Task<IWriteResult> DecideAsync<TState>(
2424
DecideRetryPolicy(options).ExecuteAsync(
2525
async ct => {
2626
var (state, streamPosition, position) =
27-
await eventStore.GetStateAsync(streamName, stateBuilder, options?.GetStateOptions, ct);
27+
await eventStore.GetStateAsync(streamName, stateBuilder, options?.GetStateOptions, ct)
28+
.ConfigureAwait(false);
2829

29-
var messages = await decide(state, ct);
30+
var messages = await decide(state, ct).ConfigureAwait(false);
3031

3132
if (messages.Length == 0) {
3233
return new SuccessResult(
@@ -50,17 +51,49 @@ public static Task<IWriteResult> DecideAsync<TState>(
5051
messages,
5152
appendToStreamOptions,
5253
ct
53-
);
54+
).ConfigureAwait(false);
5455
},
5556
cancellationToken
5657
);
5758

59+
public static Task<IWriteResult> DecideAsync<TState, TCommand, TEvent>(
60+
this KurrentClient eventStore,
61+
string streamName,
62+
TCommand command,
63+
Decider<TState, TCommand, TEvent> decider,
64+
CancellationToken ct = default
65+
) where TState : notnull
66+
where TEvent : notnull =>
67+
eventStore.DecideAsync(
68+
streamName,
69+
command,
70+
decider.ToAsyncDecider(),
71+
ct
72+
);
73+
74+
public static Task<IWriteResult> DecideAsync<TState, TCommand, TEvent>(
75+
this KurrentClient eventStore,
76+
string streamName,
77+
TCommand command,
78+
Decider<TState, TCommand, TEvent> decider,
79+
DecideOptions<TState>? options,
80+
CancellationToken ct = default
81+
) where TState : notnull
82+
where TEvent : notnull =>
83+
eventStore.DecideAsync(
84+
streamName,
85+
command,
86+
decider.ToAsyncDecider(),
87+
options,
88+
ct
89+
);
90+
5891
public static Task<IWriteResult> DecideAsync<TState, TCommand>(
5992
this KurrentClient eventStore,
6093
string streamName,
6194
TCommand command,
6295
Decider<TState, TCommand> decider,
63-
CancellationToken ct
96+
CancellationToken ct = default
6497
) where TState : notnull =>
6598
eventStore.DecideAsync(
6699
streamName,
@@ -75,7 +108,7 @@ public static Task<IWriteResult> DecideAsync<TState, TCommand>(
75108
TCommand command,
76109
Decider<TState, TCommand> decider,
77110
DecideOptions<TState>? options,
78-
CancellationToken ct
111+
CancellationToken ct = default
79112
) where TState : notnull =>
80113
eventStore.DecideAsync(
81114
streamName,
@@ -90,7 +123,7 @@ public static Task<IWriteResult> DecideAsync<TState, TCommand>(
90123
string streamName,
91124
TCommand command,
92125
AsyncDecider<TState, TCommand> asyncDecider,
93-
CancellationToken ct
126+
CancellationToken ct = default
94127
) where TState : notnull =>
95128
eventStore.DecideAsync(
96129
streamName,
@@ -105,7 +138,7 @@ public static Task<IWriteResult> DecideAsync<TState, TCommand>(
105138
TCommand command,
106139
AsyncDecider<TState, TCommand> asyncDecider,
107140
DecideOptions<TState>? options,
108-
CancellationToken ct
141+
CancellationToken ct = default
109142
) where TState : notnull =>
110143
eventStore.DecideAsync(
111144
streamName,
Lines changed: 225 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,225 @@
1+
using System.Collections.Immutable;
2+
using EventStore.Client;
3+
using Kurrent.Client.Streams.DecisionMaking;
4+
using Kurrent.Client.Streams.GettingState;
5+
6+
namespace Kurrent.Client.Tests.Streams.DecisionMaking.UnionTypes;
7+
8+
using static ShoppingCart;
9+
using static ShoppingCart.Event;
10+
using static ShoppingCart.Command;
11+
12+
[Trait("Category", "Target:Streams")]
13+
[Trait("Category", "Operation:Decide")]
14+
public class DecisionMakingWithDeciderTests(ITestOutputHelper output, KurrentPermanentFixture fixture)
15+
: KurrentPermanentTests<KurrentPermanentFixture>(output, fixture) {
16+
[RetryFact]
17+
public async Task runs_business_logic_with_decider_and_typed_events() {
18+
// Given
19+
var shoppingCartId = Guid.NewGuid();
20+
var clientId = Guid.NewGuid();
21+
var shoesId = Guid.NewGuid();
22+
var tShirtId = Guid.NewGuid();
23+
var twoPairsOfShoes = new PricedProductItem(shoesId, 2, 100);
24+
var pairOfShoes = new PricedProductItem(shoesId, 1, 100);
25+
var tShirt = new PricedProductItem(tShirtId, 1, 50);
26+
27+
var events = new Event[] {
28+
new Opened(clientId, DateTime.UtcNow),
29+
new ProductItemAdded(twoPairsOfShoes, DateTime.UtcNow),
30+
new ProductItemAdded(tShirt, DateTime.UtcNow),
31+
new ProductItemRemoved(pairOfShoes, DateTime.UtcNow),
32+
new Confirmed(DateTime.UtcNow),
33+
new Canceled(DateTime.UtcNow)
34+
};
35+
36+
var streamName = $"shopping_cart-{shoppingCartId}";
37+
38+
var result = await Fixture.Streams.DecideAsync(
39+
streamName,
40+
new Open(clientId, DateTime.UtcNow),
41+
Decider
42+
);
43+
44+
Assert.IsType<SuccessResult>(result);
45+
46+
result = await Fixture.Streams.DecideAsync(
47+
streamName,
48+
new AddProductItem(twoPairsOfShoes, DateTime.UtcNow),
49+
Decider
50+
);
51+
52+
Assert.IsType<SuccessResult>(result);
53+
54+
result = await Fixture.Streams.DecideAsync(
55+
streamName,
56+
new AddProductItem(tShirt, DateTime.UtcNow),
57+
Decider
58+
);
59+
60+
Assert.IsType<SuccessResult>(result);
61+
62+
result = await Fixture.Streams.DecideAsync(
63+
streamName,
64+
new RemoveProductItem(pairOfShoes, DateTime.UtcNow),
65+
Decider
66+
);
67+
68+
Assert.IsType<SuccessResult>(result);
69+
70+
result = await Fixture.Streams.DecideAsync(
71+
streamName,
72+
new Confirm(DateTime.UtcNow),
73+
Decider
74+
);
75+
76+
Assert.IsType<SuccessResult>(result);
77+
78+
await Assert.ThrowsAsync<InvalidOperationException>(
79+
() =>
80+
Fixture.Streams.DecideAsync(
81+
streamName,
82+
new Cancel(DateTime.UtcNow),
83+
Decider
84+
)
85+
);
86+
}
87+
}
88+
89+
public record PricedProductItem(
90+
Guid ProductId,
91+
int Quantity,
92+
decimal UnitPrice
93+
) {
94+
public decimal TotalPrice => Quantity * UnitPrice;
95+
}
96+
97+
public abstract record ShoppingCart {
98+
public abstract record Event {
99+
public record Opened(
100+
Guid ClientId,
101+
DateTimeOffset OpenedAt
102+
) : Event;
103+
104+
public record ProductItemAdded(
105+
PricedProductItem ProductItem,
106+
DateTimeOffset AddedAt
107+
) : Event;
108+
109+
public record ProductItemRemoved(
110+
PricedProductItem ProductItem,
111+
DateTimeOffset RemovedAt
112+
) : Event;
113+
114+
public record Confirmed(
115+
DateTimeOffset ConfirmedAt
116+
) : Event;
117+
118+
public record Canceled(
119+
DateTimeOffset CanceledAt
120+
) : Event;
121+
122+
// This won't allow external inheritance and mimic union type in C#
123+
Event() { }
124+
}
125+
126+
public record Initial : ShoppingCart;
127+
128+
public record Pending(ProductItems ProductItems) : ShoppingCart;
129+
130+
public record Closed : ShoppingCart;
131+
132+
public static ShoppingCart Evolve(ShoppingCart state, Event @event) =>
133+
(state, @event) switch {
134+
(Initial, Opened) =>
135+
new Pending(ProductItems.Empty),
136+
137+
(Pending(var productItems), ProductItemAdded(var productItem, _)) =>
138+
new Pending(productItems.Add(productItem)),
139+
140+
(Pending(var productItems), ProductItemRemoved(var productItem, _)) =>
141+
new Pending(productItems.Remove(productItem)),
142+
143+
(Pending, Confirmed) =>
144+
new Closed(),
145+
146+
(Pending, Canceled) =>
147+
new Closed(),
148+
149+
_ => state
150+
};
151+
152+
public abstract record Command {
153+
public record Open(
154+
Guid ClientId,
155+
DateTimeOffset Now
156+
) : Command;
157+
158+
public record AddProductItem(
159+
PricedProductItem ProductItem,
160+
DateTimeOffset Now
161+
) : Command;
162+
163+
public record RemoveProductItem(
164+
PricedProductItem ProductItem,
165+
DateTimeOffset Now
166+
) : Command;
167+
168+
public record Confirm(
169+
DateTimeOffset Now
170+
) : Command;
171+
172+
public record Cancel(
173+
DateTimeOffset Now
174+
) : Command;
175+
176+
Command() { }
177+
}
178+
179+
public static Event[] Decide(Command command, ShoppingCart state) =>
180+
(state, command) switch {
181+
(Pending, Open) => [],
182+
183+
(Initial, Open(var clientId, var now)) => [new Opened(clientId, now)],
184+
185+
(Pending, AddProductItem(var productItem, var now)) => [new ProductItemAdded(productItem, now)],
186+
187+
(Pending(var productItems), RemoveProductItem(var productItem, var now)) =>
188+
productItems.HasEnough(productItem)
189+
? [new ProductItemRemoved(productItem, now)]
190+
: throw new InvalidOperationException("Not enough product items to remove"),
191+
192+
(Pending, Confirm(var now)) => [new Confirmed(now)],
193+
194+
(Pending, Cancel(var now)) => [new Canceled(now)],
195+
196+
_ => throw new InvalidOperationException(
197+
$"Cannot {command.GetType().Name} for {state.GetType().Name} shopping cart"
198+
)
199+
};
200+
201+
public static readonly Decider<ShoppingCart, Command, Event> Decider = new Decider<ShoppingCart, Command, Event>(
202+
Decide,
203+
Evolve,
204+
() => new Initial()
205+
);
206+
}
207+
208+
public record ProductItems(ImmutableDictionary<string, int> Items) {
209+
public static ProductItems Empty => new(ImmutableDictionary<string, int>.Empty);
210+
211+
public ProductItems Add(PricedProductItem productItem) =>
212+
IncrementQuantity(Key(productItem), productItem.Quantity);
213+
214+
public ProductItems Remove(PricedProductItem productItem) =>
215+
IncrementQuantity(Key(productItem), -productItem.Quantity);
216+
217+
public bool HasEnough(PricedProductItem productItem) =>
218+
Items.TryGetValue(Key(productItem), out var currentQuantity) && currentQuantity >= productItem.Quantity;
219+
220+
static string Key(PricedProductItem pricedProductItem) =>
221+
$"{pricedProductItem.ProductId}_{pricedProductItem.UnitPrice}";
222+
223+
ProductItems IncrementQuantity(string key, int quantity) =>
224+
new(Items.SetItem(key, Items.TryGetValue(key, out var current) ? current + quantity : quantity));
225+
}

0 commit comments

Comments
 (0)