DATAES-839 - ReactiveElasticsearchTemplate should use RequestFactory.

Original PR: #466
This commit is contained in:
Peter-Josef Meisch 2020-05-21 12:29:00 +02:00 committed by GitHub
parent bcd7c2a1d8
commit dc6734db43
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 82 additions and 163 deletions

View File

@ -0,0 +1,37 @@
/*
* Copyright 2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.elasticsearch;
import org.springframework.dao.DataRetrievalFailureException;
import java.util.Map;
/**
* @author Peter-Josef Meisch
* @since 4.1
*/
public class BulkFailureException extends DataRetrievalFailureException {
private final Map<String, String> failedDocuments;
public BulkFailureException(String msg, Map<String, String> failedDocuments) {
super(msg);
this.failedDocuments = failedDocuments;
}
public Map<String, String> getFailedDocuments() {
return failedDocuments;
}
}

View File

@ -22,6 +22,7 @@ import org.springframework.dao.UncategorizedDataAccessException;
* @since 4.0
*/
public class UncategorizedElasticsearchException extends UncategorizedDataAccessException {
public UncategorizedElasticsearchException(String msg, Throwable cause) {
super(msg, cause);
}

View File

@ -36,7 +36,7 @@ import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.data.convert.EntityReader;
import org.springframework.data.elasticsearch.ElasticsearchException;
import org.springframework.data.elasticsearch.BulkFailureException;
import org.springframework.data.elasticsearch.core.convert.ElasticsearchConverter;
import org.springframework.data.elasticsearch.core.convert.MappingElasticsearchConverter;
import org.springframework.data.elasticsearch.core.document.Document;
@ -405,7 +405,7 @@ public abstract class AbstractElasticsearchTemplate implements ElasticsearchOper
if (item.isFailed())
failedDocuments.put(item.getId(), item.getFailureMessage());
}
throw new ElasticsearchException(
throw new BulkFailureException(
"Bulk operation has failures. Use ElasticsearchException.getFailedDocuments() for detailed messages ["
+ failedDocuments + ']',
failedDocuments);

View File

@ -210,7 +210,7 @@ public class ElasticsearchRestTemplate extends AbstractElasticsearchTemplate {
Assert.notNull(id, "id must not be null");
Assert.notNull(index, "index must not be null");
DeleteRequest request = new DeleteRequest(index.getIndexName(), elasticsearchConverter.convertId(id));
DeleteRequest request = requestFactory.deleteRequest(elasticsearchConverter.convertId(id), index);
return execute(client -> client.delete(request, RequestOptions.DEFAULT).getId());
}

View File

@ -15,15 +15,11 @@
*/
package org.springframework.data.elasticsearch.core;
import static org.elasticsearch.index.VersionType.*;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -41,19 +37,10 @@ import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.support.WriteRequest.RefreshPolicy;
import org.elasticsearch.client.Requests;
import org.elasticsearch.client.core.CountRequest;
import org.elasticsearch.index.get.GetResult;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.WrapperQueryBuilder;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.FieldSortBuilder;
import org.elasticsearch.search.sort.SortBuilders;
import org.elasticsearch.search.sort.SortOrder;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -61,9 +48,9 @@ import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.data.convert.EntityReader;
import org.springframework.data.domain.Sort;
import org.springframework.data.elasticsearch.ElasticsearchException;
import org.springframework.data.elasticsearch.BulkFailureException;
import org.springframework.data.elasticsearch.NoSuchIndexException;
import org.springframework.data.elasticsearch.UncategorizedElasticsearchException;
import org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient;
import org.springframework.data.elasticsearch.core.EntityOperations.AdaptibleEntity;
import org.springframework.data.elasticsearch.core.EntityOperations.Entity;
@ -82,10 +69,8 @@ import org.springframework.data.elasticsearch.core.mapping.SimpleElasticsearchMa
import org.springframework.data.elasticsearch.core.query.BulkOptions;
import org.springframework.data.elasticsearch.core.query.CriteriaQuery;
import org.springframework.data.elasticsearch.core.query.IndexQuery;
import org.springframework.data.elasticsearch.core.query.NativeSearchQuery;
import org.springframework.data.elasticsearch.core.query.Query;
import org.springframework.data.elasticsearch.core.query.SeqNoPrimaryTerm;
import org.springframework.data.elasticsearch.core.query.StringQuery;
import org.springframework.data.elasticsearch.core.query.UpdateQuery;
import org.springframework.data.elasticsearch.support.VersionInfo;
import org.springframework.data.mapping.callback.ReactiveEntityCallbacks;
@ -194,6 +179,7 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera
T savedEntity = it.getT1();
IndexResponse indexResponse = it.getT2();
AdaptibleEntity<T> adaptableEntity = operations.forEntity(savedEntity, converter.getConversionService());
// noinspection ReactiveStreamsNullableInLambdaInTransform
return adaptableEntity.populateIdIfNecessary(indexResponse.getId());
}).flatMap(saved -> maybeCallAfterSave(saved, index));
}
@ -268,7 +254,8 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera
protected Flux<BulkItemResponse> doBulkOperation(List<?> queries, BulkOptions bulkOptions, IndexCoordinates index) {
BulkRequest bulkRequest = prepareWriteRequest(requestFactory.bulkRequest(queries, bulkOptions, index));
return client.bulk(bulkRequest) //
.onErrorMap(e -> new ElasticsearchException("Error while bulk for request: " + bulkRequest.toString(), e)) //
.onErrorMap(
e -> new UncategorizedElasticsearchException("Error while bulk for request: " + bulkRequest.toString(), e)) //
.flatMap(this::checkForBulkOperationFailure) //
.flatMapMany(response -> Flux.fromArray(response.getItems()));
}
@ -283,7 +270,7 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera
failedDocuments.put(item.getId(), item.getFailureMessage());
}
}
ElasticsearchException exception = new ElasticsearchException(
BulkFailureException exception = new BulkFailureException(
"Bulk operation has failures. Use ElasticsearchException.getFailedDocuments() for detailed messages ["
+ failedDocuments + ']',
failedDocuments);
@ -315,9 +302,8 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera
return doExists(id, index);
}
private Mono<Boolean> doExists(String id, @Nullable IndexCoordinates index) {
return Mono.defer(() -> doExists(new GetRequest(index.getIndexName(), id)));
private Mono<Boolean> doExists(String id, IndexCoordinates index) {
return Mono.defer(() -> doExists(requestFactory.getRequest(id, index)));
}
/**
@ -334,27 +320,30 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera
private <T> Mono<Tuple2<T, IndexResponse>> doIndex(T entity, IndexCoordinates index) {
AdaptibleEntity<?> adaptibleEntity = operations.forEntity(entity, converter.getConversionService());
IndexRequest request = getIndexRequest(entity, adaptibleEntity, index);
IndexRequest request = requestFactory.indexRequest(getIndexQuery(entity), index);
request = prepareIndexRequest(entity, request);
return Mono.just(entity).zipWith(doIndex(request));
}
private IndexRequest getIndexRequest(Object value, AdaptibleEntity<?> entity, IndexCoordinates index) {
private IndexQuery getIndexQuery(Object value) {
AdaptibleEntity<?> entity = operations.forEntity(value, converter.getConversionService());
Object id = entity.getId();
IndexQuery query = new IndexQuery();
IndexRequest request = id != null ? new IndexRequest(index.getIndexName()).id(converter.convertId(id))
: new IndexRequest(index.getIndexName());
request.source(converter.mapObject(value).toJson(), Requests.INDEX_CONTENT_TYPE);
if (id != null) {
query.setId(id.toString());
}
query.setObject(value);
boolean usingSeqNo = false;
if (entity.hasSeqNoPrimaryTerm()) {
SeqNoPrimaryTerm seqNoPrimaryTerm = entity.getSeqNoPrimaryTerm();
if (seqNoPrimaryTerm != null) {
request.setIfSeqNo(seqNoPrimaryTerm.getSequenceNumber());
request.setIfPrimaryTerm(seqNoPrimaryTerm.getPrimaryTerm());
query.setSeqNo(seqNoPrimaryTerm.getSequenceNumber());
query.setPrimaryTerm(seqNoPrimaryTerm.getPrimaryTerm());
usingSeqNo = true;
}
}
@ -364,32 +353,11 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera
Number version = entity.getVersion();
if (version != null) {
request.version(version.longValue());
request.versionType(EXTERNAL);
}
}
return request;
}
private IndexQuery getIndexQuery(Object value) {
AdaptibleEntity<?> entity = operations.forEntity(value, converter.getConversionService());
Object id = entity.getId();
IndexQuery query = new IndexQuery();
if (id != null) {
query.setId(id.toString());
}
query.setObject(value);
if (entity.isVersionedEntity()) {
Number version = entity.getVersion();
if (version != null) {
query.setVersion(version.longValue());
}
}
return query;
}
@ -410,9 +378,7 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera
}
private Mono<GetResult> doGet(String id, ElasticsearchPersistentEntity<?> entity, IndexCoordinates index) {
return Mono.defer(() -> {
return doGet(new GetRequest(index.getIndexName(), id));
});
return Mono.defer(() -> doGet(requestFactory.getRequest(id, index)));
}
/**
@ -465,8 +431,8 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera
private Mono<String> doDeleteById(String id, IndexCoordinates index) {
return Mono.defer(() -> {
return doDelete(prepareDeleteRequest(new DeleteRequest(index.getIndexName(), id)));
DeleteRequest request = requestFactory.deleteRequest(id, index);
return doDelete(prepareDeleteRequest(request));
});
}
@ -479,8 +445,7 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera
Assert.notNull(query, "Query must not be null!");
return doDeleteBy(query, getPersistentEntityFor(entityType), index).map(BulkByScrollResponse::getDeleted)
.publishNext();
return doDeleteBy(query, entityType, index).map(BulkByScrollResponse::getDeleted).publishNext();
}
@Override
@ -488,13 +453,10 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera
return delete(query, entityType, getIndexCoordinatesFor(entityType));
}
private Flux<BulkByScrollResponse> doDeleteBy(Query query, ElasticsearchPersistentEntity<?> entity,
IndexCoordinates index) {
private Flux<BulkByScrollResponse> doDeleteBy(Query query, Class<?> entityType, IndexCoordinates index) {
return Flux.defer(() -> {
DeleteByQueryRequest request = new DeleteByQueryRequest(index.getIndexNames());
request.setQuery(mappedQuery(query, entity));
DeleteByQueryRequest request = requestFactory.deleteByQueryRequest(query, entityType, index);
return doDeleteBy(prepareDeleteByRequest(request));
});
}
@ -552,8 +514,13 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera
*/
protected DeleteByQueryRequest prepareDeleteByRequest(DeleteByQueryRequest request) {
if (refreshPolicy != null && !RefreshPolicy.NONE.equals(refreshPolicy)) {
request = request.setRefresh(true);
if (refreshPolicy != null) {
if (RefreshPolicy.NONE.equals(refreshPolicy)) {
request = request.setRefresh(false);
} else {
request = request.setRefresh(true);
}
}
if (indicesOptions != null) {
@ -661,43 +628,6 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera
});
}
private CountRequest buildCountRequest(Query query, ElasticsearchPersistentEntity<?> entity, IndexCoordinates index) {
CountRequest request = new CountRequest(index.getIndexNames());
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.query(mappedQuery(query, entity));
searchSourceBuilder.trackScores(query.getTrackScores());
QueryBuilder postFilterQuery = mappedFilterQuery(query, entity);
if (postFilterQuery != null) {
searchSourceBuilder.postFilter(postFilterQuery);
}
if (query.getSourceFilter() != null) {
searchSourceBuilder.fetchSource(query.getSourceFilter().getIncludes(), query.getSourceFilter().getExcludes());
}
if (query instanceof NativeSearchQuery && ((NativeSearchQuery) query).getCollapseBuilder() != null) {
searchSourceBuilder.collapse(((NativeSearchQuery) query).getCollapseBuilder());
}
sort(query, entity).forEach(searchSourceBuilder::sort);
if (query.getMinScore() > 0) {
searchSourceBuilder.minScore(query.getMinScore());
}
if (query.getIndicesOptions() != null) {
request.indicesOptions(query.getIndicesOptions());
}
if (query.getPreference() != null) {
request.preference(query.getPreference());
}
request.source(searchSourceBuilder);
return request;
}
/**
* Customization hook on the actual execution result {@link Publisher}. <br />
*
@ -762,61 +692,6 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera
.map(DocumentAdapters::from).onErrorResume(NoSuchIndexException.class, it -> Mono.empty());
}
@Nullable
private QueryBuilder mappedFilterQuery(Query query, ElasticsearchPersistentEntity<?> entity) {
if (query instanceof NativeSearchQuery) {
return ((NativeSearchQuery) query).getFilter();
}
return null;
}
private QueryBuilder mappedQuery(Query query, ElasticsearchPersistentEntity<?> entity) {
QueryBuilder elasticsearchQuery = null;
if (query instanceof CriteriaQuery) {
converter.updateQuery((CriteriaQuery) query, entity.getType());
elasticsearchQuery = new CriteriaQueryProcessor().createQueryFromCriteria(((CriteriaQuery) query).getCriteria());
} else if (query instanceof StringQuery) {
elasticsearchQuery = new WrapperQueryBuilder(((StringQuery) query).getSource());
} else if (query instanceof NativeSearchQuery) {
elasticsearchQuery = ((NativeSearchQuery) query).getQuery();
} else {
throw new IllegalArgumentException(String.format("Unknown query type '%s'.", query.getClass()));
}
return elasticsearchQuery != null ? elasticsearchQuery : QueryBuilders.matchAllQuery();
}
private static List<FieldSortBuilder> sort(Query query, ElasticsearchPersistentEntity<?> entity) {
if (query.getSort() == null || query.getSort().isUnsorted()) {
return Collections.emptyList();
}
List<FieldSortBuilder> mappedSort = new ArrayList<>();
for (Sort.Order order : query.getSort()) {
ElasticsearchPersistentProperty property = entity.getPersistentProperty(order.getProperty());
String fieldName = property != null ? property.getFieldName() : order.getProperty();
FieldSortBuilder sort = SortBuilders.fieldSort(fieldName)
.order(order.getDirection().isDescending() ? SortOrder.DESC : SortOrder.ASC);
if (order.getNullHandling() == Sort.NullHandling.NULLS_FIRST) {
sort.missing("_first");
} else if (order.getNullHandling() == Sort.NullHandling.NULLS_LAST) {
sort.missing("_last");
}
mappedSort.add(sort);
}
return mappedSort;
}
/**
* Customization hook to modify a generated {@link SearchRequest} prior to its execution. Eg. by setting the
* {@link SearchRequest#indicesOptions(IndicesOptions) indices options} if applicable.
@ -950,7 +825,6 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera
return Mono.just(entity);
}
// endregion
protected interface DocumentCallback<T> {

View File

@ -29,6 +29,7 @@ import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequestBuilder;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetRequestBuilder;
import org.elasticsearch.action.get.MultiGetRequest;
@ -249,6 +250,10 @@ class RequestFactory {
return deleteByQueryRequest;
}
public DeleteRequest deleteRequest(String id, IndexCoordinates index) {
return new DeleteRequest(index.getIndexName(), id);
}
@Deprecated
public DeleteByQueryRequestBuilder deleteByQueryRequestBuilder(Client client, DeleteQuery deleteQuery,
IndexCoordinates index) {
@ -344,6 +349,7 @@ class RequestFactory {
throw new ElasticsearchException(
"object or source is null, failed to index the document [id: " + query.getId() + ']');
}
if (query.getVersion() != null) {
indexRequest.version(query.getVersion());
VersionType versionType = retrieveVersionTypeFromPersistentEntity(query.getObject().getClass());
@ -353,6 +359,7 @@ class RequestFactory {
if (query.getSeqNo() != null) {
indexRequest.setIfSeqNo(query.getSeqNo());
}
if (query.getPrimaryTerm() != null) {
indexRequest.setIfPrimaryTerm(query.getPrimaryTerm());
}