diff --git a/src/main/java/org/springframework/data/elasticsearch/BulkFailureException.java b/src/main/java/org/springframework/data/elasticsearch/BulkFailureException.java new file mode 100644 index 000000000..fe6aba345 --- /dev/null +++ b/src/main/java/org/springframework/data/elasticsearch/BulkFailureException.java @@ -0,0 +1,37 @@ +/* + * Copyright 2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.elasticsearch; + +import org.springframework.dao.DataRetrievalFailureException; + +import java.util.Map; + +/** + * @author Peter-Josef Meisch + * @since 4.0.1 (ported back from master (4.1) branch) + */ +public class BulkFailureException extends DataRetrievalFailureException { + private final Map failedDocuments; + + public BulkFailureException(String msg, Map failedDocuments) { + super(msg); + this.failedDocuments = failedDocuments; + } + + public Map getFailedDocuments() { + return failedDocuments; + } +} diff --git a/src/main/java/org/springframework/data/elasticsearch/UncategorizedElasticsearchException.java b/src/main/java/org/springframework/data/elasticsearch/UncategorizedElasticsearchException.java index 70696b86f..bc241291a 100644 --- a/src/main/java/org/springframework/data/elasticsearch/UncategorizedElasticsearchException.java +++ b/src/main/java/org/springframework/data/elasticsearch/UncategorizedElasticsearchException.java @@ -22,6 +22,7 @@ import org.springframework.dao.UncategorizedDataAccessException; * @since 4.0 */ public class UncategorizedElasticsearchException extends UncategorizedDataAccessException { + public UncategorizedElasticsearchException(String msg, Throwable cause) { super(msg, cause); } diff --git a/src/main/java/org/springframework/data/elasticsearch/core/AbstractElasticsearchTemplate.java b/src/main/java/org/springframework/data/elasticsearch/core/AbstractElasticsearchTemplate.java index 3e809613b..605275bdc 100644 --- a/src/main/java/org/springframework/data/elasticsearch/core/AbstractElasticsearchTemplate.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/AbstractElasticsearchTemplate.java @@ -36,7 +36,7 @@ import org.springframework.beans.BeansException; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import org.springframework.data.convert.EntityReader; -import org.springframework.data.elasticsearch.ElasticsearchException; +import org.springframework.data.elasticsearch.BulkFailureException; import org.springframework.data.elasticsearch.core.convert.ElasticsearchConverter; import org.springframework.data.elasticsearch.core.convert.MappingElasticsearchConverter; import org.springframework.data.elasticsearch.core.document.Document; @@ -405,7 +405,7 @@ public abstract class AbstractElasticsearchTemplate implements ElasticsearchOper if (item.isFailed()) failedDocuments.put(item.getId(), item.getFailureMessage()); } - throw new ElasticsearchException( + throw new BulkFailureException( "Bulk operation has failures. Use ElasticsearchException.getFailedDocuments() for detailed messages [" + failedDocuments + ']', failedDocuments); diff --git a/src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchRestTemplate.java b/src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchRestTemplate.java index 8a1eeaa71..072a475d9 100644 --- a/src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchRestTemplate.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchRestTemplate.java @@ -210,7 +210,7 @@ public class ElasticsearchRestTemplate extends AbstractElasticsearchTemplate { Assert.notNull(id, "id must not be null"); Assert.notNull(index, "index must not be null"); - DeleteRequest request = new DeleteRequest(index.getIndexName(), elasticsearchConverter.convertId(id)); + DeleteRequest request = requestFactory.deleteRequest(elasticsearchConverter.convertId(id), index); return execute(client -> client.delete(request, RequestOptions.DEFAULT).getId()); } diff --git a/src/main/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplate.java b/src/main/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplate.java index 21f6b36fa..4b353439d 100644 --- a/src/main/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplate.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplate.java @@ -15,15 +15,11 @@ */ package org.springframework.data.elasticsearch.core; -import static org.elasticsearch.index.VersionType.*; - import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.util.function.Tuple2; -import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -41,19 +37,10 @@ import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.action.support.WriteRequest.RefreshPolicy; -import org.elasticsearch.client.Requests; -import org.elasticsearch.client.core.CountRequest; import org.elasticsearch.index.get.GetResult; -import org.elasticsearch.index.query.QueryBuilder; -import org.elasticsearch.index.query.QueryBuilders; -import org.elasticsearch.index.query.WrapperQueryBuilder; import org.elasticsearch.index.reindex.BulkByScrollResponse; import org.elasticsearch.index.reindex.DeleteByQueryRequest; import org.elasticsearch.search.aggregations.Aggregation; -import org.elasticsearch.search.builder.SearchSourceBuilder; -import org.elasticsearch.search.sort.FieldSortBuilder; -import org.elasticsearch.search.sort.SortBuilders; -import org.elasticsearch.search.sort.SortOrder; import org.reactivestreams.Publisher; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -61,9 +48,9 @@ import org.springframework.beans.BeansException; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import org.springframework.data.convert.EntityReader; -import org.springframework.data.domain.Sort; -import org.springframework.data.elasticsearch.ElasticsearchException; +import org.springframework.data.elasticsearch.BulkFailureException; import org.springframework.data.elasticsearch.NoSuchIndexException; +import org.springframework.data.elasticsearch.UncategorizedElasticsearchException; import org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient; import org.springframework.data.elasticsearch.core.EntityOperations.AdaptibleEntity; import org.springframework.data.elasticsearch.core.EntityOperations.Entity; @@ -82,10 +69,8 @@ import org.springframework.data.elasticsearch.core.mapping.SimpleElasticsearchMa import org.springframework.data.elasticsearch.core.query.BulkOptions; import org.springframework.data.elasticsearch.core.query.CriteriaQuery; import org.springframework.data.elasticsearch.core.query.IndexQuery; -import org.springframework.data.elasticsearch.core.query.NativeSearchQuery; import org.springframework.data.elasticsearch.core.query.Query; import org.springframework.data.elasticsearch.core.query.SeqNoPrimaryTerm; -import org.springframework.data.elasticsearch.core.query.StringQuery; import org.springframework.data.elasticsearch.core.query.UpdateQuery; import org.springframework.data.elasticsearch.support.VersionInfo; import org.springframework.data.mapping.callback.ReactiveEntityCallbacks; @@ -194,6 +179,7 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera T savedEntity = it.getT1(); IndexResponse indexResponse = it.getT2(); AdaptibleEntity adaptableEntity = operations.forEntity(savedEntity, converter.getConversionService()); + // noinspection ReactiveStreamsNullableInLambdaInTransform return adaptableEntity.populateIdIfNecessary(indexResponse.getId()); }).flatMap(saved -> maybeCallAfterSave(saved, index)); } @@ -268,7 +254,8 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera protected Flux doBulkOperation(List queries, BulkOptions bulkOptions, IndexCoordinates index) { BulkRequest bulkRequest = prepareWriteRequest(requestFactory.bulkRequest(queries, bulkOptions, index)); return client.bulk(bulkRequest) // - .onErrorMap(e -> new ElasticsearchException("Error while bulk for request: " + bulkRequest.toString(), e)) // + .onErrorMap( + e -> new UncategorizedElasticsearchException("Error while bulk for request: " + bulkRequest.toString(), e)) // .flatMap(this::checkForBulkOperationFailure) // .flatMapMany(response -> Flux.fromArray(response.getItems())); } @@ -283,7 +270,7 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera failedDocuments.put(item.getId(), item.getFailureMessage()); } } - ElasticsearchException exception = new ElasticsearchException( + BulkFailureException exception = new BulkFailureException( "Bulk operation has failures. Use ElasticsearchException.getFailedDocuments() for detailed messages [" + failedDocuments + ']', failedDocuments); @@ -315,9 +302,8 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera return doExists(id, index); } - private Mono doExists(String id, @Nullable IndexCoordinates index) { - - return Mono.defer(() -> doExists(new GetRequest(index.getIndexName(), id))); + private Mono doExists(String id, IndexCoordinates index) { + return Mono.defer(() -> doExists(requestFactory.getRequest(id, index))); } /** @@ -334,27 +320,30 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera private Mono> doIndex(T entity, IndexCoordinates index) { - AdaptibleEntity adaptibleEntity = operations.forEntity(entity, converter.getConversionService()); - IndexRequest request = getIndexRequest(entity, adaptibleEntity, index); + IndexRequest request = requestFactory.indexRequest(getIndexQuery(entity), index); request = prepareIndexRequest(entity, request); return Mono.just(entity).zipWith(doIndex(request)); } - private IndexRequest getIndexRequest(Object value, AdaptibleEntity entity, IndexCoordinates index) { + private IndexQuery getIndexQuery(Object value) { + AdaptibleEntity entity = operations.forEntity(value, converter.getConversionService()); + Object id = entity.getId(); + IndexQuery query = new IndexQuery(); - IndexRequest request = id != null ? new IndexRequest(index.getIndexName()).id(converter.convertId(id)) - : new IndexRequest(index.getIndexName()); - - request.source(converter.mapObject(value).toJson(), Requests.INDEX_CONTENT_TYPE); + if (id != null) { + query.setId(id.toString()); + } + query.setObject(value); boolean usingSeqNo = false; + if (entity.hasSeqNoPrimaryTerm()) { SeqNoPrimaryTerm seqNoPrimaryTerm = entity.getSeqNoPrimaryTerm(); if (seqNoPrimaryTerm != null) { - request.setIfSeqNo(seqNoPrimaryTerm.getSequenceNumber()); - request.setIfPrimaryTerm(seqNoPrimaryTerm.getPrimaryTerm()); + query.setSeqNo(seqNoPrimaryTerm.getSequenceNumber()); + query.setPrimaryTerm(seqNoPrimaryTerm.getPrimaryTerm()); usingSeqNo = true; } } @@ -364,32 +353,11 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera Number version = entity.getVersion(); - if (version != null) { - request.version(version.longValue()); - request.versionType(EXTERNAL); - } - } - - return request; - } - - private IndexQuery getIndexQuery(Object value) { - AdaptibleEntity entity = operations.forEntity(value, converter.getConversionService()); - - Object id = entity.getId(); - IndexQuery query = new IndexQuery(); - if (id != null) { - query.setId(id.toString()); - } - query.setObject(value); - - if (entity.isVersionedEntity()) { - Number version = entity.getVersion(); - if (version != null) { query.setVersion(version.longValue()); } } + return query; } @@ -410,9 +378,7 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera } private Mono doGet(String id, ElasticsearchPersistentEntity entity, IndexCoordinates index) { - return Mono.defer(() -> { - return doGet(new GetRequest(index.getIndexName(), id)); - }); + return Mono.defer(() -> doGet(requestFactory.getRequest(id, index))); } /** @@ -465,8 +431,8 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera private Mono doDeleteById(String id, IndexCoordinates index) { return Mono.defer(() -> { - - return doDelete(prepareDeleteRequest(new DeleteRequest(index.getIndexName(), id))); + DeleteRequest request = requestFactory.deleteRequest(id, index); + return doDelete(prepareDeleteRequest(request)); }); } @@ -479,8 +445,7 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera Assert.notNull(query, "Query must not be null!"); - return doDeleteBy(query, getPersistentEntityFor(entityType), index).map(BulkByScrollResponse::getDeleted) - .publishNext(); + return doDeleteBy(query, entityType, index).map(BulkByScrollResponse::getDeleted).publishNext(); } @Override @@ -488,13 +453,10 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera return delete(query, entityType, getIndexCoordinatesFor(entityType)); } - private Flux doDeleteBy(Query query, ElasticsearchPersistentEntity entity, - IndexCoordinates index) { + private Flux doDeleteBy(Query query, Class entityType, IndexCoordinates index) { return Flux.defer(() -> { - DeleteByQueryRequest request = new DeleteByQueryRequest(index.getIndexNames()); - request.setQuery(mappedQuery(query, entity)); - + DeleteByQueryRequest request = requestFactory.deleteByQueryRequest(query, entityType, index); return doDeleteBy(prepareDeleteByRequest(request)); }); } @@ -552,8 +514,13 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera */ protected DeleteByQueryRequest prepareDeleteByRequest(DeleteByQueryRequest request) { - if (refreshPolicy != null && !RefreshPolicy.NONE.equals(refreshPolicy)) { - request = request.setRefresh(true); + if (refreshPolicy != null) { + + if (RefreshPolicy.NONE.equals(refreshPolicy)) { + request = request.setRefresh(false); + } else { + request = request.setRefresh(true); + } } if (indicesOptions != null) { @@ -661,43 +628,6 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera }); } - private CountRequest buildCountRequest(Query query, ElasticsearchPersistentEntity entity, IndexCoordinates index) { - - CountRequest request = new CountRequest(index.getIndexNames()); - SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); - searchSourceBuilder.query(mappedQuery(query, entity)); - searchSourceBuilder.trackScores(query.getTrackScores()); - - QueryBuilder postFilterQuery = mappedFilterQuery(query, entity); - if (postFilterQuery != null) { - searchSourceBuilder.postFilter(postFilterQuery); - } - - if (query.getSourceFilter() != null) { - searchSourceBuilder.fetchSource(query.getSourceFilter().getIncludes(), query.getSourceFilter().getExcludes()); - } - - if (query instanceof NativeSearchQuery && ((NativeSearchQuery) query).getCollapseBuilder() != null) { - searchSourceBuilder.collapse(((NativeSearchQuery) query).getCollapseBuilder()); - } - - sort(query, entity).forEach(searchSourceBuilder::sort); - - if (query.getMinScore() > 0) { - searchSourceBuilder.minScore(query.getMinScore()); - } - - if (query.getIndicesOptions() != null) { - request.indicesOptions(query.getIndicesOptions()); - } - - if (query.getPreference() != null) { - request.preference(query.getPreference()); - } - request.source(searchSourceBuilder); - return request; - } - /** * Customization hook on the actual execution result {@link Publisher}.
* @@ -762,61 +692,6 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera .map(DocumentAdapters::from).onErrorResume(NoSuchIndexException.class, it -> Mono.empty()); } - @Nullable - private QueryBuilder mappedFilterQuery(Query query, ElasticsearchPersistentEntity entity) { - - if (query instanceof NativeSearchQuery) { - return ((NativeSearchQuery) query).getFilter(); - } - - return null; - } - - private QueryBuilder mappedQuery(Query query, ElasticsearchPersistentEntity entity) { - - QueryBuilder elasticsearchQuery = null; - - if (query instanceof CriteriaQuery) { - converter.updateQuery((CriteriaQuery) query, entity.getType()); - elasticsearchQuery = new CriteriaQueryProcessor().createQueryFromCriteria(((CriteriaQuery) query).getCriteria()); - } else if (query instanceof StringQuery) { - elasticsearchQuery = new WrapperQueryBuilder(((StringQuery) query).getSource()); - } else if (query instanceof NativeSearchQuery) { - elasticsearchQuery = ((NativeSearchQuery) query).getQuery(); - } else { - throw new IllegalArgumentException(String.format("Unknown query type '%s'.", query.getClass())); - } - - return elasticsearchQuery != null ? elasticsearchQuery : QueryBuilders.matchAllQuery(); - } - - private static List sort(Query query, ElasticsearchPersistentEntity entity) { - - if (query.getSort() == null || query.getSort().isUnsorted()) { - return Collections.emptyList(); - } - - List mappedSort = new ArrayList<>(); - for (Sort.Order order : query.getSort()) { - - ElasticsearchPersistentProperty property = entity.getPersistentProperty(order.getProperty()); - String fieldName = property != null ? property.getFieldName() : order.getProperty(); - - FieldSortBuilder sort = SortBuilders.fieldSort(fieldName) - .order(order.getDirection().isDescending() ? SortOrder.DESC : SortOrder.ASC); - - if (order.getNullHandling() == Sort.NullHandling.NULLS_FIRST) { - sort.missing("_first"); - } else if (order.getNullHandling() == Sort.NullHandling.NULLS_LAST) { - sort.missing("_last"); - } - - mappedSort.add(sort); - } - - return mappedSort; - } - /** * Customization hook to modify a generated {@link SearchRequest} prior to its execution. Eg. by setting the * {@link SearchRequest#indicesOptions(IndicesOptions) indices options} if applicable. @@ -950,7 +825,6 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera return Mono.just(entity); } - // endregion protected interface DocumentCallback { diff --git a/src/main/java/org/springframework/data/elasticsearch/core/RequestFactory.java b/src/main/java/org/springframework/data/elasticsearch/core/RequestFactory.java index a26c8b83b..6171be9b4 100644 --- a/src/main/java/org/springframework/data/elasticsearch/core/RequestFactory.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/RequestFactory.java @@ -29,6 +29,7 @@ import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder; import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequestBuilder; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkRequestBuilder; +import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.get.GetRequest; import org.elasticsearch.action.get.GetRequestBuilder; import org.elasticsearch.action.get.MultiGetRequest; @@ -249,6 +250,10 @@ class RequestFactory { return deleteByQueryRequest; } + public DeleteRequest deleteRequest(String id, IndexCoordinates index) { + return new DeleteRequest(index.getIndexName(), id); + } + @Deprecated public DeleteByQueryRequestBuilder deleteByQueryRequestBuilder(Client client, DeleteQuery deleteQuery, IndexCoordinates index) { @@ -344,6 +349,7 @@ class RequestFactory { throw new ElasticsearchException( "object or source is null, failed to index the document [id: " + query.getId() + ']'); } + if (query.getVersion() != null) { indexRequest.version(query.getVersion()); VersionType versionType = retrieveVersionTypeFromPersistentEntity(query.getObject().getClass()); @@ -353,6 +359,7 @@ class RequestFactory { if (query.getSeqNo() != null) { indexRequest.setIfSeqNo(query.getSeqNo()); } + if (query.getPrimaryTerm() != null) { indexRequest.setIfPrimaryTerm(query.getPrimaryTerm()); }