diff --git a/src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClient.java b/src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClient.java index b32cf13fb..33a842119 100644 --- a/src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClient.java +++ b/src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClient.java @@ -47,7 +47,6 @@ import javax.net.ssl.SSLContext; import org.apache.http.util.EntityUtils; import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.action.ActionRequest; -import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.admin.indices.close.CloseIndexRequest; import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; @@ -79,6 +78,8 @@ import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.action.update.UpdateResponse; import org.elasticsearch.client.Request; +import org.elasticsearch.client.core.CountRequest; +import org.elasticsearch.client.core.CountResponse; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.DeprecationHandler; import org.elasticsearch.common.xcontent.NamedXContentRegistry; @@ -322,6 +323,17 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch .publishNext(); } + /* + * (non-Javadoc) + * @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient#count(org.springframework.http.HttpHeaders, org.elasticsearch.action.search.SearchRequest) + */ + @Override + public Mono count(HttpHeaders headers, CountRequest countRequest) { + return sendRequest(countRequest, RequestCreator.count(), CountResponse.class, headers) // + .map(CountResponse::getCount) // + .next(); + } + /* * (non-Javadoc) * @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient#ping(org.springframework.http.HttpHeaders, org.elasticsearch.action.search.SearchRequest) @@ -566,13 +578,12 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch // --> - private Flux sendRequest(Req request, - Function converter, Class responseType, HttpHeaders headers) { + private Flux sendRequest(Req request, Function converter, + Class responseType, HttpHeaders headers) { return sendRequest(converter.apply(request), responseType, headers); } - private Flux sendRequest(Request request, Class responseType, - HttpHeaders headers) { + private Flux sendRequest(Request request, Class responseType, HttpHeaders headers) { String logId = ClientLogger.newLogId(); @@ -801,6 +812,10 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch return RequestConverters::flushIndex; } + static Function count() { + return RequestConverters::count; + } + } /** diff --git a/src/main/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClient.java b/src/main/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClient.java index b3340ad2b..34ceee6c5 100644 --- a/src/main/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClient.java +++ b/src/main/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClient.java @@ -43,6 +43,7 @@ import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.action.update.UpdateResponse; +import org.elasticsearch.client.core.CountRequest; import org.elasticsearch.index.get.GetResult; import org.elasticsearch.index.reindex.BulkByScrollResponse; import org.elasticsearch.index.reindex.DeleteByQueryRequest; @@ -60,6 +61,7 @@ import org.springframework.web.reactive.function.client.WebClient; * * @author Christoph Strobl * @author Mark Paluch + * @author Peter-Josef Meisch * @author Henrique Amaral * @since 3.2 * @see ClientConfiguration @@ -332,6 +334,47 @@ public interface ReactiveElasticsearchClient { */ Mono delete(HttpHeaders headers, DeleteRequest deleteRequest); + /** + * Execute a {@link SearchRequest} against the {@literal count} API. + * + * @param consumer new {@literal null}. + * @see Count API on + * elastic.co + * @return the {@link Mono} emitting the count result. + * @since 4.0 + */ + default Mono count(Consumer consumer) { + + CountRequest countRequest = new CountRequest(); + consumer.accept(countRequest); + return count(countRequest); + } + + /** + * Execute a {@link SearchRequest} against the {@literal count} API. + * + * @param countRequest must not be {@literal null}. + * @see Count API on + * elastic.co + * @return the {@link Mono} emitting the count result. + * @since 4.0 + */ + default Mono count(CountRequest countRequest) { + return count(HttpHeaders.EMPTY, countRequest); + } + + /** + * Execute a {@link SearchRequest} against the {@literal count} API. + * + * @param headers Use {@link HttpHeaders} to provide eg. authentication data. Must not be {@literal null}. + * @param countRequest must not be {@literal null}. + * @see Count API on + * elastic.co + * @return the {@link Mono} emitting the count result. + * @since 4.0 + */ + Mono count(HttpHeaders headers, CountRequest countRequest); + /** * Execute a {@link SearchRequest} against the {@literal search} API. * diff --git a/src/main/java/org/springframework/data/elasticsearch/client/util/RequestConverters.java b/src/main/java/org/springframework/data/elasticsearch/client/util/RequestConverters.java index 0163e01c6..7cf4453f8 100644 --- a/src/main/java/org/springframework/data/elasticsearch/client/util/RequestConverters.java +++ b/src/main/java/org/springframework/data/elasticsearch/client/util/RequestConverters.java @@ -60,6 +60,7 @@ import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.client.Request; import org.elasticsearch.client.Requests; import org.elasticsearch.client.RethrottleRequest; +import org.elasticsearch.client.core.CountRequest; import org.elasticsearch.cluster.health.ClusterHealthStatus; import org.elasticsearch.common.Priority; import org.elasticsearch.common.Strings; @@ -98,6 +99,8 @@ import org.springframework.lang.Nullable; *

* Only intended for internal use. * + * @author Christoph Strobl + * @author Peter-Josef Meisch * @since 3.2 */ public class RequestConverters { @@ -386,6 +389,32 @@ public class RequestConverters { return request; } + /** + * Creates a count request. + * + * @param countRequest the search defining the data to be counted + * @return Elasticsearch count request + * @since 4.0 + */ + public static Request count(CountRequest countRequest) { + Request request = new Request(HttpMethod.POST.name(), + endpoint(countRequest.indices(), countRequest.types(), "_count")); + + Params params = new Params(request); + addCountRequestParams(params, countRequest); + + if (countRequest.source() != null) { + request.setEntity(createEntity(countRequest.source(), REQUEST_BODY_CONTENT_TYPE)); + } + return request; + } + + private static void addCountRequestParams(Params params, CountRequest countRequest) { + params.withRouting(countRequest.routing()); + params.withPreference(countRequest.preference()); + params.withIndicesOptions(countRequest.indicesOptions()); + } + private static void addSearchRequestParams(Params params, SearchRequest searchRequest) { params.putParam("typed_keys", "true"); params.withRouting(searchRequest.routing()); diff --git a/src/main/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplate.java b/src/main/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplate.java index eabe68f2f..5668d692b 100644 --- a/src/main/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplate.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplate.java @@ -21,7 +21,6 @@ import lombok.NonNull; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -36,6 +35,7 @@ import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.action.support.WriteRequest.RefreshPolicy; import org.elasticsearch.client.Requests; +import org.elasticsearch.client.core.CountRequest; import org.elasticsearch.index.get.GetResult; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryBuilders; @@ -247,80 +247,133 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera @Nullable String type) { return Flux.defer(() -> { + SearchRequest request = prepareSearchRequest(buildSearchRequest(query, entity, index, type)); - IndexCoordinates indexCoordinates = operations.determineIndex(entity, index, type); - SearchRequest request = new SearchRequest(indices(query, indexCoordinates::getIndexName)); - request.types(indexTypes(query, indexCoordinates::getTypeName)); - - SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); - searchSourceBuilder.query(mappedQuery(query, entity)); - searchSourceBuilder.version(entity.hasVersionProperty()); - searchSourceBuilder.trackScores(query.getTrackScores()); - - QueryBuilder postFilterQuery = mappedFilterQuery(query, entity); - if (postFilterQuery != null) { - searchSourceBuilder.postFilter(postFilterQuery); - } - - if (query.getSourceFilter() != null) { - searchSourceBuilder.fetchSource(query.getSourceFilter().getIncludes(), query.getSourceFilter().getExcludes()); - } - - if (query instanceof SearchQuery && ((SearchQuery) query).getCollapseBuilder() != null) { - searchSourceBuilder.collapse(((SearchQuery) query).getCollapseBuilder()); - } - - sort(query, entity).forEach(searchSourceBuilder::sort); - - if (query.getMinScore() > 0) { - searchSourceBuilder.minScore(query.getMinScore()); - } - - if (query.getIndicesOptions() != null) { - request.indicesOptions(query.getIndicesOptions()); - } - - if (query.getPreference() != null) { - request.preference(query.getPreference()); - } - - if (query.getSearchType() != null) { - request.searchType(query.getSearchType()); - } - - Pageable pageable = query.getPageable(); - - if (pageable.isPaged()) { - - long offset = pageable.getOffset(); - if (offset > Integer.MAX_VALUE) { - throw new IllegalArgumentException(String.format("Offset must not be more than %s", Integer.MAX_VALUE)); - } - - searchSourceBuilder.from((int) offset); - searchSourceBuilder.size(pageable.getPageSize()); - - request.source(searchSourceBuilder); - return doFind(prepareSearchRequest(request)); - + if (query.getPageable().isPaged() || query.isLimiting()) { + return doFind(request); } else { - - request.source(searchSourceBuilder); - return doScan(prepareSearchRequest(request)); + return doScroll(request); } }); } + @Override + public Mono count(Query query, Class entityType, String index, String type) { + return doCount(query, getPersistentEntity(entityType), index, type); + } + + private Mono doCount(Query query, ElasticsearchPersistentEntity entity, @Nullable String index, + @Nullable String type) { + return Mono.defer(() -> { + + CountRequest countRequest = buildCountRequest(query, entity, index, type); + CountRequest request = prepareCountRequest(countRequest); + return doCount(request); + }); + + } + + private CountRequest buildCountRequest(Query query, ElasticsearchPersistentEntity entity, @Nullable String index, + @Nullable String type) { + + IndexCoordinates indexCoordinates = operations.determineIndex(entity, index, type); + CountRequest request = new CountRequest(indices(query, indexCoordinates::getIndexName)); + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); + searchSourceBuilder.query(mappedQuery(query, entity)); + searchSourceBuilder.trackScores(query.getTrackScores()); + + QueryBuilder postFilterQuery = mappedFilterQuery(query, entity); + if (postFilterQuery != null) { + searchSourceBuilder.postFilter(postFilterQuery); + } + + if (query.getSourceFilter() != null) { + searchSourceBuilder.fetchSource(query.getSourceFilter().getIncludes(), query.getSourceFilter().getExcludes()); + } + + if (query instanceof SearchQuery && ((SearchQuery) query).getCollapseBuilder() != null) { + searchSourceBuilder.collapse(((SearchQuery) query).getCollapseBuilder()); + } + + sort(query, entity).forEach(searchSourceBuilder::sort); + + if (query.getMinScore() > 0) { + searchSourceBuilder.minScore(query.getMinScore()); + } + + if (query.getIndicesOptions() != null) { + request.indicesOptions(query.getIndicesOptions()); + } + + if (query.getPreference() != null) { + request.preference(query.getPreference()); + } + request.source(searchSourceBuilder); + return request; + } + + private SearchRequest buildSearchRequest(Query query, ElasticsearchPersistentEntity entity, @Nullable String index, + @Nullable String type) { + IndexCoordinates indexCoordinates = operations.determineIndex(entity, index, type); + SearchRequest request = new SearchRequest(indices(query, indexCoordinates::getIndexName)); + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); + searchSourceBuilder.query(mappedQuery(query, entity)); + searchSourceBuilder.version(entity.hasVersionProperty()); + searchSourceBuilder.trackScores(query.getTrackScores()); + + QueryBuilder postFilterQuery = mappedFilterQuery(query, entity); + if (postFilterQuery != null) { + searchSourceBuilder.postFilter(postFilterQuery); + } + + if (query.getSourceFilter() != null) { + searchSourceBuilder.fetchSource(query.getSourceFilter().getIncludes(), query.getSourceFilter().getExcludes()); + } + + if (query instanceof SearchQuery && ((SearchQuery) query).getCollapseBuilder() != null) { + searchSourceBuilder.collapse(((SearchQuery) query).getCollapseBuilder()); + } + + sort(query, entity).forEach(searchSourceBuilder::sort); + + if (query.getMinScore() > 0) { + searchSourceBuilder.minScore(query.getMinScore()); + } + + if (query.getIndicesOptions() != null) { + request.indicesOptions(query.getIndicesOptions()); + } + + if (query.getPreference() != null) { + request.preference(query.getPreference()); + } + + if (query.getSearchType() != null) { + request.searchType(query.getSearchType()); + } + + Pageable pageable = query.getPageable(); + + if (pageable.isPaged()) { + + long offset = pageable.getOffset(); + if (offset > Integer.MAX_VALUE) { + throw new IllegalArgumentException(String.format("Offset must not be more than %s", Integer.MAX_VALUE)); + } + + searchSourceBuilder.from((int) offset); + searchSourceBuilder.size(pageable.getPageSize()); + + request.source(searchSourceBuilder); + } else { + request.source(searchSourceBuilder); + } + return request; + } /* * (non-Javadoc) * @see org.springframework.data.elasticsearch.core.ReactiveElasticsearchOperations#count(Query, Class, String, String) */ - @Override - public Mono count(Query query, Class entityType, String index, String type) { - - // TODO: ES 7.0 has a dedicated CountRequest - use that one once available. - return find(query, entityType, index, type).count(); - } /* * (non-Javadoc) @@ -457,6 +510,22 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera return prepareWriteRequest(request); } + /** + * Customization hook to modify a generated {@link SearchRequest} prior to its execution. Eg. by setting the + * {@link SearchRequest#indicesOptions(IndicesOptions) indices options} if applicable. + * + * @param request the generated {@link CountRequest}. + * @return never {@literal null}. + */ + protected CountRequest prepareCountRequest(CountRequest request) { + + if (indicesOptions == null) { + return request; + } + + return request.indicesOptions(indicesOptions); + } + /** * Customization hook to modify a generated {@link SearchRequest} prior to its execution. Eg. by setting the * {@link SearchRequest#indicesOptions(IndicesOptions) indices options} if applicable. @@ -557,16 +626,32 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera .onErrorResume(NoSuchIndexException.class, it -> Mono.empty()); } + /** + * Customization hook on the actual execution result {@link Publisher}.
+ * + * @param request the already prepared {@link CountRequest} ready to be executed. + * @return a {@link Mono} emitting the result of the operation. + */ + protected Mono doCount(CountRequest request) { + + if (QUERY_LOGGER.isDebugEnabled()) { + QUERY_LOGGER.debug("Executing doCount: {}", request); + } + + return Mono.from(execute(client -> client.count(request))) // + .onErrorResume(NoSuchIndexException.class, it -> Mono.empty()); + } + /** * Customization hook on the actual execution result {@link Publisher}.
* * @param request the already prepared {@link SearchRequest} ready to be executed. * @return a {@link Flux} emitting the result of the operation. */ - protected Flux doScan(SearchRequest request) { + protected Flux doScroll(SearchRequest request) { if (QUERY_LOGGER.isDebugEnabled()) { - QUERY_LOGGER.debug("Executing doScan: {}", request); + QUERY_LOGGER.debug("Executing doScroll: {}", request); } return Flux.from(execute(client -> client.scroll(request))) // @@ -665,9 +750,7 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera elasticsearchQuery = new WrapperQueryBuilder(((StringQuery) query).getSource()); } else if (query instanceof NativeSearchQuery) { elasticsearchQuery = ((NativeSearchQuery) query).getQuery(); - } - - else { + } else { throw new IllegalArgumentException(String.format("Unknown query type '%s'.", query.getClass())); }