Skip to content

Commit 7f4d3f2

Browse files
christophstroblmp911de
authored andcommitted
DATAMONGO-2258 - Add startAfter option to change stream support.
Original pull request: #739.
1 parent a30ee07 commit 7f4d3f2

File tree

4 files changed

+234
-2
lines changed

4 files changed

+234
-2
lines changed

spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ChangeStreamOptions.java

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import java.util.Arrays;
2222
import java.util.Optional;
2323

24+
import org.bson.BsonDocument;
2425
import org.bson.BsonTimestamp;
2526
import org.bson.BsonValue;
2627
import org.bson.Document;
@@ -51,6 +52,7 @@ public class ChangeStreamOptions {
5152
private @Nullable FullDocument fullDocumentLookup;
5253
private @Nullable Collation collation;
5354
private @Nullable Object resumeTimestamp;
55+
private Resume resume = Resume.RESUME_AFTER;
5456

5557
protected ChangeStreamOptions() {}
5658

@@ -97,6 +99,22 @@ public Optional<BsonTimestamp> getResumeBsonTimestamp() {
9799
return Optional.ofNullable(resumeTimestamp).map(timestamp -> asTimestampOfType(timestamp, BsonTimestamp.class));
98100
}
99101

102+
/**
103+
* @return {@literal true} if the change stream should be started after the {@link #getResumeToken() token}.
104+
* @since 2.2
105+
*/
106+
public boolean isStartAfter() {
107+
return Resume.START_AFTER.equals(resume);
108+
}
109+
110+
/**
111+
* @return {@literal true} if the change stream should be resumed after the {@link #getResumeToken() token}.
112+
* @since 2.2
113+
*/
114+
public boolean isResumeAfter() {
115+
return Resume.RESUME_AFTER.equals(resume);
116+
}
117+
100118
/**
101119
* @return empty {@link ChangeStreamOptions}.
102120
*/
@@ -137,6 +155,23 @@ private static <T> Object doGetTimestamp(Object timestamp, Class<T> targetType)
137155
+ ObjectUtils.nullSafeClassName(timestamp));
138156
}
139157

158+
/**
159+
* @author Christoph Strobl
160+
* @since 2.2
161+
*/
162+
enum Resume {
163+
164+
/**
165+
* @see com.mongodb.client.ChangeStreamIterable#startAfter(BsonDocument)
166+
*/
167+
START_AFTER,
168+
169+
/**
170+
* @see com.mongodb.client.ChangeStreamIterable#resumeAfter(BsonDocument)
171+
*/
172+
RESUME_AFTER
173+
}
174+
140175
/**
141176
* Builder for creating {@link ChangeStreamOptions}.
142177
*
@@ -150,6 +185,7 @@ public static class ChangeStreamOptionsBuilder {
150185
private @Nullable FullDocument fullDocumentLookup;
151186
private @Nullable Collation collation;
152187
private @Nullable Object resumeTimestamp;
188+
private Resume resume = Resume.RESUME_AFTER;
153189

154190
private ChangeStreamOptionsBuilder() {}
155191

@@ -273,6 +309,36 @@ public ChangeStreamOptionsBuilder resumeAt(BsonTimestamp resumeTimestamp) {
273309
return this;
274310
}
275311

312+
/**
313+
* Set the resume token after which to continue emitting notifications.
314+
*
315+
* @param resumeToken must not be {@literal null}.
316+
* @return this.
317+
* @since 2.2
318+
*/
319+
public ChangeStreamOptionsBuilder resumeAfter(BsonValue resumeToken) {
320+
321+
resumeToken(resumeToken);
322+
resume = Resume.RESUME_AFTER;
323+
324+
return this;
325+
}
326+
327+
/**
328+
* Set the resume token after which to start emitting notifications.
329+
*
330+
* @param resumeToken must not be {@literal null}.
331+
* @return this.
332+
* @since 2.2
333+
*/
334+
public ChangeStreamOptionsBuilder startAfter(BsonValue resumeToken) {
335+
336+
resumeToken(resumeToken);
337+
resume = Resume.START_AFTER;
338+
339+
return this;
340+
}
341+
276342
/**
277343
* @return the built {@link ChangeStreamOptions}
278344
*/
@@ -285,6 +351,7 @@ public ChangeStreamOptions build() {
285351
options.fullDocumentLookup = fullDocumentLookup;
286352
options.collation = collation;
287353
options.resumeTimestamp = resumeTimestamp;
354+
options.resume = resume;
288355

289356
return options;
290357
}

spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/ChangeStreamRequest.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -371,6 +371,36 @@ public ChangeStreamRequestBuilder<T> resumeAt(Instant clusterTime) {
371371
return this;
372372
}
373373

374+
/**
375+
* Set the resume token after which to continue emitting notifications.
376+
*
377+
* @param resumeToken must not be {@literal null}.
378+
* @return this.
379+
* @since 2.2
380+
*/
381+
public ChangeStreamRequestBuilder<T> resumeAfter(BsonValue resumeToken) {
382+
383+
Assert.notNull(resumeToken, "ResumeToken must not be null!");
384+
this.delegate.resumeAfter(resumeToken);
385+
386+
return this;
387+
}
388+
389+
/**
390+
* Set the resume token after which to start emitting notifications.
391+
*
392+
* @param resumeToken must not be {@literal null}.
393+
* @return this.
394+
* @since 2.2
395+
*/
396+
public ChangeStreamRequestBuilder<T> startAfter(BsonValue resumeToken) {
397+
398+
Assert.notNull(resumeToken, "ResumeToken must not be null!");
399+
this.delegate.startAfter(resumeToken);
400+
401+
return this;
402+
}
403+
374404
/**
375405
* Set the {@link FullDocument} lookup to {@link FullDocument#UPDATE_LOOKUP}.
376406
*

spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/ChangeStreamTask.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,7 @@ protected MongoCursor<ChangeStreamDocument<Document>> initCursor(MongoTemplate t
9292
FullDocument fullDocument = ClassUtils.isAssignable(Document.class, targetType) ? FullDocument.DEFAULT
9393
: FullDocument.UPDATE_LOOKUP;
9494
BsonTimestamp startAt = null;
95+
boolean resumeAfter = true;
9596

9697
if (options instanceof ChangeStreamRequest.ChangeStreamRequestOptions) {
9798

@@ -108,7 +109,9 @@ protected MongoCursor<ChangeStreamDocument<Document>> initCursor(MongoTemplate t
108109
}
109110

110111
if (changeStreamOptions.getResumeToken().isPresent()) {
112+
111113
resumeToken = changeStreamOptions.getResumeToken().get().asDocument();
114+
resumeAfter = changeStreamOptions.isResumeAfter();
112115
}
113116

114117
fullDocument = changeStreamOptions.getFullDocumentLookup()
@@ -119,7 +122,8 @@ protected MongoCursor<ChangeStreamDocument<Document>> initCursor(MongoTemplate t
119122
}
120123

121124
MongoDatabase db = StringUtils.hasText(options.getDatabaseName())
122-
? template.getMongoDbFactory().getDb(options.getDatabaseName()) : template.getDb();
125+
? template.getMongoDbFactory().getDb(options.getDatabaseName())
126+
: template.getDb();
123127

124128
ChangeStreamIterable<Document> iterable;
125129

@@ -132,7 +136,12 @@ protected MongoCursor<ChangeStreamDocument<Document>> initCursor(MongoTemplate t
132136
}
133137

134138
if (!resumeToken.isEmpty()) {
135-
iterable = iterable.resumeAfter(resumeToken);
139+
140+
if (resumeAfter) {
141+
iterable = iterable.resumeAfter(resumeToken);
142+
} else {
143+
iterable = iterable.startAfter(resumeToken);
144+
}
136145
}
137146

138147
if (startAt != null) {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
/*
2+
* Copyright 2019 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.data.mongodb.core.messaging;
17+
18+
import static org.mockito.Mockito.*;
19+
20+
import java.util.UUID;
21+
22+
import org.bson.BsonDocument;
23+
import org.bson.BsonString;
24+
import org.bson.Document;
25+
import org.junit.Before;
26+
import org.junit.Test;
27+
import org.junit.runner.RunWith;
28+
import org.mockito.Mock;
29+
import org.mockito.junit.MockitoJUnitRunner;
30+
import org.springframework.data.mongodb.core.MongoTemplate;
31+
import org.springframework.data.mongodb.core.convert.MappingMongoConverter;
32+
import org.springframework.data.mongodb.core.convert.MongoConverter;
33+
import org.springframework.data.mongodb.core.convert.NoOpDbRefResolver;
34+
import org.springframework.data.mongodb.core.mapping.MongoMappingContext;
35+
36+
import com.mongodb.client.ChangeStreamIterable;
37+
import com.mongodb.client.MongoCollection;
38+
import com.mongodb.client.MongoCursor;
39+
import com.mongodb.client.MongoDatabase;
40+
import com.mongodb.client.model.changestream.ChangeStreamDocument;
41+
42+
/**
43+
* @author Christoph Strobl
44+
*/
45+
@RunWith(MockitoJUnitRunner.class)
46+
public class ChangeStreamTaskUnitTests {
47+
48+
ChangeStreamTask task;
49+
@Mock MongoTemplate template;
50+
@Mock MongoDatabase mongoDatabase;
51+
@Mock MongoCollection<Document> mongoCollection;
52+
@Mock ChangeStreamIterable<Document> changeStreamIterable;
53+
MongoConverter converter;
54+
55+
@Before
56+
public void setUp() {
57+
58+
MongoMappingContext mappingContext = new MongoMappingContext();
59+
converter = new MappingMongoConverter(NoOpDbRefResolver.INSTANCE, mappingContext);
60+
61+
when(template.getConverter()).thenReturn(converter);
62+
when(template.getDb()).thenReturn(mongoDatabase);
63+
64+
when(mongoDatabase.getCollection(any())).thenReturn(mongoCollection);
65+
66+
when(mongoCollection.watch(eq(Document.class))).thenReturn(changeStreamIterable);
67+
68+
when(changeStreamIterable.startAfter(any())).thenReturn(changeStreamIterable);
69+
when(changeStreamIterable.resumeAfter(any())).thenReturn(changeStreamIterable);
70+
when(changeStreamIterable.fullDocument(any())).thenReturn(changeStreamIterable);
71+
}
72+
73+
@Test // DATAMONGO-2258
74+
public void shouldBe2DotOneComplient() {
75+
76+
BsonDocument resumeToken = new BsonDocument("token", new BsonString(UUID.randomUUID().toString()));
77+
78+
ChangeStreamRequest request = ChangeStreamRequest.builder() //
79+
.collection("start-wars") //
80+
.resumeToken(resumeToken) //
81+
.publishTo(message -> {}) //
82+
.build();
83+
84+
initTask(request, Document.class);
85+
86+
verify(changeStreamIterable).resumeAfter(eq(resumeToken));
87+
}
88+
89+
@Test // DATAMONGO-2258
90+
public void shouldApplyResumeAfterToChangeStream() {
91+
92+
BsonDocument resumeToken = new BsonDocument("token", new BsonString(UUID.randomUUID().toString()));
93+
94+
ChangeStreamRequest request = ChangeStreamRequest.builder() //
95+
.collection("start-wars") //
96+
.resumeAfter(resumeToken) //
97+
.publishTo(message -> {}) //
98+
.build();
99+
100+
initTask(request, Document.class);
101+
102+
verify(changeStreamIterable).resumeAfter(eq(resumeToken));
103+
}
104+
105+
@Test // DATAMONGO-2258
106+
public void shouldApplyStartAfterToChangeStream() {
107+
108+
BsonDocument resumeToken = new BsonDocument("token", new BsonString(UUID.randomUUID().toString()));
109+
110+
ChangeStreamRequest request = ChangeStreamRequest.builder() //
111+
.collection("start-wars") //
112+
.startAfter(resumeToken) //
113+
.publishTo(message -> {}) //
114+
.build();
115+
116+
initTask(request, Document.class);
117+
118+
verify(changeStreamIterable).startAfter(eq(resumeToken));
119+
}
120+
121+
private MongoCursor<ChangeStreamDocument<Document>> initTask(ChangeStreamRequest request, Class<?> targetType) {
122+
123+
ChangeStreamTask task = new ChangeStreamTask(template, request, targetType, er -> {});
124+
return task.initCursor(template, request.getRequestOptions(), targetType);
125+
}
126+
}

0 commit comments

Comments
 (0)