diff --git a/src/main/java/org/springframework/data/elasticsearch/core/ReactiveDocumentOperations.java b/src/main/java/org/springframework/data/elasticsearch/core/ReactiveDocumentOperations.java
index 8a7e73be7..41df2a9f0 100644
--- a/src/main/java/org/springframework/data/elasticsearch/core/ReactiveDocumentOperations.java
+++ b/src/main/java/org/springframework/data/elasticsearch/core/ReactiveDocumentOperations.java
@@ -16,16 +16,23 @@
package org.springframework.data.elasticsearch.core;
import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
+import org.springframework.data.elasticsearch.core.query.BulkOptions;
import org.springframework.data.elasticsearch.core.query.Query;
+import org.springframework.data.elasticsearch.core.query.UpdateQuery;
+import org.springframework.util.Assert;
+import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
-import org.springframework.util.Assert;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
/**
* The reactive operations for the
* Elasticsearch Document APIs.
*
* @author Peter-Josef Meisch
+ * @author Aleksei Arsenev
* @since 4.0
*/
public interface ReactiveDocumentOperations {
@@ -78,6 +85,64 @@ public interface ReactiveDocumentOperations {
*/
Mono save(T entity, IndexCoordinates index);
+ /**
+ * Index entities under the given {@literal type} in the given {@literal index}. If the {@literal index} is
+ * {@literal null} or empty the index name provided via entity metadata is used. Same for the {@literal type}.
+ *
+ * @param entities must not be {@literal null}.
+ * @param index the target index, must not be {@literal null}
+ * @param
+ * @return a {@link Flux} emitting saved entities.
+ * @since 4.0
+ */
+ default Flux saveAll(Iterable entities, IndexCoordinates index) {
+ List entityList = new ArrayList<>();
+ entities.forEach(entityList::add);
+ return saveAll(Mono.just(entityList), index);
+ }
+
+ /**
+ * Index entities under the given {@literal type} in the given {@literal index}. If the {@literal index} is
+ * {@literal null} or empty the index name provided via entity metadata is used. Same for the {@literal type}.
+ *
+ * @param entities must not be {@literal null}.
+ * @param index the target index, must not be {@literal null}
+ * @param
+ * @return a {@link Flux} emitting saved entities.
+ * @since 4.0
+ */
+ Flux saveAll(Mono extends Collection extends T>> entities, IndexCoordinates index);
+
+ /**
+ * Execute a multiGet against elasticsearch for the given ids.
+ *
+ * @param query the query defining the ids of the objects to get
+ * @param clazz the type of the object to be returned
+ * @param index the index(es) from which the objects are read.
+ * @return flux with list of nullable objects
+ * @since 4.0
+ */
+ Flux multiGet(Query query, Class clazz, IndexCoordinates index);
+
+ /**
+ * Bulk update all objects. Will do update.
+ *
+ * @param queries the queries to execute in bulk
+ * @since 4.0
+ */
+ default Mono bulkUpdate(List queries, IndexCoordinates index) {
+ return bulkUpdate(queries, BulkOptions.defaultOptions(), index);
+ }
+
+ /**
+ * Bulk update all objects. Will do update.
+ *
+ * @param queries the queries to execute in bulk
+ * @param bulkOptions options to be added to the bulk request
+ * @since 4.0
+ */
+ Mono bulkUpdate(List queries, BulkOptions bulkOptions, IndexCoordinates index);
+
/**
* Find the document with the given {@literal id} mapped onto the given {@literal entityType}.
*
diff --git a/src/main/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplate.java b/src/main/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplate.java
index ae2357c78..e36c1a473 100644
--- a/src/main/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplate.java
+++ b/src/main/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplate.java
@@ -15,17 +15,12 @@
*/
package org.springframework.data.elasticsearch.core;
-import static org.elasticsearch.index.VersionType.*;
-
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.get.GetRequest;
+import org.elasticsearch.action.get.MultiGetRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequest;
@@ -48,26 +43,32 @@ import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.domain.Sort;
+import org.springframework.data.elasticsearch.ElasticsearchException;
import org.springframework.data.elasticsearch.NoSuchIndexException;
import org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient;
import org.springframework.data.elasticsearch.core.EntityOperations.AdaptibleEntity;
import org.springframework.data.elasticsearch.core.EntityOperations.Entity;
import org.springframework.data.elasticsearch.core.convert.ElasticsearchConverter;
import org.springframework.data.elasticsearch.core.convert.MappingElasticsearchConverter;
+import org.springframework.data.elasticsearch.core.document.Document;
import org.springframework.data.elasticsearch.core.document.DocumentAdapters;
import org.springframework.data.elasticsearch.core.document.SearchDocument;
import org.springframework.data.elasticsearch.core.mapping.ElasticsearchPersistentEntity;
import org.springframework.data.elasticsearch.core.mapping.ElasticsearchPersistentProperty;
import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
import org.springframework.data.elasticsearch.core.mapping.SimpleElasticsearchMappingContext;
-import org.springframework.data.elasticsearch.core.query.CriteriaQuery;
-import org.springframework.data.elasticsearch.core.query.NativeSearchQuery;
-import org.springframework.data.elasticsearch.core.query.Query;
-import org.springframework.data.elasticsearch.core.query.StringQuery;
+import org.springframework.data.elasticsearch.core.query.*;
import org.springframework.data.mapping.context.MappingContext;
import org.springframework.http.HttpStatus;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import java.util.*;
+import java.util.stream.Collectors;
+
+import static org.elasticsearch.index.VersionType.EXTERNAL;
/**
* @author Christoph Strobl
@@ -76,6 +77,7 @@ import org.springframework.util.Assert;
* @author Martin Choraine
* @author Peter-Josef Meisch
* @author Mathias Teier
+ * @author Aleksei Arsenev
* @since 3.2
*/
public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOperations {
@@ -135,6 +137,60 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera
return save(entity, getIndexCoordinatesFor(entity.getClass()));
}
+ @Override
+ public Flux saveAll(Mono extends Collection extends T>> entities, IndexCoordinates index) {
+
+ Assert.notNull(entities, "Entities must not be null!");
+
+ return entities.flatMapMany(entityList -> {
+
+ List> adaptibleEntities = entityList.stream() //
+ .map(e -> operations.forEntity(e, converter.getConversionService())) //
+ .collect(Collectors.toList());
+ Iterator> iterator = adaptibleEntities.iterator();
+ List indexRequests = adaptibleEntities.stream() //
+ .map(e -> getIndexQuery(e.getBean(), e)) //
+ .collect(Collectors.toList());
+ return doBulkOperation(indexRequests, BulkOptions.defaultOptions(), index) //
+ .map(bulkItemResponse -> {
+
+ AdaptibleEntity extends T> mappedEntity = iterator.next();
+ mappedEntity.populateIdIfNecessary(bulkItemResponse.getResponse().getId());
+ return mappedEntity.getBean();
+ });
+ });
+ }
+
+ @Override
+ public Flux multiGet(Query query, Class clazz, IndexCoordinates index) {
+
+ Assert.notNull(index, "Index must not be null");
+ Assert.notNull(clazz, "Class must not be null");
+ Assert.notNull(query, "Query must not be null");
+ Assert.notEmpty(query.getIds(), "No Id define for Query");
+
+ MultiGetRequest request = requestFactory.multiGetRequest(query, index);
+ return Flux.from(execute(client -> client.multiGet(request))) //
+ .handle((result, sink) -> {
+
+ Document document = DocumentAdapters.from(result);
+ T entity = converter.mapDocument(document, clazz);
+ if (entity != null) {
+ sink.next(entity);
+ }
+ });
+ }
+
+ @Override
+ public Mono bulkUpdate(List queries, BulkOptions bulkOptions, IndexCoordinates index) {
+
+ Assert.notNull(queries, "List of UpdateQuery must not be null");
+ Assert.notNull(bulkOptions, "BulkOptions must not be null");
+ Assert.notNull(index, "Index must not be null");
+
+ return doBulkOperation(queries, bulkOptions, index).then();
+ }
+
/**
* Customization hook on the actual execution result {@link Publisher}.
* You know what you're doing here? Well fair enough, go ahead on your own risk.
@@ -146,6 +202,33 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera
return Mono.from(execute(client -> client.index(request)));
}
+ protected Flux doBulkOperation(List> queries, BulkOptions bulkOptions, IndexCoordinates index) {
+ BulkRequest bulkRequest = prepareWriteRequest(requestFactory.bulkRequest(queries, bulkOptions, index));
+ return client.bulk(bulkRequest) //
+ .onErrorMap(e -> new ElasticsearchException("Error while bulk for request: " + bulkRequest.toString(), e)) //
+ .flatMap(this::checkForBulkOperationFailure) //
+ .flatMapMany(response -> Flux.fromArray(response.getItems()));
+ }
+
+ protected Mono checkForBulkOperationFailure(BulkResponse bulkResponse) {
+
+ if (bulkResponse.hasFailures()) {
+ Map failedDocuments = new HashMap<>();
+ for (BulkItemResponse item : bulkResponse.getItems()) {
+ if (item.isFailed()) {
+ failedDocuments.put(item.getId(), item.getFailureMessage());
+ }
+ }
+ ElasticsearchException exception = new ElasticsearchException(
+ "Bulk operation has failures. Use ElasticsearchException.getFailedDocuments() for detailed messages ["
+ + failedDocuments + ']',
+ failedDocuments);
+ return Mono.error(exception);
+ } else {
+ return Mono.just(bulkResponse);
+ }
+ }
+
/**
* Customization hook on the actual execution result {@link Publisher}.
*
@@ -178,29 +261,48 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera
private Mono doIndex(Object value, AdaptibleEntity> entity, IndexCoordinates index) {
return Mono.defer(() -> {
-
- Object id = entity.getId();
-
- IndexRequest request = id != null
- ? new IndexRequest(index.getIndexName()).id(converter.convertId(id))
- : new IndexRequest(index.getIndexName());
-
- request.source(converter.mapObject(value).toJson(), Requests.INDEX_CONTENT_TYPE);
-
- if (entity.isVersionedEntity()) {
-
- Object version = entity.getVersion();
- if (version != null) {
- request.version(((Number) version).longValue());
- request.versionType(EXTERNAL);
- }
- }
-
+ IndexRequest request = getIndexRequest(value, entity, index);
request = prepareIndexRequest(value, request);
return doIndex(request);
});
}
+ private IndexRequest getIndexRequest(Object value, AdaptibleEntity> entity, IndexCoordinates index) {
+ Object id = entity.getId();
+
+ IndexRequest request = id != null
+ ? new IndexRequest(index.getIndexName()).id(converter.convertId(id))
+ : new IndexRequest(index.getIndexName());
+
+ request.source(converter.mapObject(value).toJson(), Requests.INDEX_CONTENT_TYPE);
+
+ if (entity.isVersionedEntity()) {
+
+ Number version = entity.getVersion();
+ if (version != null) {
+ request.version(version.longValue());
+ request.versionType(EXTERNAL);
+ }
+ }
+ return request;
+ }
+
+ private IndexQuery getIndexQuery(Object value, AdaptibleEntity> entity) {
+ Object id = entity.getId();
+ IndexQuery query = new IndexQuery();
+ if (id != null) {
+ query.setId(id.toString());
+ }
+ query.setObject(value);
+ if (entity.isVersionedEntity()) {
+ Number version = entity.getVersion();
+ if (version != null) {
+ query.setVersion(version.longValue());
+ }
+ }
+ return query;
+ }
+
@Override
public Mono findById(String id, Class entityType) {
return findById(id, entityType, getIndexCoordinatesFor(entityType));
diff --git a/src/main/java/org/springframework/data/elasticsearch/repository/support/ElasticsearchEntityInformation.java b/src/main/java/org/springframework/data/elasticsearch/repository/support/ElasticsearchEntityInformation.java
index 3f529ba0e..e2f57275c 100644
--- a/src/main/java/org/springframework/data/elasticsearch/repository/support/ElasticsearchEntityInformation.java
+++ b/src/main/java/org/springframework/data/elasticsearch/repository/support/ElasticsearchEntityInformation.java
@@ -18,6 +18,7 @@ package org.springframework.data.elasticsearch.repository.support;
import org.elasticsearch.index.VersionType;
import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
import org.springframework.data.repository.core.EntityInformation;
+import org.springframework.lang.Nullable;
/**
* @param
@@ -27,6 +28,7 @@ import org.springframework.data.repository.core.EntityInformation;
* @author Christoph Strobl
* @author Ivan Greene
* @author Peter-Josef Meisch
+ * @author Aleksei Arsenev
*/
public interface ElasticsearchEntityInformation extends EntityInformation {
@@ -34,9 +36,11 @@ public interface ElasticsearchEntityInformation extends EntityInformation
IndexCoordinates getIndexCoordinates();
+ @Nullable
Long getVersion(T entity);
VersionType getVersionType();
+ @Nullable
String getParentId(T entity);
}
diff --git a/src/main/java/org/springframework/data/elasticsearch/repository/support/SimpleReactiveElasticsearchRepository.java b/src/main/java/org/springframework/data/elasticsearch/repository/support/SimpleReactiveElasticsearchRepository.java
index bc88bc301..1b4d8f4dd 100644
--- a/src/main/java/org/springframework/data/elasticsearch/repository/support/SimpleReactiveElasticsearchRepository.java
+++ b/src/main/java/org/springframework/data/elasticsearch/repository/support/SimpleReactiveElasticsearchRepository.java
@@ -15,21 +15,25 @@
*/
package org.springframework.data.elasticsearch.repository.support;
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
-
+import org.elasticsearch.index.query.QueryBuilders;
import org.reactivestreams.Publisher;
import org.springframework.data.domain.Pageable;
import org.springframework.data.domain.Sort;
import org.springframework.data.elasticsearch.core.ReactiveElasticsearchOperations;
import org.springframework.data.elasticsearch.core.SearchHit;
+import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
+import org.springframework.data.elasticsearch.core.query.NativeSearchQueryBuilder;
import org.springframework.data.elasticsearch.core.query.Query;
+import org.springframework.data.elasticsearch.core.query.StringQuery;
import org.springframework.data.elasticsearch.repository.ReactiveElasticsearchRepository;
import org.springframework.util.Assert;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
/**
* @author Christoph Strobl
* @author Peter-Josef Meisch
+ * @author Aleksei Arsenev
* @since 3.2
*/
public class SimpleReactiveElasticsearchRepository implements ReactiveElasticsearchRepository {
@@ -65,7 +69,9 @@ public class SimpleReactiveElasticsearchRepository implements ReactiveEla
public Flux saveAll(Publisher entityStream) {
Assert.notNull(entityStream, "EntityStream must not be null!");
- return Flux.from(entityStream).flatMap(this::save);
+
+ return elasticsearchOperations
+ .saveAll(Flux.from(entityStream).collectList(), entityInformation.getIndexCoordinates());
}
@Override
@@ -117,14 +123,22 @@ public class SimpleReactiveElasticsearchRepository implements ReactiveEla
Assert.notNull(ids, "Ids must not be null!");
- return Flux.fromIterable(ids).flatMap(this::findById);
+ return findAllById(Flux.fromIterable(ids));
}
@Override
public Flux findAllById(Publisher idStream) {
Assert.notNull(idStream, "IdStream must not be null!");
- return Flux.from(idStream).buffer().flatMap(this::findAllById);
+ return Flux.from(idStream) //
+ .map(ID::toString) //
+ .collectList() //
+ .map(ids -> new NativeSearchQueryBuilder().withIds(ids).build()) //
+ .flatMapMany(query -> {
+
+ IndexCoordinates index = entityInformation.getIndexCoordinates();
+ return elasticsearchOperations.multiGet(query, entityInformation.getJavaType(), index);
+ });
}
@Override
@@ -169,7 +183,28 @@ public class SimpleReactiveElasticsearchRepository implements ReactiveEla
public Mono deleteAll(Publisher extends T> entityStream) {
Assert.notNull(entityStream, "EntityStream must not be null!");
- return Flux.from(entityStream).flatMap(this::delete).then();
+ return Flux.from(entityStream) //
+ .map(entity -> {
+
+ ID id = entityInformation.getId(entity);
+ if (id == null) {
+ throw new IllegalStateException("Entity id must not be null!");
+ }
+ return convertId(id);
+ })
+ .collectList()
+ .map(objects -> {
+
+ return new StringQuery(QueryBuilders.idsQuery() //
+ .addIds(objects.toArray(new String[0])) //
+ .toString());
+ }) //
+ .flatMap(query -> {
+
+ return elasticsearchOperations
+ .deleteBy(query, entityInformation.getJavaType(), entityInformation.getIndexCoordinates());
+ }) //
+ .then();
}
@Override
diff --git a/src/test/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplateTests.java b/src/test/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplateTests.java
index 1d855e3c6..60a0a37dd 100644
--- a/src/test/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplateTests.java
+++ b/src/test/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplateTests.java
@@ -15,32 +15,9 @@
*/
package org.springframework.data.elasticsearch.core;
-import static org.assertj.core.api.Assertions.*;
-import static org.elasticsearch.index.query.QueryBuilders.*;
-import static org.springframework.data.elasticsearch.annotations.FieldType.*;
-
-import lombok.AllArgsConstructor;
-import lombok.Builder;
-import lombok.Data;
-import lombok.EqualsAndHashCode;
-import lombok.NoArgsConstructor;
-import org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient;
-import reactor.core.publisher.Mono;
-import reactor.test.StepVerifier;
-
-import java.lang.Long;
-import java.lang.Object;
-import java.net.ConnectException;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-
+import lombok.*;
import org.elasticsearch.ElasticsearchStatusException;
+import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.search.sort.FieldSortBuilder;
import org.elasticsearch.search.sort.SortOrder;
import org.junit.jupiter.api.AfterEach;
@@ -56,17 +33,26 @@ import org.springframework.data.elasticsearch.TestUtils;
import org.springframework.data.elasticsearch.annotations.Document;
import org.springframework.data.elasticsearch.annotations.Field;
import org.springframework.data.elasticsearch.annotations.Score;
+import org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient;
import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
-import org.springframework.data.elasticsearch.core.query.Criteria;
-import org.springframework.data.elasticsearch.core.query.CriteriaQuery;
-import org.springframework.data.elasticsearch.core.query.IndexQuery;
-import org.springframework.data.elasticsearch.core.query.IndexQueryBuilder;
-import org.springframework.data.elasticsearch.core.query.NativeSearchQuery;
-import org.springframework.data.elasticsearch.core.query.NativeSearchQueryBuilder;
-import org.springframework.data.elasticsearch.core.query.StringQuery;
+import org.springframework.data.elasticsearch.core.query.*;
import org.springframework.data.elasticsearch.junit.junit4.ElasticsearchVersion;
import org.springframework.data.elasticsearch.junit.jupiter.SpringIntegrationTest;
import org.springframework.util.StringUtils;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.test.StepVerifier;
+
+import java.net.ConnectException;
+import java.util.*;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
+import static org.elasticsearch.index.query.QueryBuilders.termQuery;
+import static org.springframework.data.elasticsearch.annotations.FieldType.Text;
/**
* Integration tests for {@link ReactiveElasticsearchTemplate}.
@@ -76,6 +62,7 @@ import org.springframework.util.StringUtils;
* @author Peter-Josef Meisch
* @author Farid Azaza
* @author Martin Choraine
+ * @author Aleksei Arsenev
*/
@SpringIntegrationTest
public class ReactiveElasticsearchTemplateTests {
@@ -711,6 +698,100 @@ public class ReactiveElasticsearchTemplateTests {
.verifyComplete();
}
+ @Test // DATAES-623
+ public void shouldReturnObjectsForGivenIdsUsingMultiGet() {
+ SampleEntity entity1 = randomEntity("test message 1");
+ entity1.rate = 1;
+ index(entity1);
+ SampleEntity entity2 = randomEntity("test message 2");
+ entity2.rate = 2;
+ index(entity2);
+
+ NativeSearchQuery query = new NativeSearchQueryBuilder() //
+ .withIds(Arrays.asList(entity1.getId(), entity2.getId())) //
+ .build();
+
+ template.multiGet(query, SampleEntity.class, IndexCoordinates.of(DEFAULT_INDEX)) //
+ .as(StepVerifier::create) //
+ .expectNext(entity1, entity2) //
+ .verifyComplete();
+ }
+
+ @Test // DATAES-623
+ public void shouldReturnObjectsForGivenIdsUsingMultiGetWithFields() {
+ SampleEntity entity1 = randomEntity("test message 1");
+ entity1.rate = 1;
+ index(entity1);
+ SampleEntity entity2 = randomEntity("test message 2");
+ entity2.rate = 2;
+ index(entity2);
+
+ NativeSearchQuery query = new NativeSearchQueryBuilder() //
+ .withIds(Arrays.asList(entity1.getId(), entity2.getId())) //
+ .withFields("message") //
+ .build();
+
+ template.multiGet(query, SampleEntity.class, IndexCoordinates.of(DEFAULT_INDEX)) //
+ .as(StepVerifier::create) //
+ .expectNextCount(2) //
+ .verifyComplete();
+ }
+
+ @Test // DATAES-623
+ public void shouldDoBulkUpdate() {
+ SampleEntity entity1 = randomEntity("test message 1");
+ entity1.rate = 1;
+ index(entity1);
+ SampleEntity entity2 = randomEntity("test message 2");
+ entity2.rate = 2;
+ index(entity2);
+
+ IndexRequest indexRequest1 = new IndexRequest();
+ indexRequest1.source("message", "updated 1");
+ UpdateQuery updateQuery1 = new UpdateQueryBuilder() //
+ .withId(entity1.getId()) //
+ .withIndexRequest(indexRequest1).build();
+
+ IndexRequest indexRequest2 = new IndexRequest();
+ indexRequest2.source("message", "updated 2");
+ UpdateQuery updateQuery2 = new UpdateQueryBuilder() //
+ .withId(entity2.getId()) //
+ .withIndexRequest(indexRequest2).build();
+
+ List queries = Arrays.asList(updateQuery1, updateQuery2);
+ template.bulkUpdate(queries, IndexCoordinates.of(DEFAULT_INDEX)).block();
+
+ NativeSearchQuery getQuery = new NativeSearchQueryBuilder() //
+ .withIds(Arrays.asList(entity1.getId(), entity2.getId())) //
+ .build();
+ template.multiGet(getQuery, SampleEntity.class, IndexCoordinates.of(DEFAULT_INDEX)) //
+ .as(StepVerifier::create) //
+ .expectNextMatches(entity -> entity.getMessage().equals("updated 1")) //
+ .expectNextMatches(entity -> entity.getMessage().equals("updated 2")) //
+ .verifyComplete();
+ }
+
+ @Test // DATAES-623
+ void shouldSaveAll() {
+ SampleEntity entity1 = randomEntity("test message 1");
+ entity1.rate = 1;
+ SampleEntity entity2 = randomEntity("test message 2");
+ entity2.rate = 2;
+
+ template.saveAll(Mono.just(Arrays.asList(entity1, entity2)), IndexCoordinates.of(DEFAULT_INDEX)) //
+ .as(StepVerifier::create) //
+ .expectNext(entity1) //
+ .expectNext(entity2) //
+ .verifyComplete();
+
+ NativeSearchQuery searchQuery = new NativeSearchQueryBuilder().withQuery(matchAllQuery()).build();
+ template.search(searchQuery, SampleEntity.class, IndexCoordinates.of(DEFAULT_INDEX)) //
+ .as(StepVerifier::create) //
+ .expectNextMatches(hit -> entity1.equals(hit.getContent()) || entity2.equals(hit.getContent())) //
+ .expectNextMatches(hit -> entity1.equals(hit.getContent()) || entity2.equals(hit.getContent())) //
+ .verifyComplete();
+ }
+
@Data
@Document(indexName = "marvel")
static class Person {