DATAES-623 - Add bulk operations for ReactiveElasticsearchRepository.

Original PR:#376
This commit is contained in:
alesharik 2020-01-13 22:26:19 +03:00 committed by Peter-Josef Meisch
parent e670a88772
commit d7262e4370
5 changed files with 358 additions and 71 deletions

View File

@ -16,16 +16,23 @@
package org.springframework.data.elasticsearch.core;
import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
import org.springframework.data.elasticsearch.core.query.BulkOptions;
import org.springframework.data.elasticsearch.core.query.Query;
import org.springframework.data.elasticsearch.core.query.UpdateQuery;
import org.springframework.util.Assert;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import org.springframework.util.Assert;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
/**
* The reactive operations for the
* <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/docs.html">Elasticsearch Document APIs</a>.
*
* @author Peter-Josef Meisch
* @author Aleksei Arsenev
* @since 4.0
*/
public interface ReactiveDocumentOperations {
@ -78,6 +85,64 @@ public interface ReactiveDocumentOperations {
*/
<T> Mono<T> save(T entity, IndexCoordinates index);
/**
* Index entities under the given {@literal type} in the given {@literal index}. If the {@literal index} is
* {@literal null} or empty the index name provided via entity metadata is used. Same for the {@literal type}.
*
* @param entities must not be {@literal null}.
* @param index the target index, must not be {@literal null}
* @param <T>
* @return a {@link Flux} emitting saved entities.
* @since 4.0
*/
default <T> Flux<T> saveAll(Iterable<T> entities, IndexCoordinates index) {
List<T> entityList = new ArrayList<>();
entities.forEach(entityList::add);
return saveAll(Mono.just(entityList), index);
}
/**
* Index entities under the given {@literal type} in the given {@literal index}. If the {@literal index} is
* {@literal null} or empty the index name provided via entity metadata is used. Same for the {@literal type}.
*
* @param entities must not be {@literal null}.
* @param index the target index, must not be {@literal null}
* @param <T>
* @return a {@link Flux} emitting saved entities.
* @since 4.0
*/
<T> Flux<T> saveAll(Mono<? extends Collection<? extends T>> entities, IndexCoordinates index);
/**
* Execute a multiGet against elasticsearch for the given ids.
*
* @param query the query defining the ids of the objects to get
* @param clazz the type of the object to be returned
* @param index the index(es) from which the objects are read.
* @return flux with list of nullable objects
* @since 4.0
*/
<T> Flux<T> multiGet(Query query, Class<T> clazz, IndexCoordinates index);
/**
* Bulk update all objects. Will do update.
*
* @param queries the queries to execute in bulk
* @since 4.0
*/
default Mono<Void> bulkUpdate(List<UpdateQuery> queries, IndexCoordinates index) {
return bulkUpdate(queries, BulkOptions.defaultOptions(), index);
}
/**
* Bulk update all objects. Will do update.
*
* @param queries the queries to execute in bulk
* @param bulkOptions options to be added to the bulk request
* @since 4.0
*/
Mono<Void> bulkUpdate(List<UpdateQuery> queries, BulkOptions bulkOptions, IndexCoordinates index);
/**
* Find the document with the given {@literal id} mapped onto the given {@literal entityType}.
*

View File

@ -15,17 +15,12 @@
*/
package org.springframework.data.elasticsearch.core;
import static org.elasticsearch.index.VersionType.*;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.MultiGetRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequest;
@ -48,26 +43,32 @@ import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.domain.Sort;
import org.springframework.data.elasticsearch.ElasticsearchException;
import org.springframework.data.elasticsearch.NoSuchIndexException;
import org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient;
import org.springframework.data.elasticsearch.core.EntityOperations.AdaptibleEntity;
import org.springframework.data.elasticsearch.core.EntityOperations.Entity;
import org.springframework.data.elasticsearch.core.convert.ElasticsearchConverter;
import org.springframework.data.elasticsearch.core.convert.MappingElasticsearchConverter;
import org.springframework.data.elasticsearch.core.document.Document;
import org.springframework.data.elasticsearch.core.document.DocumentAdapters;
import org.springframework.data.elasticsearch.core.document.SearchDocument;
import org.springframework.data.elasticsearch.core.mapping.ElasticsearchPersistentEntity;
import org.springframework.data.elasticsearch.core.mapping.ElasticsearchPersistentProperty;
import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
import org.springframework.data.elasticsearch.core.mapping.SimpleElasticsearchMappingContext;
import org.springframework.data.elasticsearch.core.query.CriteriaQuery;
import org.springframework.data.elasticsearch.core.query.NativeSearchQuery;
import org.springframework.data.elasticsearch.core.query.Query;
import org.springframework.data.elasticsearch.core.query.StringQuery;
import org.springframework.data.elasticsearch.core.query.*;
import org.springframework.data.mapping.context.MappingContext;
import org.springframework.http.HttpStatus;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.*;
import java.util.stream.Collectors;
import static org.elasticsearch.index.VersionType.EXTERNAL;
/**
* @author Christoph Strobl
@ -76,6 +77,7 @@ import org.springframework.util.Assert;
* @author Martin Choraine
* @author Peter-Josef Meisch
* @author Mathias Teier
* @author Aleksei Arsenev
* @since 3.2
*/
public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOperations {
@ -135,6 +137,60 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera
return save(entity, getIndexCoordinatesFor(entity.getClass()));
}
@Override
public <T> Flux<T> saveAll(Mono<? extends Collection<? extends T>> entities, IndexCoordinates index) {
Assert.notNull(entities, "Entities must not be null!");
return entities.flatMapMany(entityList -> {
List<AdaptibleEntity<? extends T>> adaptibleEntities = entityList.stream() //
.map(e -> operations.forEntity(e, converter.getConversionService())) //
.collect(Collectors.toList());
Iterator<AdaptibleEntity<? extends T>> iterator = adaptibleEntities.iterator();
List<IndexQuery> indexRequests = adaptibleEntities.stream() //
.map(e -> getIndexQuery(e.getBean(), e)) //
.collect(Collectors.toList());
return doBulkOperation(indexRequests, BulkOptions.defaultOptions(), index) //
.map(bulkItemResponse -> {
AdaptibleEntity<? extends T> mappedEntity = iterator.next();
mappedEntity.populateIdIfNecessary(bulkItemResponse.getResponse().getId());
return mappedEntity.getBean();
});
});
}
@Override
public <T> Flux<T> multiGet(Query query, Class<T> clazz, IndexCoordinates index) {
Assert.notNull(index, "Index must not be null");
Assert.notNull(clazz, "Class must not be null");
Assert.notNull(query, "Query must not be null");
Assert.notEmpty(query.getIds(), "No Id define for Query");
MultiGetRequest request = requestFactory.multiGetRequest(query, index);
return Flux.from(execute(client -> client.multiGet(request))) //
.handle((result, sink) -> {
Document document = DocumentAdapters.from(result);
T entity = converter.mapDocument(document, clazz);
if (entity != null) {
sink.next(entity);
}
});
}
@Override
public Mono<Void> bulkUpdate(List<UpdateQuery> queries, BulkOptions bulkOptions, IndexCoordinates index) {
Assert.notNull(queries, "List of UpdateQuery must not be null");
Assert.notNull(bulkOptions, "BulkOptions must not be null");
Assert.notNull(index, "Index must not be null");
return doBulkOperation(queries, bulkOptions, index).then();
}
/**
* Customization hook on the actual execution result {@link Publisher}. <br />
* You know what you're doing here? Well fair enough, go ahead on your own risk.
@ -146,6 +202,33 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera
return Mono.from(execute(client -> client.index(request)));
}
protected Flux<BulkItemResponse> doBulkOperation(List<?> queries, BulkOptions bulkOptions, IndexCoordinates index) {
BulkRequest bulkRequest = prepareWriteRequest(requestFactory.bulkRequest(queries, bulkOptions, index));
return client.bulk(bulkRequest) //
.onErrorMap(e -> new ElasticsearchException("Error while bulk for request: " + bulkRequest.toString(), e)) //
.flatMap(this::checkForBulkOperationFailure) //
.flatMapMany(response -> Flux.fromArray(response.getItems()));
}
protected Mono<BulkResponse> checkForBulkOperationFailure(BulkResponse bulkResponse) {
if (bulkResponse.hasFailures()) {
Map<String, String> failedDocuments = new HashMap<>();
for (BulkItemResponse item : bulkResponse.getItems()) {
if (item.isFailed()) {
failedDocuments.put(item.getId(), item.getFailureMessage());
}
}
ElasticsearchException exception = new ElasticsearchException(
"Bulk operation has failures. Use ElasticsearchException.getFailedDocuments() for detailed messages ["
+ failedDocuments + ']',
failedDocuments);
return Mono.error(exception);
} else {
return Mono.just(bulkResponse);
}
}
/**
* Customization hook on the actual execution result {@link Publisher}. <br />
*
@ -178,29 +261,48 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera
private Mono<IndexResponse> doIndex(Object value, AdaptibleEntity<?> entity, IndexCoordinates index) {
return Mono.defer(() -> {
Object id = entity.getId();
IndexRequest request = id != null
? new IndexRequest(index.getIndexName()).id(converter.convertId(id))
: new IndexRequest(index.getIndexName());
request.source(converter.mapObject(value).toJson(), Requests.INDEX_CONTENT_TYPE);
if (entity.isVersionedEntity()) {
Object version = entity.getVersion();
if (version != null) {
request.version(((Number) version).longValue());
request.versionType(EXTERNAL);
}
}
IndexRequest request = getIndexRequest(value, entity, index);
request = prepareIndexRequest(value, request);
return doIndex(request);
});
}
private IndexRequest getIndexRequest(Object value, AdaptibleEntity<?> entity, IndexCoordinates index) {
Object id = entity.getId();
IndexRequest request = id != null
? new IndexRequest(index.getIndexName()).id(converter.convertId(id))
: new IndexRequest(index.getIndexName());
request.source(converter.mapObject(value).toJson(), Requests.INDEX_CONTENT_TYPE);
if (entity.isVersionedEntity()) {
Number version = entity.getVersion();
if (version != null) {
request.version(version.longValue());
request.versionType(EXTERNAL);
}
}
return request;
}
private IndexQuery getIndexQuery(Object value, AdaptibleEntity<?> entity) {
Object id = entity.getId();
IndexQuery query = new IndexQuery();
if (id != null) {
query.setId(id.toString());
}
query.setObject(value);
if (entity.isVersionedEntity()) {
Number version = entity.getVersion();
if (version != null) {
query.setVersion(version.longValue());
}
}
return query;
}
@Override
public <T> Mono<T> findById(String id, Class<T> entityType) {
return findById(id, entityType, getIndexCoordinatesFor(entityType));

View File

@ -18,6 +18,7 @@ package org.springframework.data.elasticsearch.repository.support;
import org.elasticsearch.index.VersionType;
import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
import org.springframework.data.repository.core.EntityInformation;
import org.springframework.lang.Nullable;
/**
* @param <T>
@ -27,6 +28,7 @@ import org.springframework.data.repository.core.EntityInformation;
* @author Christoph Strobl
* @author Ivan Greene
* @author Peter-Josef Meisch
* @author Aleksei Arsenev
*/
public interface ElasticsearchEntityInformation<T, ID> extends EntityInformation<T, ID> {
@ -34,9 +36,11 @@ public interface ElasticsearchEntityInformation<T, ID> extends EntityInformation
IndexCoordinates getIndexCoordinates();
@Nullable
Long getVersion(T entity);
VersionType getVersionType();
@Nullable
String getParentId(T entity);
}

View File

@ -15,21 +15,25 @@
*/
package org.springframework.data.elasticsearch.repository.support;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import org.elasticsearch.index.query.QueryBuilders;
import org.reactivestreams.Publisher;
import org.springframework.data.domain.Pageable;
import org.springframework.data.domain.Sort;
import org.springframework.data.elasticsearch.core.ReactiveElasticsearchOperations;
import org.springframework.data.elasticsearch.core.SearchHit;
import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
import org.springframework.data.elasticsearch.core.query.NativeSearchQueryBuilder;
import org.springframework.data.elasticsearch.core.query.Query;
import org.springframework.data.elasticsearch.core.query.StringQuery;
import org.springframework.data.elasticsearch.repository.ReactiveElasticsearchRepository;
import org.springframework.util.Assert;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
/**
* @author Christoph Strobl
* @author Peter-Josef Meisch
* @author Aleksei Arsenev
* @since 3.2
*/
public class SimpleReactiveElasticsearchRepository<T, ID> implements ReactiveElasticsearchRepository<T, ID> {
@ -65,7 +69,9 @@ public class SimpleReactiveElasticsearchRepository<T, ID> implements ReactiveEla
public <S extends T> Flux<S> saveAll(Publisher<S> entityStream) {
Assert.notNull(entityStream, "EntityStream must not be null!");
return Flux.from(entityStream).flatMap(this::save);
return elasticsearchOperations
.saveAll(Flux.from(entityStream).collectList(), entityInformation.getIndexCoordinates());
}
@Override
@ -117,14 +123,22 @@ public class SimpleReactiveElasticsearchRepository<T, ID> implements ReactiveEla
Assert.notNull(ids, "Ids must not be null!");
return Flux.fromIterable(ids).flatMap(this::findById);
return findAllById(Flux.fromIterable(ids));
}
@Override
public Flux<T> findAllById(Publisher<ID> idStream) {
Assert.notNull(idStream, "IdStream must not be null!");
return Flux.from(idStream).buffer().flatMap(this::findAllById);
return Flux.from(idStream) //
.map(ID::toString) //
.collectList() //
.map(ids -> new NativeSearchQueryBuilder().withIds(ids).build()) //
.flatMapMany(query -> {
IndexCoordinates index = entityInformation.getIndexCoordinates();
return elasticsearchOperations.multiGet(query, entityInformation.getJavaType(), index);
});
}
@Override
@ -169,7 +183,28 @@ public class SimpleReactiveElasticsearchRepository<T, ID> implements ReactiveEla
public Mono<Void> deleteAll(Publisher<? extends T> entityStream) {
Assert.notNull(entityStream, "EntityStream must not be null!");
return Flux.from(entityStream).flatMap(this::delete).then();
return Flux.from(entityStream) //
.map(entity -> {
ID id = entityInformation.getId(entity);
if (id == null) {
throw new IllegalStateException("Entity id must not be null!");
}
return convertId(id);
})
.collectList()
.map(objects -> {
return new StringQuery(QueryBuilders.idsQuery() //
.addIds(objects.toArray(new String[0])) //
.toString());
}) //
.flatMap(query -> {
return elasticsearchOperations
.deleteBy(query, entityInformation.getJavaType(), entityInformation.getIndexCoordinates());
}) //
.then();
}
@Override

View File

@ -15,32 +15,9 @@
*/
package org.springframework.data.elasticsearch.core;
import static org.assertj.core.api.Assertions.*;
import static org.elasticsearch.index.query.QueryBuilders.*;
import static org.springframework.data.elasticsearch.annotations.FieldType.*;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
import org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
import java.lang.Long;
import java.lang.Object;
import java.net.ConnectException;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import lombok.*;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.search.sort.FieldSortBuilder;
import org.elasticsearch.search.sort.SortOrder;
import org.junit.jupiter.api.AfterEach;
@ -56,17 +33,26 @@ import org.springframework.data.elasticsearch.TestUtils;
import org.springframework.data.elasticsearch.annotations.Document;
import org.springframework.data.elasticsearch.annotations.Field;
import org.springframework.data.elasticsearch.annotations.Score;
import org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient;
import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
import org.springframework.data.elasticsearch.core.query.Criteria;
import org.springframework.data.elasticsearch.core.query.CriteriaQuery;
import org.springframework.data.elasticsearch.core.query.IndexQuery;
import org.springframework.data.elasticsearch.core.query.IndexQueryBuilder;
import org.springframework.data.elasticsearch.core.query.NativeSearchQuery;
import org.springframework.data.elasticsearch.core.query.NativeSearchQueryBuilder;
import org.springframework.data.elasticsearch.core.query.StringQuery;
import org.springframework.data.elasticsearch.core.query.*;
import org.springframework.data.elasticsearch.junit.junit4.ElasticsearchVersion;
import org.springframework.data.elasticsearch.junit.jupiter.SpringIntegrationTest;
import org.springframework.util.StringUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
import java.net.ConnectException;
import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
import static org.elasticsearch.index.query.QueryBuilders.termQuery;
import static org.springframework.data.elasticsearch.annotations.FieldType.Text;
/**
* Integration tests for {@link ReactiveElasticsearchTemplate}.
@ -76,6 +62,7 @@ import org.springframework.util.StringUtils;
* @author Peter-Josef Meisch
* @author Farid Azaza
* @author Martin Choraine
* @author Aleksei Arsenev
*/
@SpringIntegrationTest
public class ReactiveElasticsearchTemplateTests {
@ -711,6 +698,100 @@ public class ReactiveElasticsearchTemplateTests {
.verifyComplete();
}
@Test // DATAES-623
public void shouldReturnObjectsForGivenIdsUsingMultiGet() {
SampleEntity entity1 = randomEntity("test message 1");
entity1.rate = 1;
index(entity1);
SampleEntity entity2 = randomEntity("test message 2");
entity2.rate = 2;
index(entity2);
NativeSearchQuery query = new NativeSearchQueryBuilder() //
.withIds(Arrays.asList(entity1.getId(), entity2.getId())) //
.build();
template.multiGet(query, SampleEntity.class, IndexCoordinates.of(DEFAULT_INDEX)) //
.as(StepVerifier::create) //
.expectNext(entity1, entity2) //
.verifyComplete();
}
@Test // DATAES-623
public void shouldReturnObjectsForGivenIdsUsingMultiGetWithFields() {
SampleEntity entity1 = randomEntity("test message 1");
entity1.rate = 1;
index(entity1);
SampleEntity entity2 = randomEntity("test message 2");
entity2.rate = 2;
index(entity2);
NativeSearchQuery query = new NativeSearchQueryBuilder() //
.withIds(Arrays.asList(entity1.getId(), entity2.getId())) //
.withFields("message") //
.build();
template.multiGet(query, SampleEntity.class, IndexCoordinates.of(DEFAULT_INDEX)) //
.as(StepVerifier::create) //
.expectNextCount(2) //
.verifyComplete();
}
@Test // DATAES-623
public void shouldDoBulkUpdate() {
SampleEntity entity1 = randomEntity("test message 1");
entity1.rate = 1;
index(entity1);
SampleEntity entity2 = randomEntity("test message 2");
entity2.rate = 2;
index(entity2);
IndexRequest indexRequest1 = new IndexRequest();
indexRequest1.source("message", "updated 1");
UpdateQuery updateQuery1 = new UpdateQueryBuilder() //
.withId(entity1.getId()) //
.withIndexRequest(indexRequest1).build();
IndexRequest indexRequest2 = new IndexRequest();
indexRequest2.source("message", "updated 2");
UpdateQuery updateQuery2 = new UpdateQueryBuilder() //
.withId(entity2.getId()) //
.withIndexRequest(indexRequest2).build();
List<UpdateQuery> queries = Arrays.asList(updateQuery1, updateQuery2);
template.bulkUpdate(queries, IndexCoordinates.of(DEFAULT_INDEX)).block();
NativeSearchQuery getQuery = new NativeSearchQueryBuilder() //
.withIds(Arrays.asList(entity1.getId(), entity2.getId())) //
.build();
template.multiGet(getQuery, SampleEntity.class, IndexCoordinates.of(DEFAULT_INDEX)) //
.as(StepVerifier::create) //
.expectNextMatches(entity -> entity.getMessage().equals("updated 1")) //
.expectNextMatches(entity -> entity.getMessage().equals("updated 2")) //
.verifyComplete();
}
@Test // DATAES-623
void shouldSaveAll() {
SampleEntity entity1 = randomEntity("test message 1");
entity1.rate = 1;
SampleEntity entity2 = randomEntity("test message 2");
entity2.rate = 2;
template.saveAll(Mono.just(Arrays.asList(entity1, entity2)), IndexCoordinates.of(DEFAULT_INDEX)) //
.as(StepVerifier::create) //
.expectNext(entity1) //
.expectNext(entity2) //
.verifyComplete();
NativeSearchQuery searchQuery = new NativeSearchQueryBuilder().withQuery(matchAllQuery()).build();
template.search(searchQuery, SampleEntity.class, IndexCoordinates.of(DEFAULT_INDEX)) //
.as(StepVerifier::create) //
.expectNextMatches(hit -> entity1.equals(hit.getContent()) || entity2.equals(hit.getContent())) //
.expectNextMatches(hit -> entity1.equals(hit.getContent()) || entity2.equals(hit.getContent())) //
.verifyComplete();
}
@Data
@Document(indexName = "marvel")
static class Person {