DATAES-909 - Add singular update() methods to ReactiveDocumentOperations.

Original PR: #507 
Co-authored-by: Peter-Josef Meisch <pj.meisch@sothawo.com>
This commit is contained in:
Roman Puchkovskiy 2020-08-20 08:13:59 +04:00 committed by GitHub
parent 26ab5f6db4
commit 0208bffc0a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 50 additions and 10 deletions

View File

@ -15,6 +15,7 @@
*/
package org.springframework.data.elasticsearch.core;
import org.springframework.data.elasticsearch.core.query.UpdateResponse;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@ -34,6 +35,7 @@ import org.springframework.util.Assert;
*
* @author Peter-Josef Meisch
* @author Aleksei Arsenev
* @author Roman Puchkovskiy
* @since 4.0
*/
public interface ReactiveDocumentOperations {
@ -324,4 +326,14 @@ public interface ReactiveDocumentOperations {
* @return a {@link Mono} emitting the number of the removed documents.
*/
Mono<Long> delete(Query query, Class<?> entityType, IndexCoordinates index);
/**
* Partial update of the document.
*
* @param updateQuery query defining the update
* @param index the index where to update the records
* @return a {@link Mono} emitting the update response
* @since 4.1
*/
Mono<UpdateResponse> update(UpdateQuery updateQuery, IndexCoordinates index);
}

View File

@ -38,6 +38,7 @@ import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.support.WriteRequest.RefreshPolicy;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.index.get.GetResult;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
@ -73,6 +74,7 @@ import org.springframework.data.elasticsearch.core.query.IndexQuery;
import org.springframework.data.elasticsearch.core.query.Query;
import org.springframework.data.elasticsearch.core.query.SeqNoPrimaryTerm;
import org.springframework.data.elasticsearch.core.query.UpdateQuery;
import org.springframework.data.elasticsearch.core.query.UpdateResponse;
import org.springframework.data.elasticsearch.support.VersionInfo;
import org.springframework.data.mapping.PersistentPropertyAccessor;
import org.springframework.data.mapping.callback.ReactiveEntityCallbacks;
@ -524,6 +526,18 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera
return doDeleteBy(query, entityType, index).map(BulkByScrollResponse::getDeleted).publishNext();
}
@Override
public Mono<UpdateResponse> update(UpdateQuery updateQuery, IndexCoordinates index) {
Assert.notNull(updateQuery, "UpdateQuery must not be null");
Assert.notNull(index, "Index must not be null");
return Mono.defer(() -> {
UpdateRequest request = requestFactory.updateRequest(updateQuery, index);
return Mono.from(execute(client -> client.update(request)))
.map(response -> new UpdateResponse(UpdateResponse.Result.valueOf(response.getResult().name())));
});
}
@Override
public Mono<Long> delete(Query query, Class<?> entityType) {
return delete(query, entityType, getIndexCoordinatesFor(entityType));

View File

@ -63,16 +63,7 @@ import org.springframework.data.elasticsearch.annotations.Field;
import org.springframework.data.elasticsearch.annotations.Score;
import org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient;
import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
import org.springframework.data.elasticsearch.core.query.Criteria;
import org.springframework.data.elasticsearch.core.query.CriteriaQuery;
import org.springframework.data.elasticsearch.core.query.IndexQuery;
import org.springframework.data.elasticsearch.core.query.IndexQueryBuilder;
import org.springframework.data.elasticsearch.core.query.NativeSearchQuery;
import org.springframework.data.elasticsearch.core.query.NativeSearchQueryBuilder;
import org.springframework.data.elasticsearch.core.query.Query;
import org.springframework.data.elasticsearch.core.query.SeqNoPrimaryTerm;
import org.springframework.data.elasticsearch.core.query.StringQuery;
import org.springframework.data.elasticsearch.core.query.UpdateQuery;
import org.springframework.data.elasticsearch.core.query.*;
import org.springframework.data.elasticsearch.junit.junit4.ElasticsearchVersion;
import org.springframework.data.elasticsearch.junit.jupiter.SpringIntegrationTest;
import org.springframework.util.StringUtils;
@ -965,6 +956,29 @@ public class ReactiveElasticsearchTemplateTests {
template.save(forEdit).as(StepVerifier::create).expectNextCount(1).verifyComplete();
}
@Test // DATAES-909
void shouldDoUpdate() {
SampleEntity entity = randomEntity("test message");
entity.rate = 1;
index(entity);
org.springframework.data.elasticsearch.core.document.Document document = org.springframework.data.elasticsearch.core.document.Document
.create();
document.put("message", "updated");
UpdateQuery updateQuery = UpdateQuery.builder(entity.getId()) //
.withDocument(document) //
.build();
UpdateResponse updateResponse = template.update(updateQuery, IndexCoordinates.of(DEFAULT_INDEX)).block();
assertThat(updateResponse).isNotNull();
assertThat(updateResponse.getResult()).isEqualTo(UpdateResponse.Result.UPDATED);
template.get(entity.getId(), SampleEntity.class, IndexCoordinates.of(DEFAULT_INDEX)) //
.as(StepVerifier::create) //
.expectNextMatches(foundEntity -> foundEntity.getMessage().equals("updated")) //
.verifyComplete();
}
@Test // DATAES-908
void shouldFillVersionOnSaveOne() {
VersionedEntity saved = template.save(new VersionedEntity()).block();