diff --git a/src/main/java/org/springframework/data/elasticsearch/core/ReactiveDocumentOperations.java b/src/main/java/org/springframework/data/elasticsearch/core/ReactiveDocumentOperations.java index 37109773b..8bc4e006c 100644 --- a/src/main/java/org/springframework/data/elasticsearch/core/ReactiveDocumentOperations.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/ReactiveDocumentOperations.java @@ -15,6 +15,7 @@ */ package org.springframework.data.elasticsearch.core; +import org.springframework.data.elasticsearch.core.query.UpdateResponse; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -34,6 +35,7 @@ import org.springframework.util.Assert; * * @author Peter-Josef Meisch * @author Aleksei Arsenev + * @author Roman Puchkovskiy * @since 4.0 */ public interface ReactiveDocumentOperations { @@ -324,4 +326,14 @@ public interface ReactiveDocumentOperations { * @return a {@link Mono} emitting the number of the removed documents. */ Mono delete(Query query, Class entityType, IndexCoordinates index); + + /** + * Partial update of the document. + * + * @param updateQuery query defining the update + * @param index the index where to update the records + * @return a {@link Mono} emitting the update response + * @since 4.1 + */ + Mono update(UpdateQuery updateQuery, IndexCoordinates index); } diff --git a/src/main/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplate.java b/src/main/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplate.java index bccdbacc8..86d541f7c 100644 --- a/src/main/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplate.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplate.java @@ -38,6 +38,7 @@ import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.action.support.WriteRequest.RefreshPolicy; +import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.index.get.GetResult; import org.elasticsearch.index.reindex.BulkByScrollResponse; import org.elasticsearch.index.reindex.DeleteByQueryRequest; @@ -73,6 +74,7 @@ import org.springframework.data.elasticsearch.core.query.IndexQuery; import org.springframework.data.elasticsearch.core.query.Query; import org.springframework.data.elasticsearch.core.query.SeqNoPrimaryTerm; import org.springframework.data.elasticsearch.core.query.UpdateQuery; +import org.springframework.data.elasticsearch.core.query.UpdateResponse; import org.springframework.data.elasticsearch.support.VersionInfo; import org.springframework.data.mapping.PersistentPropertyAccessor; import org.springframework.data.mapping.callback.ReactiveEntityCallbacks; @@ -524,6 +526,18 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera return doDeleteBy(query, entityType, index).map(BulkByScrollResponse::getDeleted).publishNext(); } + @Override + public Mono update(UpdateQuery updateQuery, IndexCoordinates index) { + Assert.notNull(updateQuery, "UpdateQuery must not be null"); + Assert.notNull(index, "Index must not be null"); + + return Mono.defer(() -> { + UpdateRequest request = requestFactory.updateRequest(updateQuery, index); + return Mono.from(execute(client -> client.update(request))) + .map(response -> new UpdateResponse(UpdateResponse.Result.valueOf(response.getResult().name()))); + }); + } + @Override public Mono delete(Query query, Class entityType) { return delete(query, entityType, getIndexCoordinatesFor(entityType)); diff --git a/src/test/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplateTests.java b/src/test/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplateTests.java index b2dbbbda0..c66b5f511 100644 --- a/src/test/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplateTests.java +++ b/src/test/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplateTests.java @@ -63,16 +63,7 @@ import org.springframework.data.elasticsearch.annotations.Field; import org.springframework.data.elasticsearch.annotations.Score; import org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient; import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates; -import org.springframework.data.elasticsearch.core.query.Criteria; -import org.springframework.data.elasticsearch.core.query.CriteriaQuery; -import org.springframework.data.elasticsearch.core.query.IndexQuery; -import org.springframework.data.elasticsearch.core.query.IndexQueryBuilder; -import org.springframework.data.elasticsearch.core.query.NativeSearchQuery; -import org.springframework.data.elasticsearch.core.query.NativeSearchQueryBuilder; -import org.springframework.data.elasticsearch.core.query.Query; -import org.springframework.data.elasticsearch.core.query.SeqNoPrimaryTerm; -import org.springframework.data.elasticsearch.core.query.StringQuery; -import org.springframework.data.elasticsearch.core.query.UpdateQuery; +import org.springframework.data.elasticsearch.core.query.*; import org.springframework.data.elasticsearch.junit.junit4.ElasticsearchVersion; import org.springframework.data.elasticsearch.junit.jupiter.SpringIntegrationTest; import org.springframework.util.StringUtils; @@ -965,6 +956,29 @@ public class ReactiveElasticsearchTemplateTests { template.save(forEdit).as(StepVerifier::create).expectNextCount(1).verifyComplete(); } + @Test // DATAES-909 + void shouldDoUpdate() { + SampleEntity entity = randomEntity("test message"); + entity.rate = 1; + index(entity); + + org.springframework.data.elasticsearch.core.document.Document document = org.springframework.data.elasticsearch.core.document.Document + .create(); + document.put("message", "updated"); + UpdateQuery updateQuery = UpdateQuery.builder(entity.getId()) // + .withDocument(document) // + .build(); + + UpdateResponse updateResponse = template.update(updateQuery, IndexCoordinates.of(DEFAULT_INDEX)).block(); + assertThat(updateResponse).isNotNull(); + assertThat(updateResponse.getResult()).isEqualTo(UpdateResponse.Result.UPDATED); + + template.get(entity.getId(), SampleEntity.class, IndexCoordinates.of(DEFAULT_INDEX)) // + .as(StepVerifier::create) // + .expectNextMatches(foundEntity -> foundEntity.getMessage().equals("updated")) // + .verifyComplete(); + } + @Test // DATAES-908 void shouldFillVersionOnSaveOne() { VersionedEntity saved = template.save(new VersionedEntity()).block();