From d7262e43701b499d9880b51ebeb767963be60cab Mon Sep 17 00:00:00 2001 From: alesharik Date: Mon, 13 Jan 2020 22:26:19 +0300 Subject: [PATCH] DATAES-623 - Add bulk operations for ReactiveElasticsearchRepository. Original PR:#376 --- .../core/ReactiveDocumentOperations.java | 67 ++++++- .../core/ReactiveElasticsearchTemplate.java | 164 ++++++++++++++---- .../ElasticsearchEntityInformation.java | 4 + ...SimpleReactiveElasticsearchRepository.java | 49 +++++- .../ReactiveElasticsearchTemplateTests.java | 145 ++++++++++++---- 5 files changed, 358 insertions(+), 71 deletions(-) diff --git a/src/main/java/org/springframework/data/elasticsearch/core/ReactiveDocumentOperations.java b/src/main/java/org/springframework/data/elasticsearch/core/ReactiveDocumentOperations.java index 8a7e73be7..41df2a9f0 100644 --- a/src/main/java/org/springframework/data/elasticsearch/core/ReactiveDocumentOperations.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/ReactiveDocumentOperations.java @@ -16,16 +16,23 @@ package org.springframework.data.elasticsearch.core; import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates; +import org.springframework.data.elasticsearch.core.query.BulkOptions; import org.springframework.data.elasticsearch.core.query.Query; +import org.springframework.data.elasticsearch.core.query.UpdateQuery; +import org.springframework.util.Assert; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import org.springframework.util.Assert; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; /** * The reactive operations for the * Elasticsearch Document APIs. * * @author Peter-Josef Meisch + * @author Aleksei Arsenev * @since 4.0 */ public interface ReactiveDocumentOperations { @@ -78,6 +85,64 @@ public interface ReactiveDocumentOperations { */ Mono save(T entity, IndexCoordinates index); + /** + * Index entities under the given {@literal type} in the given {@literal index}. If the {@literal index} is + * {@literal null} or empty the index name provided via entity metadata is used. Same for the {@literal type}. + * + * @param entities must not be {@literal null}. + * @param index the target index, must not be {@literal null} + * @param + * @return a {@link Flux} emitting saved entities. + * @since 4.0 + */ + default Flux saveAll(Iterable entities, IndexCoordinates index) { + List entityList = new ArrayList<>(); + entities.forEach(entityList::add); + return saveAll(Mono.just(entityList), index); + } + + /** + * Index entities under the given {@literal type} in the given {@literal index}. If the {@literal index} is + * {@literal null} or empty the index name provided via entity metadata is used. Same for the {@literal type}. + * + * @param entities must not be {@literal null}. + * @param index the target index, must not be {@literal null} + * @param + * @return a {@link Flux} emitting saved entities. + * @since 4.0 + */ + Flux saveAll(Mono> entities, IndexCoordinates index); + + /** + * Execute a multiGet against elasticsearch for the given ids. + * + * @param query the query defining the ids of the objects to get + * @param clazz the type of the object to be returned + * @param index the index(es) from which the objects are read. + * @return flux with list of nullable objects + * @since 4.0 + */ + Flux multiGet(Query query, Class clazz, IndexCoordinates index); + + /** + * Bulk update all objects. Will do update. + * + * @param queries the queries to execute in bulk + * @since 4.0 + */ + default Mono bulkUpdate(List queries, IndexCoordinates index) { + return bulkUpdate(queries, BulkOptions.defaultOptions(), index); + } + + /** + * Bulk update all objects. Will do update. + * + * @param queries the queries to execute in bulk + * @param bulkOptions options to be added to the bulk request + * @since 4.0 + */ + Mono bulkUpdate(List queries, BulkOptions bulkOptions, IndexCoordinates index); + /** * Find the document with the given {@literal id} mapped onto the given {@literal entityType}. * diff --git a/src/main/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplate.java b/src/main/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplate.java index ae2357c78..e36c1a473 100644 --- a/src/main/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplate.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplate.java @@ -15,17 +15,12 @@ */ package org.springframework.data.elasticsearch.core; -import static org.elasticsearch.index.VersionType.*; - -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; - +import org.elasticsearch.action.bulk.BulkItemResponse; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.get.GetRequest; +import org.elasticsearch.action.get.MultiGetRequest; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.search.SearchRequest; @@ -48,26 +43,32 @@ import org.reactivestreams.Publisher; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.data.domain.Sort; +import org.springframework.data.elasticsearch.ElasticsearchException; import org.springframework.data.elasticsearch.NoSuchIndexException; import org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient; import org.springframework.data.elasticsearch.core.EntityOperations.AdaptibleEntity; import org.springframework.data.elasticsearch.core.EntityOperations.Entity; import org.springframework.data.elasticsearch.core.convert.ElasticsearchConverter; import org.springframework.data.elasticsearch.core.convert.MappingElasticsearchConverter; +import org.springframework.data.elasticsearch.core.document.Document; import org.springframework.data.elasticsearch.core.document.DocumentAdapters; import org.springframework.data.elasticsearch.core.document.SearchDocument; import org.springframework.data.elasticsearch.core.mapping.ElasticsearchPersistentEntity; import org.springframework.data.elasticsearch.core.mapping.ElasticsearchPersistentProperty; import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates; import org.springframework.data.elasticsearch.core.mapping.SimpleElasticsearchMappingContext; -import org.springframework.data.elasticsearch.core.query.CriteriaQuery; -import org.springframework.data.elasticsearch.core.query.NativeSearchQuery; -import org.springframework.data.elasticsearch.core.query.Query; -import org.springframework.data.elasticsearch.core.query.StringQuery; +import org.springframework.data.elasticsearch.core.query.*; import org.springframework.data.mapping.context.MappingContext; import org.springframework.http.HttpStatus; import org.springframework.lang.Nullable; import org.springframework.util.Assert; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.util.*; +import java.util.stream.Collectors; + +import static org.elasticsearch.index.VersionType.EXTERNAL; /** * @author Christoph Strobl @@ -76,6 +77,7 @@ import org.springframework.util.Assert; * @author Martin Choraine * @author Peter-Josef Meisch * @author Mathias Teier + * @author Aleksei Arsenev * @since 3.2 */ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOperations { @@ -135,6 +137,60 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera return save(entity, getIndexCoordinatesFor(entity.getClass())); } + @Override + public Flux saveAll(Mono> entities, IndexCoordinates index) { + + Assert.notNull(entities, "Entities must not be null!"); + + return entities.flatMapMany(entityList -> { + + List> adaptibleEntities = entityList.stream() // + .map(e -> operations.forEntity(e, converter.getConversionService())) // + .collect(Collectors.toList()); + Iterator> iterator = adaptibleEntities.iterator(); + List indexRequests = adaptibleEntities.stream() // + .map(e -> getIndexQuery(e.getBean(), e)) // + .collect(Collectors.toList()); + return doBulkOperation(indexRequests, BulkOptions.defaultOptions(), index) // + .map(bulkItemResponse -> { + + AdaptibleEntity mappedEntity = iterator.next(); + mappedEntity.populateIdIfNecessary(bulkItemResponse.getResponse().getId()); + return mappedEntity.getBean(); + }); + }); + } + + @Override + public Flux multiGet(Query query, Class clazz, IndexCoordinates index) { + + Assert.notNull(index, "Index must not be null"); + Assert.notNull(clazz, "Class must not be null"); + Assert.notNull(query, "Query must not be null"); + Assert.notEmpty(query.getIds(), "No Id define for Query"); + + MultiGetRequest request = requestFactory.multiGetRequest(query, index); + return Flux.from(execute(client -> client.multiGet(request))) // + .handle((result, sink) -> { + + Document document = DocumentAdapters.from(result); + T entity = converter.mapDocument(document, clazz); + if (entity != null) { + sink.next(entity); + } + }); + } + + @Override + public Mono bulkUpdate(List queries, BulkOptions bulkOptions, IndexCoordinates index) { + + Assert.notNull(queries, "List of UpdateQuery must not be null"); + Assert.notNull(bulkOptions, "BulkOptions must not be null"); + Assert.notNull(index, "Index must not be null"); + + return doBulkOperation(queries, bulkOptions, index).then(); + } + /** * Customization hook on the actual execution result {@link Publisher}.
* You know what you're doing here? Well fair enough, go ahead on your own risk. @@ -146,6 +202,33 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera return Mono.from(execute(client -> client.index(request))); } + protected Flux doBulkOperation(List queries, BulkOptions bulkOptions, IndexCoordinates index) { + BulkRequest bulkRequest = prepareWriteRequest(requestFactory.bulkRequest(queries, bulkOptions, index)); + return client.bulk(bulkRequest) // + .onErrorMap(e -> new ElasticsearchException("Error while bulk for request: " + bulkRequest.toString(), e)) // + .flatMap(this::checkForBulkOperationFailure) // + .flatMapMany(response -> Flux.fromArray(response.getItems())); + } + + protected Mono checkForBulkOperationFailure(BulkResponse bulkResponse) { + + if (bulkResponse.hasFailures()) { + Map failedDocuments = new HashMap<>(); + for (BulkItemResponse item : bulkResponse.getItems()) { + if (item.isFailed()) { + failedDocuments.put(item.getId(), item.getFailureMessage()); + } + } + ElasticsearchException exception = new ElasticsearchException( + "Bulk operation has failures. Use ElasticsearchException.getFailedDocuments() for detailed messages [" + + failedDocuments + ']', + failedDocuments); + return Mono.error(exception); + } else { + return Mono.just(bulkResponse); + } + } + /** * Customization hook on the actual execution result {@link Publisher}.
* @@ -178,29 +261,48 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera private Mono doIndex(Object value, AdaptibleEntity entity, IndexCoordinates index) { return Mono.defer(() -> { - - Object id = entity.getId(); - - IndexRequest request = id != null - ? new IndexRequest(index.getIndexName()).id(converter.convertId(id)) - : new IndexRequest(index.getIndexName()); - - request.source(converter.mapObject(value).toJson(), Requests.INDEX_CONTENT_TYPE); - - if (entity.isVersionedEntity()) { - - Object version = entity.getVersion(); - if (version != null) { - request.version(((Number) version).longValue()); - request.versionType(EXTERNAL); - } - } - + IndexRequest request = getIndexRequest(value, entity, index); request = prepareIndexRequest(value, request); return doIndex(request); }); } + private IndexRequest getIndexRequest(Object value, AdaptibleEntity entity, IndexCoordinates index) { + Object id = entity.getId(); + + IndexRequest request = id != null + ? new IndexRequest(index.getIndexName()).id(converter.convertId(id)) + : new IndexRequest(index.getIndexName()); + + request.source(converter.mapObject(value).toJson(), Requests.INDEX_CONTENT_TYPE); + + if (entity.isVersionedEntity()) { + + Number version = entity.getVersion(); + if (version != null) { + request.version(version.longValue()); + request.versionType(EXTERNAL); + } + } + return request; + } + + private IndexQuery getIndexQuery(Object value, AdaptibleEntity entity) { + Object id = entity.getId(); + IndexQuery query = new IndexQuery(); + if (id != null) { + query.setId(id.toString()); + } + query.setObject(value); + if (entity.isVersionedEntity()) { + Number version = entity.getVersion(); + if (version != null) { + query.setVersion(version.longValue()); + } + } + return query; + } + @Override public Mono findById(String id, Class entityType) { return findById(id, entityType, getIndexCoordinatesFor(entityType)); diff --git a/src/main/java/org/springframework/data/elasticsearch/repository/support/ElasticsearchEntityInformation.java b/src/main/java/org/springframework/data/elasticsearch/repository/support/ElasticsearchEntityInformation.java index 3f529ba0e..e2f57275c 100644 --- a/src/main/java/org/springframework/data/elasticsearch/repository/support/ElasticsearchEntityInformation.java +++ b/src/main/java/org/springframework/data/elasticsearch/repository/support/ElasticsearchEntityInformation.java @@ -18,6 +18,7 @@ package org.springframework.data.elasticsearch.repository.support; import org.elasticsearch.index.VersionType; import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates; import org.springframework.data.repository.core.EntityInformation; +import org.springframework.lang.Nullable; /** * @param @@ -27,6 +28,7 @@ import org.springframework.data.repository.core.EntityInformation; * @author Christoph Strobl * @author Ivan Greene * @author Peter-Josef Meisch + * @author Aleksei Arsenev */ public interface ElasticsearchEntityInformation extends EntityInformation { @@ -34,9 +36,11 @@ public interface ElasticsearchEntityInformation extends EntityInformation IndexCoordinates getIndexCoordinates(); + @Nullable Long getVersion(T entity); VersionType getVersionType(); + @Nullable String getParentId(T entity); } diff --git a/src/main/java/org/springframework/data/elasticsearch/repository/support/SimpleReactiveElasticsearchRepository.java b/src/main/java/org/springframework/data/elasticsearch/repository/support/SimpleReactiveElasticsearchRepository.java index bc88bc301..1b4d8f4dd 100644 --- a/src/main/java/org/springframework/data/elasticsearch/repository/support/SimpleReactiveElasticsearchRepository.java +++ b/src/main/java/org/springframework/data/elasticsearch/repository/support/SimpleReactiveElasticsearchRepository.java @@ -15,21 +15,25 @@ */ package org.springframework.data.elasticsearch.repository.support; -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; - +import org.elasticsearch.index.query.QueryBuilders; import org.reactivestreams.Publisher; import org.springframework.data.domain.Pageable; import org.springframework.data.domain.Sort; import org.springframework.data.elasticsearch.core.ReactiveElasticsearchOperations; import org.springframework.data.elasticsearch.core.SearchHit; +import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates; +import org.springframework.data.elasticsearch.core.query.NativeSearchQueryBuilder; import org.springframework.data.elasticsearch.core.query.Query; +import org.springframework.data.elasticsearch.core.query.StringQuery; import org.springframework.data.elasticsearch.repository.ReactiveElasticsearchRepository; import org.springframework.util.Assert; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; /** * @author Christoph Strobl * @author Peter-Josef Meisch + * @author Aleksei Arsenev * @since 3.2 */ public class SimpleReactiveElasticsearchRepository implements ReactiveElasticsearchRepository { @@ -65,7 +69,9 @@ public class SimpleReactiveElasticsearchRepository implements ReactiveEla public Flux saveAll(Publisher entityStream) { Assert.notNull(entityStream, "EntityStream must not be null!"); - return Flux.from(entityStream).flatMap(this::save); + + return elasticsearchOperations + .saveAll(Flux.from(entityStream).collectList(), entityInformation.getIndexCoordinates()); } @Override @@ -117,14 +123,22 @@ public class SimpleReactiveElasticsearchRepository implements ReactiveEla Assert.notNull(ids, "Ids must not be null!"); - return Flux.fromIterable(ids).flatMap(this::findById); + return findAllById(Flux.fromIterable(ids)); } @Override public Flux findAllById(Publisher idStream) { Assert.notNull(idStream, "IdStream must not be null!"); - return Flux.from(idStream).buffer().flatMap(this::findAllById); + return Flux.from(idStream) // + .map(ID::toString) // + .collectList() // + .map(ids -> new NativeSearchQueryBuilder().withIds(ids).build()) // + .flatMapMany(query -> { + + IndexCoordinates index = entityInformation.getIndexCoordinates(); + return elasticsearchOperations.multiGet(query, entityInformation.getJavaType(), index); + }); } @Override @@ -169,7 +183,28 @@ public class SimpleReactiveElasticsearchRepository implements ReactiveEla public Mono deleteAll(Publisher entityStream) { Assert.notNull(entityStream, "EntityStream must not be null!"); - return Flux.from(entityStream).flatMap(this::delete).then(); + return Flux.from(entityStream) // + .map(entity -> { + + ID id = entityInformation.getId(entity); + if (id == null) { + throw new IllegalStateException("Entity id must not be null!"); + } + return convertId(id); + }) + .collectList() + .map(objects -> { + + return new StringQuery(QueryBuilders.idsQuery() // + .addIds(objects.toArray(new String[0])) // + .toString()); + }) // + .flatMap(query -> { + + return elasticsearchOperations + .deleteBy(query, entityInformation.getJavaType(), entityInformation.getIndexCoordinates()); + }) // + .then(); } @Override diff --git a/src/test/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplateTests.java b/src/test/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplateTests.java index 1d855e3c6..60a0a37dd 100644 --- a/src/test/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplateTests.java +++ b/src/test/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplateTests.java @@ -15,32 +15,9 @@ */ package org.springframework.data.elasticsearch.core; -import static org.assertj.core.api.Assertions.*; -import static org.elasticsearch.index.query.QueryBuilders.*; -import static org.springframework.data.elasticsearch.annotations.FieldType.*; - -import lombok.AllArgsConstructor; -import lombok.Builder; -import lombok.Data; -import lombok.EqualsAndHashCode; -import lombok.NoArgsConstructor; -import org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient; -import reactor.core.publisher.Mono; -import reactor.test.StepVerifier; - -import java.lang.Long; -import java.lang.Object; -import java.net.ConnectException; -import java.util.Arrays; -import java.util.Collections; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.UUID; -import java.util.stream.Collectors; -import java.util.stream.IntStream; - +import lombok.*; import org.elasticsearch.ElasticsearchStatusException; +import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.search.sort.FieldSortBuilder; import org.elasticsearch.search.sort.SortOrder; import org.junit.jupiter.api.AfterEach; @@ -56,17 +33,26 @@ import org.springframework.data.elasticsearch.TestUtils; import org.springframework.data.elasticsearch.annotations.Document; import org.springframework.data.elasticsearch.annotations.Field; import org.springframework.data.elasticsearch.annotations.Score; +import org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient; import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates; -import org.springframework.data.elasticsearch.core.query.Criteria; -import org.springframework.data.elasticsearch.core.query.CriteriaQuery; -import org.springframework.data.elasticsearch.core.query.IndexQuery; -import org.springframework.data.elasticsearch.core.query.IndexQueryBuilder; -import org.springframework.data.elasticsearch.core.query.NativeSearchQuery; -import org.springframework.data.elasticsearch.core.query.NativeSearchQueryBuilder; -import org.springframework.data.elasticsearch.core.query.StringQuery; +import org.springframework.data.elasticsearch.core.query.*; import org.springframework.data.elasticsearch.junit.junit4.ElasticsearchVersion; import org.springframework.data.elasticsearch.junit.jupiter.SpringIntegrationTest; import org.springframework.util.StringUtils; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.test.StepVerifier; + +import java.net.ConnectException; +import java.util.*; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery; +import static org.elasticsearch.index.query.QueryBuilders.termQuery; +import static org.springframework.data.elasticsearch.annotations.FieldType.Text; /** * Integration tests for {@link ReactiveElasticsearchTemplate}. @@ -76,6 +62,7 @@ import org.springframework.util.StringUtils; * @author Peter-Josef Meisch * @author Farid Azaza * @author Martin Choraine + * @author Aleksei Arsenev */ @SpringIntegrationTest public class ReactiveElasticsearchTemplateTests { @@ -711,6 +698,100 @@ public class ReactiveElasticsearchTemplateTests { .verifyComplete(); } + @Test // DATAES-623 + public void shouldReturnObjectsForGivenIdsUsingMultiGet() { + SampleEntity entity1 = randomEntity("test message 1"); + entity1.rate = 1; + index(entity1); + SampleEntity entity2 = randomEntity("test message 2"); + entity2.rate = 2; + index(entity2); + + NativeSearchQuery query = new NativeSearchQueryBuilder() // + .withIds(Arrays.asList(entity1.getId(), entity2.getId())) // + .build(); + + template.multiGet(query, SampleEntity.class, IndexCoordinates.of(DEFAULT_INDEX)) // + .as(StepVerifier::create) // + .expectNext(entity1, entity2) // + .verifyComplete(); + } + + @Test // DATAES-623 + public void shouldReturnObjectsForGivenIdsUsingMultiGetWithFields() { + SampleEntity entity1 = randomEntity("test message 1"); + entity1.rate = 1; + index(entity1); + SampleEntity entity2 = randomEntity("test message 2"); + entity2.rate = 2; + index(entity2); + + NativeSearchQuery query = new NativeSearchQueryBuilder() // + .withIds(Arrays.asList(entity1.getId(), entity2.getId())) // + .withFields("message") // + .build(); + + template.multiGet(query, SampleEntity.class, IndexCoordinates.of(DEFAULT_INDEX)) // + .as(StepVerifier::create) // + .expectNextCount(2) // + .verifyComplete(); + } + + @Test // DATAES-623 + public void shouldDoBulkUpdate() { + SampleEntity entity1 = randomEntity("test message 1"); + entity1.rate = 1; + index(entity1); + SampleEntity entity2 = randomEntity("test message 2"); + entity2.rate = 2; + index(entity2); + + IndexRequest indexRequest1 = new IndexRequest(); + indexRequest1.source("message", "updated 1"); + UpdateQuery updateQuery1 = new UpdateQueryBuilder() // + .withId(entity1.getId()) // + .withIndexRequest(indexRequest1).build(); + + IndexRequest indexRequest2 = new IndexRequest(); + indexRequest2.source("message", "updated 2"); + UpdateQuery updateQuery2 = new UpdateQueryBuilder() // + .withId(entity2.getId()) // + .withIndexRequest(indexRequest2).build(); + + List queries = Arrays.asList(updateQuery1, updateQuery2); + template.bulkUpdate(queries, IndexCoordinates.of(DEFAULT_INDEX)).block(); + + NativeSearchQuery getQuery = new NativeSearchQueryBuilder() // + .withIds(Arrays.asList(entity1.getId(), entity2.getId())) // + .build(); + template.multiGet(getQuery, SampleEntity.class, IndexCoordinates.of(DEFAULT_INDEX)) // + .as(StepVerifier::create) // + .expectNextMatches(entity -> entity.getMessage().equals("updated 1")) // + .expectNextMatches(entity -> entity.getMessage().equals("updated 2")) // + .verifyComplete(); + } + + @Test // DATAES-623 + void shouldSaveAll() { + SampleEntity entity1 = randomEntity("test message 1"); + entity1.rate = 1; + SampleEntity entity2 = randomEntity("test message 2"); + entity2.rate = 2; + + template.saveAll(Mono.just(Arrays.asList(entity1, entity2)), IndexCoordinates.of(DEFAULT_INDEX)) // + .as(StepVerifier::create) // + .expectNext(entity1) // + .expectNext(entity2) // + .verifyComplete(); + + NativeSearchQuery searchQuery = new NativeSearchQueryBuilder().withQuery(matchAllQuery()).build(); + template.search(searchQuery, SampleEntity.class, IndexCoordinates.of(DEFAULT_INDEX)) // + .as(StepVerifier::create) // + .expectNextMatches(hit -> entity1.equals(hit.getContent()) || entity2.equals(hit.getContent())) // + .expectNextMatches(hit -> entity1.equals(hit.getContent()) || entity2.equals(hit.getContent())) // + .verifyComplete(); + } + @Data @Document(indexName = "marvel") static class Person {