Skip to content

DATAMONGO-2331 - Add support for Update with an aggregation pipeline. #789

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

<groupId>org.springframework.data</groupId>
<artifactId>spring-data-mongodb-parent</artifactId>
<version>2.2.0.BUILD-SNAPSHOT</version>
<version>2.2.0.DATAMONGO-2331-SNAPSHOT</version>
<packaging>pom</packaging>

<name>Spring Data MongoDB</name>
Expand Down
2 changes: 1 addition & 1 deletion spring-data-mongodb-benchmarks/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
<parent>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-mongodb-parent</artifactId>
<version>2.2.0.BUILD-SNAPSHOT</version>
<version>2.2.0.DATAMONGO-2331-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion spring-data-mongodb-distribution/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
<parent>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-mongodb-parent</artifactId>
<version>2.2.0.BUILD-SNAPSHOT</version>
<version>2.2.0.DATAMONGO-2331-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion spring-data-mongodb/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
<parent>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-mongodb-parent</artifactId>
<version>2.2.0.BUILD-SNAPSHOT</version>
<version>2.2.0.DATAMONGO-2331-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.springframework.data.mongodb.core.query.Query;
import org.springframework.data.mongodb.core.query.Update;
import org.springframework.data.mongodb.core.query.UpdateDefinition;
import org.springframework.lang.Nullable;

import com.mongodb.client.result.UpdateResult;
Expand Down Expand Up @@ -150,14 +151,27 @@ interface TerminatingUpdate<T> extends TerminatingFindAndModify<T>, FindAndModif
*/
interface UpdateWithUpdate<T> {

/**
* Set the {@link UpdateDefinition} to be applied.
*
* @param update must not be {@literal null}.
* @return new instance of {@link TerminatingUpdate}.
* @throws IllegalArgumentException if update is {@literal null}.
*/
TerminatingUpdate<T> apply(UpdateDefinition update);

/**
* Set the {@link Update} to be applied.
*
* @param update must not be {@literal null}.
* @return new instance of {@link TerminatingUpdate}.
* @throws IllegalArgumentException if update is {@literal null}.
* @deprecated since 2.3 in favor of {@link #apply(UpdateDefinition)}.
*/
TerminatingUpdate<T> apply(Update update);
@Deprecated
default TerminatingUpdate<T> apply(Update update) {
return apply((UpdateDefinition) update);
}

/**
* Specify {@code replacement} object.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import lombok.experimental.FieldDefaults;

import org.springframework.data.mongodb.core.query.Query;
import org.springframework.data.mongodb.core.query.Update;
import org.springframework.data.mongodb.core.query.UpdateDefinition;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
Expand Down Expand Up @@ -67,7 +67,7 @@ static class ExecutableUpdateSupport<T>
@NonNull MongoTemplate template;
@NonNull Class domainType;
Query query;
@Nullable Update update;
@Nullable UpdateDefinition update;
@Nullable String collection;
@Nullable FindAndModifyOptions findAndModifyOptions;
@Nullable FindAndReplaceOptions findAndReplaceOptions;
Expand All @@ -76,10 +76,10 @@ static class ExecutableUpdateSupport<T>

/*
* (non-Javadoc)
* @see org.springframework.data.mongodb.core.ExecutableUpdateOperation.UpdateWithUpdate#apply(Update)
* @see org.springframework.data.mongodb.core.ExecutableUpdateOperation.UpdateWithUpdate#apply(org.springframework.data.mongodb.core.query.UpdateDefinition)
*/
@Override
public TerminatingUpdate<T> apply(Update update) {
public TerminatingUpdate<T> apply(UpdateDefinition update) {

Assert.notNull(update, "Update must not be null!");

Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,9 @@
import org.springframework.data.mongodb.core.aggregation.AggregationOperationContext;
import org.springframework.data.mongodb.core.aggregation.AggregationOptions;
import org.springframework.data.mongodb.core.aggregation.AggregationResults;
import org.springframework.data.mongodb.core.aggregation.AggregationUpdate;
import org.springframework.data.mongodb.core.aggregation.Fields;
import org.springframework.data.mongodb.core.aggregation.RelaxedTypeBasedAggregationOperationContext;
import org.springframework.data.mongodb.core.aggregation.TypeBasedAggregationOperationContext;
import org.springframework.data.mongodb.core.aggregation.TypedAggregation;
import org.springframework.data.mongodb.core.convert.DbRefResolver;
Expand Down Expand Up @@ -108,7 +110,6 @@
import org.springframework.data.mongodb.core.query.Meta.CursorOption;
import org.springframework.data.mongodb.core.query.NearQuery;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.data.mongodb.core.query.Update;
import org.springframework.data.mongodb.core.query.UpdateDefinition;
import org.springframework.data.mongodb.core.query.UpdateDefinition.ArrayFilter;
import org.springframework.data.mongodb.core.validation.Validator;
Expand Down Expand Up @@ -1043,25 +1044,25 @@ public <T> GeoResults<T> geoNear(NearQuery near, Class<?> domainType, String col

@Nullable
@Override
public <T> T findAndModify(Query query, Update update, Class<T> entityClass) {
public <T> T findAndModify(Query query, UpdateDefinition update, Class<T> entityClass) {
return findAndModify(query, update, new FindAndModifyOptions(), entityClass, getCollectionName(entityClass));
}

@Nullable
@Override
public <T> T findAndModify(Query query, Update update, Class<T> entityClass, String collectionName) {
public <T> T findAndModify(Query query, UpdateDefinition update, Class<T> entityClass, String collectionName) {
return findAndModify(query, update, new FindAndModifyOptions(), entityClass, collectionName);
}

@Nullable
@Override
public <T> T findAndModify(Query query, Update update, FindAndModifyOptions options, Class<T> entityClass) {
public <T> T findAndModify(Query query, UpdateDefinition update, FindAndModifyOptions options, Class<T> entityClass) {
return findAndModify(query, update, options, entityClass, getCollectionName(entityClass));
}

@Nullable
@Override
public <T> T findAndModify(Query query, Update update, FindAndModifyOptions options, Class<T> entityClass,
public <T> T findAndModify(Query query, UpdateDefinition update, FindAndModifyOptions options, Class<T> entityClass,
String collectionName) {

Assert.notNull(query, "Query must not be null!");
Expand Down Expand Up @@ -1561,53 +1562,54 @@ public Object doInCollection(MongoCollection<Document> collection) throws MongoE
}

@Override
public UpdateResult upsert(Query query, Update update, Class<?> entityClass) {
public UpdateResult upsert(Query query, UpdateDefinition update, Class<?> entityClass) {
return doUpdate(getCollectionName(entityClass), query, update, entityClass, true, false);
}

@Override
public UpdateResult upsert(Query query, Update update, String collectionName) {
public UpdateResult upsert(Query query, UpdateDefinition update, String collectionName) {
return doUpdate(collectionName, query, update, null, true, false);
}

@Override
public UpdateResult upsert(Query query, Update update, Class<?> entityClass, String collectionName) {
public UpdateResult upsert(Query query, UpdateDefinition update, Class<?> entityClass, String collectionName) {

Assert.notNull(entityClass, "EntityClass must not be null!");

return doUpdate(collectionName, query, update, entityClass, true, false);
}

@Override
public UpdateResult updateFirst(Query query, Update update, Class<?> entityClass) {
public UpdateResult updateFirst(Query query, UpdateDefinition update, Class<?> entityClass) {
return doUpdate(getCollectionName(entityClass), query, update, entityClass, false, false);
}

@Override
public UpdateResult updateFirst(final Query query, final Update update, final String collectionName) {
public UpdateResult updateFirst(final Query query, final UpdateDefinition update, final String collectionName) {
return doUpdate(collectionName, query, update, null, false, false);
}

@Override
public UpdateResult updateFirst(Query query, Update update, Class<?> entityClass, String collectionName) {
public UpdateResult updateFirst(Query query, UpdateDefinition update, Class<?> entityClass, String collectionName) {

Assert.notNull(entityClass, "EntityClass must not be null!");

return doUpdate(collectionName, query, update, entityClass, false, false);
}

@Override
public UpdateResult updateMulti(Query query, Update update, Class<?> entityClass) {
public UpdateResult updateMulti(Query query, UpdateDefinition update, Class<?> entityClass) {
return doUpdate(getCollectionName(entityClass), query, update, entityClass, false, true);
}

@Override
public UpdateResult updateMulti(final Query query, final Update update, String collectionName) {
public UpdateResult updateMulti(final Query query, final UpdateDefinition update, String collectionName) {
return doUpdate(collectionName, query, update, null, false, true);
}

@Override
public UpdateResult updateMulti(final Query query, final Update update, Class<?> entityClass, String collectionName) {
public UpdateResult updateMulti(final Query query, final UpdateDefinition update, Class<?> entityClass,
String collectionName) {

Assert.notNull(entityClass, "EntityClass must not be null!");

Expand All @@ -1622,24 +1624,52 @@ protected UpdateResult doUpdate(final String collectionName, final Query query,
Assert.notNull(query, "Query must not be null!");
Assert.notNull(update, "Update must not be null!");

return execute(collectionName, collection -> {
MongoPersistentEntity<?> entity = entityClass == null ? null : getPersistentEntity(entityClass);
increaseVersionForUpdateIfNecessary(entity, update);

MongoPersistentEntity<?> entity = entityClass == null ? null : getPersistentEntity(entityClass);
UpdateOptions opts = new UpdateOptions();
opts.upsert(upsert);

increaseVersionForUpdateIfNecessary(entity, update);
if (update.hasArrayFilters()) {
opts.arrayFilters(update.getArrayFilters().stream().map(ArrayFilter::asDocument).collect(Collectors.toList()));
}

UpdateOptions opts = new UpdateOptions();
opts.upsert(upsert);
Document queryObj = new Document();

if (update.hasArrayFilters()) {
opts.arrayFilters(update.getArrayFilters().stream().map(ArrayFilter::asDocument).collect(Collectors.toList()));
}
if (query != null) {
queryObj.putAll(queryMapper.getMappedObject(query.getQueryObject(), entity));
}

Document queryObj = new Document();
if (multi && update.isIsolated() && !queryObj.containsKey("$isolated")) {
queryObj.put("$isolated", 1);
}

if (query != null) {
queryObj.putAll(queryMapper.getMappedObject(query.getQueryObject(), entity));
}
if (update instanceof AggregationUpdate) {

AggregationOperationContext context = entityClass != null
? new RelaxedTypeBasedAggregationOperationContext(entityClass, mappingContext, queryMapper)
: Aggregation.DEFAULT_CONTEXT;

AggregationUpdate aUppdate = ((AggregationUpdate) update);
List<Document> pipeline = new AggregationUtil(queryMapper, mappingContext).createPipeline(aUppdate, context);

return execute(collectionName, collection -> {

MongoAction mongoAction = new MongoAction(writeConcern, MongoActionOperation.UPDATE, collectionName,
entityClass, update.getUpdateObject(), queryObj);
WriteConcern writeConcernToUse = prepareWriteConcern(mongoAction);

collection = writeConcernToUse != null ? collection.withWriteConcern(writeConcernToUse) : collection;

if (multi) {
return collection.updateMany(queryObj, pipeline, opts);
}

return collection.updateOne(queryObj, pipeline, opts);
});
}

return execute(collectionName, collection -> {

operations.forType(entityClass) //
.getCollation(query) //
Expand All @@ -1649,10 +1679,6 @@ protected UpdateResult doUpdate(final String collectionName, final Query query,
Document updateObj = update instanceof MappedUpdate ? update.getUpdateObject()
: updateMapper.getMappedObject(update.getUpdateObject(), entity);

if (multi && update.isIsolated() && !queryObj.containsKey("$isolated")) {
queryObj.put("$isolated", 1);
}

if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Calling update using query: {} and update: {} in collection: {}", serializeToJsonSafely(queryObj),
serializeToJsonSafely(updateObj), collectionName);
Expand Down Expand Up @@ -2640,7 +2666,7 @@ protected <T> T doFindAndRemove(String collectionName, Document query, Document

@SuppressWarnings("ConstantConditions")
protected <T> T doFindAndModify(String collectionName, Document query, Document fields, Document sort,
Class<T> entityClass, Update update, @Nullable FindAndModifyOptions options) {
Class<T> entityClass, UpdateDefinition update, @Nullable FindAndModifyOptions options) {

EntityReader<? super T, Bson> readerToUse = this.mongoConverter;

Expand All @@ -2653,7 +2679,18 @@ protected <T> T doFindAndModify(String collectionName, Document query, Document
increaseVersionForUpdateIfNecessary(entity, update);

Document mappedQuery = queryMapper.getMappedObject(query, entity);
Document mappedUpdate = updateMapper.getMappedObject(update.getUpdateObject(), entity);

Object mappedUpdate = new Document();
if (update instanceof AggregationUpdate) {

AggregationOperationContext context = entityClass != null
? new RelaxedTypeBasedAggregationOperationContext(entityClass, mappingContext, queryMapper)
: Aggregation.DEFAULT_CONTEXT;

mappedUpdate = new AggregationUtil(queryMapper, mappingContext).createPipeline((Aggregation) update, context);
} else {
mappedUpdate = updateMapper.getMappedObject(update.getUpdateObject(), entity);
}

if (LOGGER.isDebugEnabled()) {
LOGGER.debug(
Expand Down Expand Up @@ -3027,11 +3064,11 @@ private static class FindAndModifyCallback implements CollectionCallback<Documen
private final Document query;
private final Document fields;
private final Document sort;
private final Document update;
private final Object update;
private final List<Document> arrayFilters;
private final FindAndModifyOptions options;

public FindAndModifyCallback(Document query, Document fields, Document sort, Document update,
public FindAndModifyCallback(Document query, Document fields, Document sort, Object update,
List<Document> arrayFilters, FindAndModifyOptions options) {
this.query = query;
this.fields = fields;
Expand Down Expand Up @@ -3059,7 +3096,12 @@ public Document doInCollection(MongoCollection<Document> collection) throws Mong
opts.arrayFilters(arrayFilters);
}

return collection.findOneAndUpdate(query, update, opts);
if (update instanceof Document) {
return collection.findOneAndUpdate(query, (Document) update, opts);
} else if (update instanceof List) {
return collection.findOneAndUpdate(query, (List<Document>) update, opts);
}
throw new IllegalArgumentException("doh - that does not work");
}
}

Expand Down
Loading