Skip to content

Commit f690270

Browse files
committed
[DEVEX-222] Made all persistent subscription methods obsolete
1 parent f37727a commit f690270

27 files changed

+1035
-738
lines changed

samples/server-side-filtering/Program.cs

Lines changed: 32 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
#pragma warning disable CS8321 // Local function is declared but never used
22

3-
using KurrentDB.Client;
43
using EventTypeFilter = KurrentDB.Client.EventTypeFilter;
54

65
const int eventCount = 100;
@@ -9,23 +8,26 @@
98

109
await using var client = new KurrentDBClient(KurrentDBClientSettings.Create("esdb://localhost:2113?tls=false"));
1110

12-
_ = Task.Run(async () => {
13-
await using var subscription = client.SubscribeToAll(
14-
FromAll.Start,
15-
filterOptions: new SubscriptionFilterOptions(EventTypeFilter.Prefix("some-")));
16-
await foreach (var message in subscription.Messages) {
17-
switch (message) {
18-
case StreamMessage.Event(var e):
19-
Console.WriteLine($"{e.Event.EventType} @ {e.Event.Position.PreparePosition}");
20-
semaphore.Release();
21-
break;
22-
case StreamMessage.AllStreamCheckpointReached(var p):
23-
Console.WriteLine($"checkpoint taken at {p.PreparePosition}");
24-
break;
11+
_ = Task.Run(
12+
async () => {
13+
await using var subscription = client.SubscribeToAll(
14+
new SubscribeToAllOptions { FilterOptions = new SubscriptionFilterOptions(EventTypeFilter.Prefix("some-")) }
15+
);
16+
17+
await foreach (var message in subscription.Messages) {
18+
switch (message) {
19+
case StreamMessage.Event(var e):
20+
Console.WriteLine($"{e.Event.EventType} @ {e.Event.Position.PreparePosition}");
21+
semaphore.Release();
22+
break;
23+
24+
case StreamMessage.AllStreamCheckpointReached(var p):
25+
Console.WriteLine($"checkpoint taken at {p.PreparePosition}");
26+
break;
27+
}
2528
}
2629
}
27-
});
28-
30+
);
2931

3032
await Task.Delay(2000);
3133

@@ -50,8 +52,11 @@ static async Task ExcludeSystemEvents(KurrentDBClient client) {
5052
#region exclude-system
5153

5254
await using var subscription = client.SubscribeToAll(
53-
FromAll.Start,
54-
filterOptions: new SubscriptionFilterOptions(EventTypeFilter.ExcludeSystemEvents()));
55+
new SubscribeToAllOptions {
56+
FilterOptions = new SubscriptionFilterOptions(EventTypeFilter.ExcludeSystemEvents())
57+
}
58+
);
59+
5560
await foreach (var message in subscription.Messages) {
5661
switch (message) {
5762
case StreamMessage.Event(var e):
@@ -70,7 +75,7 @@ static async Task EventTypePrefix(KurrentDBClient client) {
7075

7176
#endregion event-type-prefix
7277

73-
await using var subscription = client.SubscribeToAll(FromAll.Start, filterOptions: filterOptions);
78+
await using var subscription = client.SubscribeToAll(new SubscribeToAllOptions { FilterOptions = filterOptions });
7479
await foreach (var message in subscription.Messages) {
7580
switch (message) {
7681
case StreamMessage.Event(var e):
@@ -87,7 +92,7 @@ static async Task EventTypeRegex(KurrentDBClient client) {
8792

8893
#endregion event-type-regex
8994

90-
await using var subscription = client.SubscribeToAll(FromAll.Start, filterOptions: filterOptions);
95+
await using var subscription = client.SubscribeToAll(new SubscribeToAllOptions { FilterOptions = filterOptions });
9196
await foreach (var message in subscription.Messages) {
9297
switch (message) {
9398
case StreamMessage.Event(var e):
@@ -104,7 +109,7 @@ static async Task StreamPrefix(KurrentDBClient client) {
104109

105110
#endregion stream-prefix
106111

107-
await using var subscription = client.SubscribeToAll(FromAll.Start, filterOptions: filterOptions);
112+
await using var subscription = client.SubscribeToAll(new SubscribeToAllOptions { FilterOptions = filterOptions });
108113
await foreach (var message in subscription.Messages) {
109114
switch (message) {
110115
case StreamMessage.Event(var e):
@@ -121,7 +126,7 @@ static async Task StreamRegex(KurrentDBClient client) {
121126

122127
#endregion stream-regex
123128

124-
await using var subscription = client.SubscribeToAll(FromAll.Start, filterOptions: filterOptions);
129+
await using var subscription = client.SubscribeToAll(new SubscribeToAllOptions { FilterOptions = filterOptions });
125130
await foreach (var message in subscription.Messages) {
126131
switch (message) {
127132
case StreamMessage.Event(var e):
@@ -136,18 +141,19 @@ static async Task CheckpointCallback(KurrentDBClient client) {
136141

137142
var filterOptions = new SubscriptionFilterOptions(EventTypeFilter.ExcludeSystemEvents());
138143

139-
await using var subscription = client.SubscribeToAll(FromAll.Start, filterOptions: filterOptions);
144+
await using var subscription = client.SubscribeToAll(new SubscribeToAllOptions { FilterOptions = filterOptions });
140145
await foreach (var message in subscription.Messages) {
141146
switch (message) {
142147
case StreamMessage.Event(var e):
143148
Console.WriteLine($"{e.Event.EventType} @ {e.Event.Position.PreparePosition}");
144149
break;
150+
145151
case StreamMessage.AllStreamCheckpointReached(var p):
146152
Console.WriteLine($"checkpoint taken at {p.PreparePosition}");
147153
break;
148154
}
149155
}
150-
156+
151157
#endregion checkpoint
152158
}
153159

@@ -158,12 +164,13 @@ static async Task CheckpointCallbackWithInterval(KurrentDBClient client) {
158164

159165
#endregion checkpoint-with-interval
160166

161-
await using var subscription = client.SubscribeToAll(FromAll.Start, filterOptions: filterOptions);
167+
await using var subscription = client.SubscribeToAll(new SubscribeToAllOptions { FilterOptions = filterOptions });
162168
await foreach (var message in subscription.Messages) {
163169
switch (message) {
164170
case StreamMessage.Event(var e):
165171
Console.WriteLine($"{e.Event.EventType} @ {e.Event.Position.PreparePosition}");
166172
break;
173+
167174
case StreamMessage.AllStreamCheckpointReached(var p):
168175
Console.WriteLine($"checkpoint taken at {p.PreparePosition}");
169176
break;

samples/subscribing-to-streams/Program.cs

Lines changed: 59 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,19 @@
1-
using KurrentDB.Client;
2-
3-
#pragma warning disable CS8321 // Local function is declared but never used
1+
#pragma warning disable CS8321 // Local function is declared but never used
42

53
// ReSharper disable UnusedParameter.Local
64
// ReSharper disable UnusedVariable
75

86
await using var client = new KurrentDBClient(KurrentDBClientSettings.Create("esdb://localhost:2113?tls=false"));
97

10-
await Task.WhenAll(YieldSamples().Select(async sample => {
11-
try {
12-
await sample;
13-
} catch (OperationCanceledException) { }
14-
}));
15-
8+
await Task.WhenAll(
9+
YieldSamples().Select(
10+
async sample => {
11+
try {
12+
await sample;
13+
} catch (OperationCanceledException) { }
14+
}
15+
)
16+
);
1617

1718
return;
1819

@@ -34,8 +35,12 @@ static async Task SubscribeToStreamFromPosition(KurrentDBClient client, Cancella
3435

3536
await using var subscription = client.SubscribeToStream(
3637
"some-stream",
37-
FromStream.After(StreamPosition.FromInt64(20)),
38-
cancellationToken: ct);
38+
new SubscribeToStreamOptions {
39+
Start = FromStream.After(StreamPosition.FromInt64(20))
40+
},
41+
cancellationToken: ct
42+
);
43+
3944
await foreach (var message in subscription.Messages) {
4045
switch (message) {
4146
case StreamMessage.Event(var evnt):
@@ -53,8 +58,10 @@ static async Task SubscribeToStreamLive(KurrentDBClient client, CancellationToke
5358

5459
await using var subscription = client.SubscribeToStream(
5560
"some-stream",
56-
FromStream.End,
57-
cancellationToken: ct);
61+
new SubscribeToStreamOptions { Start = FromStream.End },
62+
cancellationToken: ct
63+
);
64+
5865
await foreach (var message in subscription.Messages) {
5966
switch (message) {
6067
case StreamMessage.Event(var evnt):
@@ -72,9 +79,10 @@ static async Task SubscribeToStreamResolvingLinkTos(KurrentDBClient client, Canc
7279

7380
await using var subscription = client.SubscribeToStream(
7481
"$et-myEventType",
75-
FromStream.Start,
76-
true,
77-
cancellationToken: ct);
82+
new SubscribeToStreamOptions { ResolveLinkTos = true },
83+
cancellationToken: ct
84+
);
85+
7886
await foreach (var message in subscription.Messages) {
7987
switch (message) {
8088
case StreamMessage.Event(var evnt):
@@ -91,16 +99,18 @@ static async Task SubscribeToStreamSubscriptionDropped(KurrentDBClient client, C
9199
#region subscribe-to-stream-subscription-dropped
92100

93101
var checkpoint = await ReadStreamCheckpointAsync() switch {
94-
null => FromStream.Start,
102+
null => FromStream.Start,
95103
var position => FromStream.After(position.Value)
96104
};
97105

98106
Subscribe:
99107
try {
100108
await using var subscription = client.SubscribeToStream(
101109
"some-stream",
102-
checkpoint,
103-
cancellationToken: ct);
110+
new SubscribeToStreamOptions { Start = checkpoint },
111+
cancellationToken: ct
112+
);
113+
104114
await foreach (var message in subscription.Messages) {
105115
switch (message) {
106116
case StreamMessage.Event(var evnt):
@@ -125,10 +135,8 @@ static async Task SubscribeToStreamSubscriptionDropped(KurrentDBClient client, C
125135
static async Task SubscribeToStream(KurrentDBClient client, CancellationToken ct) {
126136
#region subscribe-to-stream
127137

128-
await using var subscription = client.SubscribeToStream(
129-
"some-stream",
130-
FromStream.Start,
131-
cancellationToken: ct);
138+
await using var subscription = client.SubscribeToStream("some-stream", cancellationToken: ct);
139+
132140
await foreach (var message in subscription.Messages.WithCancellation(ct)) {
133141
switch (message) {
134142
case StreamMessage.Event(var evnt):
@@ -144,9 +152,8 @@ static async Task SubscribeToStream(KurrentDBClient client, CancellationToken ct
144152
static async Task SubscribeToAll(KurrentDBClient client, CancellationToken ct) {
145153
#region subscribe-to-all
146154

147-
await using var subscription = client.SubscribeToAll(
148-
FromAll.Start,
149-
cancellationToken: ct);
155+
await using var subscription = client.SubscribeToAll(cancellationToken: ct);
156+
150157
await foreach (var message in subscription.Messages) {
151158
switch (message) {
152159
case StreamMessage.Event(var evnt):
@@ -169,8 +176,12 @@ static async Task SubscribeToAllFromPosition(KurrentDBClient client, Cancellatio
169176
);
170177

171178
await using var subscription = client.SubscribeToAll(
172-
FromAll.After(result.LogPosition),
173-
cancellationToken: ct);
179+
new SubscribeToAllOptions {
180+
Start = FromAll.After(result.LogPosition)
181+
},
182+
cancellationToken: ct
183+
);
184+
174185
await foreach (var message in subscription.Messages) {
175186
switch (message) {
176187
case StreamMessage.Event(var evnt):
@@ -187,8 +198,12 @@ static async Task SubscribeToAllLive(KurrentDBClient client, CancellationToken c
187198
#region subscribe-to-all-live
188199

189200
var subscription = client.SubscribeToAll(
190-
FromAll.End,
191-
cancellationToken: ct);
201+
new SubscribeToAllOptions {
202+
Start = FromAll.End
203+
},
204+
cancellationToken: ct
205+
);
206+
192207
await foreach (var message in subscription.Messages) {
193208
switch (message) {
194209
case StreamMessage.Event(var evnt):
@@ -205,15 +220,17 @@ static async Task SubscribeToAllSubscriptionDropped(KurrentDBClient client, Canc
205220
#region subscribe-to-all-subscription-dropped
206221

207222
var checkpoint = await ReadCheckpointAsync() switch {
208-
null => FromAll.Start,
223+
null => FromAll.Start,
209224
var position => FromAll.After(position.Value)
210225
};
211226

212227
Subscribe:
213228
try {
214229
await using var subscription = client.SubscribeToAll(
215-
checkpoint,
216-
cancellationToken: ct);
230+
new SubscribeToAllOptions { Start = checkpoint },
231+
cancellationToken: ct
232+
);
233+
217234
await foreach (var message in subscription.Messages) {
218235
switch (message) {
219236
case StreamMessage.Event(var evnt):
@@ -243,16 +260,18 @@ static async Task SubscribeToFiltered(KurrentDBClient client, CancellationToken
243260

244261
var prefixStreamFilter = new SubscriptionFilterOptions(StreamFilter.Prefix("test-", "other-"));
245262
await using var subscription = client.SubscribeToAll(
246-
FromAll.Start,
247-
filterOptions: prefixStreamFilter,
248-
cancellationToken: ct);
263+
new SubscribeToAllOptions { FilterOptions = prefixStreamFilter },
264+
cancellationToken: ct
265+
);
266+
249267
await foreach (var message in subscription.Messages) {
250268
switch (message) {
251269
case StreamMessage.Event(var evnt):
252270
Console.WriteLine($"Received event {evnt.OriginalEventNumber}@{evnt.OriginalStreamId}");
253271
await HandleEvent(evnt);
254272

255273
break;
274+
256275
case StreamMessage.AllStreamCheckpointReached(var position):
257276
Console.WriteLine($"Checkpoint reached: {position}");
258277
break;
@@ -272,9 +291,10 @@ static async Task OverridingUserCredentials(KurrentDBClient client, Cancellation
272291
#region overriding-user-credentials
273292

274293
await using var subscription = client.SubscribeToAll(
275-
FromAll.Start,
276-
userCredentials: new UserCredentials("admin", "changeit"),
277-
cancellationToken: ct);
294+
new SubscribeToAllOptions { UserCredentials = new UserCredentials("admin", "changeit") },
295+
cancellationToken: ct
296+
);
297+
278298
await foreach (var message in subscription.Messages) {
279299
switch (message) {
280300
case StreamMessage.Event(var evnt):

0 commit comments

Comments
 (0)