Skip to content

Commit d46af55

Browse files
christophstroblmp911de
authored andcommitted
Pass on FullDocumentBeforeChange option to change stream iterable/publisher.
This commit ensures to pass on a potentially set FullDocumentBeforeChange option to the change stream iterable/publisher. It also corrects false optional behavior within the change stream task which did some defaulting though the actual value is an optional one that must not be present. Original pull request: #4541 Closes #4495
1 parent 1798205 commit d46af55

File tree

5 files changed

+50
-8
lines changed

5 files changed

+50
-8
lines changed

spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveChangeStreamOperationSupport.java

+1
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,7 @@ private ChangeStreamOptionsBuilder initOptionsBuilder() {
168168
}
169169
});
170170
options.getFullDocumentLookup().ifPresent(builder::fullDocumentLookup);
171+
options.getFullDocumentBeforeChangeLookup().ifPresent(builder::fullDocumentBeforeChangeLookup);
171172
options.getCollation().ifPresent(builder::collation);
172173

173174
if (options.isResumeAfter()) {

spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java

+4
Original file line numberDiff line numberDiff line change
@@ -2012,6 +2012,10 @@ public <T> Flux<ChangeStreamEvent<T>> changeStream(@Nullable String database, @N
20122012
publisher = options.getCollation().map(Collation::toMongoCollation).map(publisher::collation)
20132013
.orElse(publisher);
20142014
publisher = options.getResumeBsonTimestamp().map(publisher::startAtOperationTime).orElse(publisher);
2015+
2016+
if(options.getFullDocumentBeforeChangeLookup().isPresent()) {
2017+
publisher = publisher.fullDocumentBeforeChange(options.getFullDocumentBeforeChangeLookup().get());
2018+
}
20152019
return publisher.fullDocument(options.getFullDocumentLookup().orElse(fullDocument));
20162020
}) //
20172021
.flatMapMany(publisher -> Flux.from(publisher)

spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/ChangeStreamTask.java

+7-5
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@
3737
import org.springframework.data.mongodb.core.aggregation.TypedAggregation;
3838
import org.springframework.data.mongodb.core.convert.MongoConverter;
3939
import org.springframework.data.mongodb.core.convert.QueryMapper;
40-
import org.springframework.data.mongodb.core.messaging.ChangeStreamRequest.ChangeStreamRequestOptions;
4140
import org.springframework.data.mongodb.core.messaging.Message.MessageProperties;
4241
import org.springframework.data.mongodb.core.messaging.SubscriptionRequest.RequestOptions;
4342
import org.springframework.lang.Nullable;
@@ -88,7 +87,7 @@ protected MongoCursor<ChangeStreamDocument<Document>> initCursor(MongoTemplate t
8887
Collation collation = null;
8988
FullDocument fullDocument = ClassUtils.isAssignable(Document.class, targetType) ? FullDocument.DEFAULT
9089
: FullDocument.UPDATE_LOOKUP;
91-
FullDocumentBeforeChange fullDocumentBeforeChange = FullDocumentBeforeChange.DEFAULT;
90+
FullDocumentBeforeChange fullDocumentBeforeChange = null;
9291
BsonTimestamp startAt = null;
9392
boolean resumeAfter = true;
9493

@@ -116,8 +115,9 @@ protected MongoCursor<ChangeStreamDocument<Document>> initCursor(MongoTemplate t
116115
.orElseGet(() -> ClassUtils.isAssignable(Document.class, targetType) ? FullDocument.DEFAULT
117116
: FullDocument.UPDATE_LOOKUP);
118117

119-
fullDocumentBeforeChange = changeStreamOptions.getFullDocumentBeforeChangeLookup()
120-
.orElse(FullDocumentBeforeChange.DEFAULT);
118+
if(changeStreamOptions.getFullDocumentBeforeChangeLookup().isPresent()) {
119+
fullDocumentBeforeChange = changeStreamOptions.getFullDocumentBeforeChangeLookup().get();
120+
}
121121

122122
startAt = changeStreamOptions.getResumeBsonTimestamp().orElse(null);
123123
}
@@ -158,7 +158,9 @@ protected MongoCursor<ChangeStreamDocument<Document>> initCursor(MongoTemplate t
158158
}
159159

160160
iterable = iterable.fullDocument(fullDocument);
161-
iterable = iterable.fullDocumentBeforeChange(fullDocumentBeforeChange);
161+
if(fullDocumentBeforeChange != null) {
162+
iterable = iterable.fullDocumentBeforeChange(fullDocumentBeforeChange);
163+
}
162164

163165
return iterable.iterator();
164166
}

spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveMongoTemplateUnitTests.java

+21
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,7 @@
101101
import com.mongodb.MongoClientSettings;
102102
import com.mongodb.ReadConcern;
103103
import com.mongodb.ReadPreference;
104+
import com.mongodb.WriteConcern;
104105
import com.mongodb.client.model.CountOptions;
105106
import com.mongodb.client.model.CreateCollectionOptions;
106107
import com.mongodb.client.model.DeleteOptions;
@@ -110,6 +111,7 @@
110111
import com.mongodb.client.model.ReplaceOptions;
111112
import com.mongodb.client.model.TimeSeriesGranularity;
112113
import com.mongodb.client.model.UpdateOptions;
114+
import com.mongodb.client.model.changestream.FullDocumentBeforeChange;
113115
import com.mongodb.client.result.DeleteResult;
114116
import com.mongodb.client.result.InsertManyResult;
115117
import com.mongodb.client.result.InsertOneResult;
@@ -1604,6 +1606,25 @@ void changeStreamOptionStartAftershouldApplied() {
16041606
verify(changeStreamPublisher).startAfter(eq(token));
16051607
}
16061608

1609+
@Test // GH-4495
1610+
void changeStreamOptionFullDocumentBeforeChangeShouldBeApplied() {
1611+
1612+
when(factory.getMongoDatabase(anyString())).thenReturn(Mono.just(db));
1613+
1614+
when(collection.watch(any(Class.class))).thenReturn(changeStreamPublisher);
1615+
when(changeStreamPublisher.batchSize(anyInt())).thenReturn(changeStreamPublisher);
1616+
when(changeStreamPublisher.startAfter(any())).thenReturn(changeStreamPublisher);
1617+
when(changeStreamPublisher.fullDocument(any())).thenReturn(changeStreamPublisher);
1618+
when(changeStreamPublisher.fullDocumentBeforeChange(any())).thenReturn(changeStreamPublisher);
1619+
1620+
template
1621+
.changeStream("database", "collection", ChangeStreamOptions.builder().fullDocumentBeforeChangeLookup(FullDocumentBeforeChange.REQUIRED).build(), Object.class)
1622+
.subscribe();
1623+
1624+
verify(changeStreamPublisher).fullDocumentBeforeChange(eq(FullDocumentBeforeChange.REQUIRED));
1625+
1626+
}
1627+
16071628
private void stubFindSubscribe(Document document) {
16081629

16091630
Publisher<Document> realPublisher = Flux.just(document);

spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/messaging/ChangeStreamTaskUnitTests.java

+17-3
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
import org.junit.jupiter.api.extension.ExtendWith;
2828
import org.mockito.Mock;
2929
import org.mockito.junit.jupiter.MockitoExtension;
30-
3130
import org.springframework.data.mongodb.core.MongoTemplate;
3231
import org.springframework.data.mongodb.core.convert.MappingMongoConverter;
3332
import org.springframework.data.mongodb.core.convert.MongoConverter;
@@ -39,6 +38,7 @@
3938
import com.mongodb.client.MongoCursor;
4039
import com.mongodb.client.MongoDatabase;
4140
import com.mongodb.client.model.changestream.ChangeStreamDocument;
41+
import com.mongodb.client.model.changestream.FullDocumentBeforeChange;
4242

4343
/**
4444
* @author Christoph Strobl
@@ -68,8 +68,6 @@ void setUp() {
6868
when(mongoCollection.watch(eq(Document.class))).thenReturn(changeStreamIterable);
6969

7070
when(changeStreamIterable.fullDocument(any())).thenReturn(changeStreamIterable);
71-
72-
when(changeStreamIterable.fullDocumentBeforeChange(any())).thenReturn(changeStreamIterable);
7371
}
7472

7573
@Test // DATAMONGO-2258
@@ -125,6 +123,22 @@ void shouldApplyStartAfterToChangeStream() {
125123
verify(changeStreamIterable).startAfter(eq(resumeToken));
126124
}
127125

126+
@Test // GH-4495
127+
void shouldApplyFullDocumentBeforeChangeToChangeStream() {
128+
129+
when(changeStreamIterable.fullDocumentBeforeChange(any())).thenReturn(changeStreamIterable);
130+
131+
ChangeStreamRequest request = ChangeStreamRequest.builder() //
132+
.collection("start-wars") //
133+
.fullDocumentBeforeChangeLookup(FullDocumentBeforeChange.REQUIRED) //
134+
.publishTo(message -> {}) //
135+
.build();
136+
137+
initTask(request, Document.class);
138+
139+
verify(changeStreamIterable).fullDocumentBeforeChange(eq(FullDocumentBeforeChange.REQUIRED));
140+
}
141+
128142
private MongoCursor<ChangeStreamDocument<Document>> initTask(ChangeStreamRequest request, Class<?> targetType) {
129143

130144
ChangeStreamTask task = new ChangeStreamTask(template, request, targetType, er -> {});

0 commit comments

Comments
 (0)