diff --git a/src/main/java/org/springframework/data/elasticsearch/core/ReactiveDocumentOperations.java b/src/main/java/org/springframework/data/elasticsearch/core/ReactiveDocumentOperations.java index 41df2a9f0..d0c8b353e 100644 --- a/src/main/java/org/springframework/data/elasticsearch/core/ReactiveDocumentOperations.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/ReactiveDocumentOperations.java @@ -15,11 +15,6 @@ */ package org.springframework.data.elasticsearch.core; -import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates; -import org.springframework.data.elasticsearch.core.query.BulkOptions; -import org.springframework.data.elasticsearch.core.query.Query; -import org.springframework.data.elasticsearch.core.query.UpdateQuery; -import org.springframework.util.Assert; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -27,6 +22,12 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; +import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates; +import org.springframework.data.elasticsearch.core.query.BulkOptions; +import org.springframework.data.elasticsearch.core.query.Query; +import org.springframework.data.elasticsearch.core.query.UpdateQuery; +import org.springframework.util.Assert; + /** * The reactive operations for the * Elasticsearch Document APIs. @@ -230,6 +231,7 @@ public interface ReactiveDocumentOperations { * @return a {@link Mono} emitting the {@literal id} of the removed document. */ Mono deleteById(String id, Class entityType, IndexCoordinates index); + /** * Delete the documents matching the given {@link Query} extracting index and type from entity metadata. * diff --git a/src/main/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplate.java b/src/main/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplate.java index e36c1a473..8c5826362 100644 --- a/src/main/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplate.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplate.java @@ -15,6 +15,20 @@ */ package org.springframework.data.elasticsearch.core; +import static org.elasticsearch.index.VersionType.*; + +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkResponse; @@ -57,18 +71,17 @@ import org.springframework.data.elasticsearch.core.mapping.ElasticsearchPersiste import org.springframework.data.elasticsearch.core.mapping.ElasticsearchPersistentProperty; import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates; import org.springframework.data.elasticsearch.core.mapping.SimpleElasticsearchMappingContext; -import org.springframework.data.elasticsearch.core.query.*; +import org.springframework.data.elasticsearch.core.query.BulkOptions; +import org.springframework.data.elasticsearch.core.query.CriteriaQuery; +import org.springframework.data.elasticsearch.core.query.IndexQuery; +import org.springframework.data.elasticsearch.core.query.NativeSearchQuery; +import org.springframework.data.elasticsearch.core.query.Query; +import org.springframework.data.elasticsearch.core.query.StringQuery; +import org.springframework.data.elasticsearch.core.query.UpdateQuery; import org.springframework.data.mapping.context.MappingContext; import org.springframework.http.HttpStatus; import org.springframework.lang.Nullable; import org.springframework.util.Assert; -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; - -import java.util.*; -import java.util.stream.Collectors; - -import static org.elasticsearch.index.VersionType.EXTERNAL; /** * @author Christoph Strobl @@ -165,8 +178,8 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera public Flux multiGet(Query query, Class clazz, IndexCoordinates index) { Assert.notNull(index, "Index must not be null"); - Assert.notNull(clazz, "Class must not be null"); - Assert.notNull(query, "Query must not be null"); + Assert.notNull(clazz, "Class must not be null"); + Assert.notNull(query, "Query must not be null"); Assert.notEmpty(query.getIds(), "No Id define for Query"); MultiGetRequest request = requestFactory.multiGetRequest(query, index); @@ -186,7 +199,7 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera Assert.notNull(queries, "List of UpdateQuery must not be null"); Assert.notNull(bulkOptions, "BulkOptions must not be null"); - Assert.notNull(index, "Index must not be null"); + Assert.notNull(index, "Index must not be null"); return doBulkOperation(queries, bulkOptions, index).then(); } @@ -215,9 +228,10 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera if (bulkResponse.hasFailures()) { Map failedDocuments = new HashMap<>(); for (BulkItemResponse item : bulkResponse.getItems()) { + if (item.isFailed()) { - failedDocuments.put(item.getId(), item.getFailureMessage()); - } + failedDocuments.put(item.getId(), item.getFailureMessage()); + } } ElasticsearchException exception = new ElasticsearchException( "Bulk operation has failures. Use ElasticsearchException.getFailedDocuments() for detailed messages [" @@ -270,8 +284,7 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera private IndexRequest getIndexRequest(Object value, AdaptibleEntity entity, IndexCoordinates index) { Object id = entity.getId(); - IndexRequest request = id != null - ? new IndexRequest(index.getIndexName()).id(converter.convertId(id)) + IndexRequest request = id != null ? new IndexRequest(index.getIndexName()).id(converter.convertId(id)) : new IndexRequest(index.getIndexName()); request.source(converter.mapObject(value).toJson(), Requests.INDEX_CONTENT_TYPE); @@ -279,6 +292,7 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera if (entity.isVersionedEntity()) { Number version = entity.getVersion(); + if (version != null) { request.version(version.longValue()); request.versionType(EXTERNAL); @@ -294,8 +308,10 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera query.setId(id.toString()); } query.setObject(value); + if (entity.isVersionedEntity()) { Number version = entity.getVersion(); + if (version != null) { query.setVersion(version.longValue()); } diff --git a/src/main/java/org/springframework/data/elasticsearch/repository/support/SimpleReactiveElasticsearchRepository.java b/src/main/java/org/springframework/data/elasticsearch/repository/support/SimpleReactiveElasticsearchRepository.java index 1b4d8f4dd..9de865e0a 100644 --- a/src/main/java/org/springframework/data/elasticsearch/repository/support/SimpleReactiveElasticsearchRepository.java +++ b/src/main/java/org/springframework/data/elasticsearch/repository/support/SimpleReactiveElasticsearchRepository.java @@ -15,6 +15,9 @@ */ package org.springframework.data.elasticsearch.repository.support; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + import org.elasticsearch.index.query.QueryBuilders; import org.reactivestreams.Publisher; import org.springframework.data.domain.Pageable; @@ -27,8 +30,6 @@ import org.springframework.data.elasticsearch.core.query.Query; import org.springframework.data.elasticsearch.core.query.StringQuery; import org.springframework.data.elasticsearch.repository.ReactiveElasticsearchRepository; import org.springframework.util.Assert; -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; /** * @author Christoph Strobl @@ -70,8 +71,8 @@ public class SimpleReactiveElasticsearchRepository implements ReactiveEla Assert.notNull(entityStream, "EntityStream must not be null!"); - return elasticsearchOperations - .saveAll(Flux.from(entityStream).collectList(), entityInformation.getIndexCoordinates()); + return elasticsearchOperations.saveAll(Flux.from(entityStream).collectList(), + entityInformation.getIndexCoordinates()); } @Override @@ -191,9 +192,7 @@ public class SimpleReactiveElasticsearchRepository implements ReactiveEla throw new IllegalStateException("Entity id must not be null!"); } return convertId(id); - }) - .collectList() - .map(objects -> { + }).collectList().map(objects -> { return new StringQuery(QueryBuilders.idsQuery() // .addIds(objects.toArray(new String[0])) // @@ -201,8 +200,8 @@ public class SimpleReactiveElasticsearchRepository implements ReactiveEla }) // .flatMap(query -> { - return elasticsearchOperations - .deleteBy(query, entityInformation.getJavaType(), entityInformation.getIndexCoordinates()); + return elasticsearchOperations.deleteBy(query, entityInformation.getJavaType(), + entityInformation.getIndexCoordinates()); }) // .then(); } diff --git a/src/test/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplateTests.java b/src/test/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplateTests.java index 60a0a37dd..f560315c6 100644 --- a/src/test/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplateTests.java +++ b/src/test/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplateTests.java @@ -15,7 +15,30 @@ */ package org.springframework.data.elasticsearch.core; -import lombok.*; +import static org.assertj.core.api.Assertions.*; +import static org.elasticsearch.index.query.QueryBuilders.*; +import static org.springframework.data.elasticsearch.annotations.FieldType.*; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.NoArgsConstructor; +import reactor.core.publisher.Mono; +import reactor.test.StepVerifier; + +import java.lang.Long; +import java.lang.Object; +import java.net.ConnectException; +import java.util.Arrays; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.search.sort.FieldSortBuilder; @@ -35,24 +58,18 @@ import org.springframework.data.elasticsearch.annotations.Field; import org.springframework.data.elasticsearch.annotations.Score; import org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient; import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates; -import org.springframework.data.elasticsearch.core.query.*; +import org.springframework.data.elasticsearch.core.query.Criteria; +import org.springframework.data.elasticsearch.core.query.CriteriaQuery; +import org.springframework.data.elasticsearch.core.query.IndexQuery; +import org.springframework.data.elasticsearch.core.query.IndexQueryBuilder; +import org.springframework.data.elasticsearch.core.query.NativeSearchQuery; +import org.springframework.data.elasticsearch.core.query.NativeSearchQueryBuilder; +import org.springframework.data.elasticsearch.core.query.StringQuery; +import org.springframework.data.elasticsearch.core.query.UpdateQuery; +import org.springframework.data.elasticsearch.core.query.UpdateQueryBuilder; import org.springframework.data.elasticsearch.junit.junit4.ElasticsearchVersion; import org.springframework.data.elasticsearch.junit.jupiter.SpringIntegrationTest; import org.springframework.util.StringUtils; -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; -import reactor.test.StepVerifier; - -import java.net.ConnectException; -import java.util.*; -import java.util.stream.Collectors; -import java.util.stream.IntStream; - -import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.assertThatThrownBy; -import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery; -import static org.elasticsearch.index.query.QueryBuilders.termQuery; -import static org.springframework.data.elasticsearch.annotations.FieldType.Text; /** * Integration tests for {@link ReactiveElasticsearchTemplate}.