DATAES-680 - ReactiveElasticsearchTemplate-should-use-the-count-API.

Original PR: #341
This commit is contained in:
Peter-Josef Meisch 2019-11-06 19:14:47 +01:00 committed by GitHub
parent 24751972a8
commit 62385edaa5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 247 additions and 76 deletions

View File

@ -47,7 +47,6 @@ import javax.net.ssl.SSLContext;
import org.apache.http.util.EntityUtils;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.admin.indices.close.CloseIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
@ -79,6 +78,8 @@ import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.core.CountRequest;
import org.elasticsearch.client.core.CountResponse;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.DeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
@ -328,6 +329,17 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
.publishNext();
}
/*
* (non-Javadoc)
* @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient#count(org.springframework.http.HttpHeaders, org.elasticsearch.action.search.SearchRequest)
*/
@Override
public Mono<Long> count(HttpHeaders headers, CountRequest countRequest) {
return sendRequest(countRequest, RequestCreator.count(), CountResponse.class, headers) //
.map(CountResponse::getCount) //
.next();
}
/*
* (non-Javadoc)
* @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient#ping(org.springframework.http.HttpHeaders, org.elasticsearch.action.search.SearchRequest)
@ -572,13 +584,12 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
// -->
private <Req extends ActionRequest, Resp extends ActionResponse> Flux<Resp> sendRequest(Req request,
Function<Req, Request> converter, Class<Resp> responseType, HttpHeaders headers) {
private <Req extends ActionRequest, Resp> Flux<Resp> sendRequest(Req request, Function<Req, Request> converter,
Class<Resp> responseType, HttpHeaders headers) {
return sendRequest(converter.apply(request), responseType, headers);
}
private <AR extends ActionResponse> Flux<AR> sendRequest(Request request, Class<AR> responseType,
HttpHeaders headers) {
private <Resp> Flux<Resp> sendRequest(Request request, Class<Resp> responseType, HttpHeaders headers) {
String logId = ClientLogger.newLogId();
@ -807,6 +818,10 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
return RequestConverters::flushIndex;
}
static Function<CountRequest, Request> count() {
return RequestConverters::count;
}
}
/**

View File

@ -43,6 +43,7 @@ import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.core.CountRequest;
import org.elasticsearch.index.get.GetResult;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
@ -59,6 +60,7 @@ import org.springframework.web.reactive.function.client.WebClient;
*
* @author Christoph Strobl
* @author Mark Paluch
* @author Peter-Josef Meisch
* @author Henrique Amaral
* @since 3.2
* @see ClientConfiguration
@ -331,6 +333,47 @@ public interface ReactiveElasticsearchClient {
*/
Mono<DeleteResponse> delete(HttpHeaders headers, DeleteRequest deleteRequest);
/**
* Execute a {@link SearchRequest} against the {@literal count} API.
*
* @param consumer new {@literal null}.
* @see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/search-count.html">Count API on
* elastic.co</a>
* @return the {@link Mono} emitting the count result.
* @since 4.0
*/
default Mono<Long> count(Consumer<CountRequest> consumer) {
CountRequest countRequest = new CountRequest();
consumer.accept(countRequest);
return count(countRequest);
}
/**
* Execute a {@link SearchRequest} against the {@literal count} API.
*
* @param countRequest must not be {@literal null}.
* @see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/search-count.html">Count API on
* elastic.co</a>
* @return the {@link Mono} emitting the count result.
* @since 4.0
*/
default Mono<Long> count(CountRequest countRequest) {
return count(HttpHeaders.EMPTY, countRequest);
}
/**
* Execute a {@link SearchRequest} against the {@literal count} API.
*
* @param headers Use {@link HttpHeaders} to provide eg. authentication data. Must not be {@literal null}.
* @param countRequest must not be {@literal null}.
* @see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/search-count.html">Count API on
* elastic.co</a>
* @return the {@link Mono} emitting the count result.
* @since 4.0
*/
Mono<Long> count(HttpHeaders headers, CountRequest countRequest);
/**
* Execute a {@link SearchRequest} against the {@literal search} API.
*

View File

@ -59,6 +59,7 @@ import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Requests;
import org.elasticsearch.client.RethrottleRequest;
import org.elasticsearch.client.core.CountRequest;
import org.elasticsearch.client.indices.AnalyzeRequest;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.common.Priority;
@ -388,6 +389,32 @@ public class RequestConverters {
return request;
}
/**
* Creates a count request.
*
* @param countRequest the search defining the data to be counted
* @return Elasticsearch count request
* @since 4.0
*/
public static Request count(CountRequest countRequest) {
Request request = new Request(HttpMethod.POST.name(),
endpoint(countRequest.indices(), countRequest.types(), "_count"));
Params params = new Params(request);
addCountRequestParams(params, countRequest);
if (countRequest.source() != null) {
request.setEntity(createEntity(countRequest.source(), REQUEST_BODY_CONTENT_TYPE));
}
return request;
}
private static void addCountRequestParams(Params params, CountRequest countRequest) {
params.withRouting(countRequest.routing());
params.withPreference(countRequest.preference());
params.withIndicesOptions(countRequest.indicesOptions());
}
private static void addSearchRequestParams(Params params, SearchRequest searchRequest) {
params.putParam("typed_keys", "true");
params.withRouting(searchRequest.routing());

View File

@ -35,6 +35,7 @@ import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.support.WriteRequest.RefreshPolicy;
import org.elasticsearch.client.Requests;
import org.elasticsearch.client.core.CountRequest;
import org.elasticsearch.index.get.GetResult;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
@ -228,85 +229,138 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera
@Nullable String type) {
return Flux.defer(() -> {
SearchRequest request = prepareSearchRequest(buildSearchRequest(query, entity, index, type));
IndexCoordinates indexCoordinates = operations.determineIndex(entity, index, type);
SearchRequest request = new SearchRequest(indices(query, indexCoordinates::getIndexName));
request.types(indexTypes(query, indexCoordinates::getTypeName));
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.query(mappedQuery(query, entity));
searchSourceBuilder.version(entity.hasVersionProperty());
searchSourceBuilder.trackScores(query.getTrackScores());
QueryBuilder postFilterQuery = mappedFilterQuery(query, entity);
if (postFilterQuery != null) {
searchSourceBuilder.postFilter(postFilterQuery);
}
if (query.getSourceFilter() != null) {
searchSourceBuilder.fetchSource(query.getSourceFilter().getIncludes(), query.getSourceFilter().getExcludes());
}
if (query instanceof SearchQuery && ((SearchQuery) query).getCollapseBuilder() != null) {
searchSourceBuilder.collapse(((SearchQuery) query).getCollapseBuilder());
}
sort(query, entity).forEach(searchSourceBuilder::sort);
if (query.getMinScore() > 0) {
searchSourceBuilder.minScore(query.getMinScore());
}
if (query.getIndicesOptions() != null) {
request.indicesOptions(query.getIndicesOptions());
}
if (query.getPreference() != null) {
request.preference(query.getPreference());
}
if (query.getSearchType() != null) {
request.searchType(query.getSearchType());
}
Pageable pageable = query.getPageable();
if (pageable.isPaged()) {
long offset = pageable.getOffset();
if (offset > Integer.MAX_VALUE) {
throw new IllegalArgumentException(String.format("Offset must not be more than %s", Integer.MAX_VALUE));
}
searchSourceBuilder.from((int) offset);
searchSourceBuilder.size(pageable.getPageSize());
request.source(searchSourceBuilder);
return doFind(prepareSearchRequest(request));
} else if (query.isLimiting()) {
searchSourceBuilder.from(0);
searchSourceBuilder.size(query.getMaxResults());
request.source(searchSourceBuilder);
return doFind(prepareSearchRequest(request));
if (query.getPageable().isPaged() || query.isLimiting()) {
return doFind(request);
} else {
request.source(searchSourceBuilder);
return doScan(prepareSearchRequest(request));
return doScroll(request);
}
});
}
@Override
public Mono<Long> count(Query query, Class<?> entityType, String index, String type) {
return doCount(query, getPersistentEntity(entityType), index, type);
}
private Mono<Long> doCount(Query query, ElasticsearchPersistentEntity<?> entity, @Nullable String index,
@Nullable String type) {
return Mono.defer(() -> {
CountRequest countRequest = buildCountRequest(query, entity, index, type);
CountRequest request = prepareCountRequest(countRequest);
return doCount(request);
});
}
private CountRequest buildCountRequest(Query query, ElasticsearchPersistentEntity<?> entity, @Nullable String index,
@Nullable String type) {
IndexCoordinates indexCoordinates = operations.determineIndex(entity, index, type);
CountRequest request = new CountRequest(indices(query, indexCoordinates::getIndexName));
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.query(mappedQuery(query, entity));
searchSourceBuilder.trackScores(query.getTrackScores());
QueryBuilder postFilterQuery = mappedFilterQuery(query, entity);
if (postFilterQuery != null) {
searchSourceBuilder.postFilter(postFilterQuery);
}
if (query.getSourceFilter() != null) {
searchSourceBuilder.fetchSource(query.getSourceFilter().getIncludes(), query.getSourceFilter().getExcludes());
}
if (query instanceof SearchQuery && ((SearchQuery) query).getCollapseBuilder() != null) {
searchSourceBuilder.collapse(((SearchQuery) query).getCollapseBuilder());
}
sort(query, entity).forEach(searchSourceBuilder::sort);
if (query.getMinScore() > 0) {
searchSourceBuilder.minScore(query.getMinScore());
}
if (query.getIndicesOptions() != null) {
request.indicesOptions(query.getIndicesOptions());
}
if (query.getPreference() != null) {
request.preference(query.getPreference());
}
request.source(searchSourceBuilder);
return request;
}
private SearchRequest buildSearchRequest(Query query, ElasticsearchPersistentEntity<?> entity, @Nullable String index,
@Nullable String type) {
IndexCoordinates indexCoordinates = operations.determineIndex(entity, index, type);
SearchRequest request = new SearchRequest(indices(query, indexCoordinates::getIndexName));
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.query(mappedQuery(query, entity));
searchSourceBuilder.version(entity.hasVersionProperty());
searchSourceBuilder.trackScores(query.getTrackScores());
QueryBuilder postFilterQuery = mappedFilterQuery(query, entity);
if (postFilterQuery != null) {
searchSourceBuilder.postFilter(postFilterQuery);
}
if (query.getSourceFilter() != null) {
searchSourceBuilder.fetchSource(query.getSourceFilter().getIncludes(), query.getSourceFilter().getExcludes());
}
if (query instanceof SearchQuery && ((SearchQuery) query).getCollapseBuilder() != null) {
searchSourceBuilder.collapse(((SearchQuery) query).getCollapseBuilder());
}
sort(query, entity).forEach(searchSourceBuilder::sort);
if (query.getMinScore() > 0) {
searchSourceBuilder.minScore(query.getMinScore());
}
if (query.getIndicesOptions() != null) {
request.indicesOptions(query.getIndicesOptions());
}
if (query.getPreference() != null) {
request.preference(query.getPreference());
}
if (query.getSearchType() != null) {
request.searchType(query.getSearchType());
}
Pageable pageable = query.getPageable();
if (pageable.isPaged()) {
long offset = pageable.getOffset();
if (offset > Integer.MAX_VALUE) {
throw new IllegalArgumentException(String.format("Offset must not be more than %s", Integer.MAX_VALUE));
}
searchSourceBuilder.from((int) offset);
searchSourceBuilder.size(pageable.getPageSize());
request.source(searchSourceBuilder);
} else if (query.isLimiting()) {
searchSourceBuilder.from(0);
searchSourceBuilder.size(query.getMaxResults());
request.source(searchSourceBuilder);
} else {
request.source(searchSourceBuilder);
}
return request;
}
/*
* (non-Javadoc)
* @see org.springframework.data.elasticsearch.core.ReactiveElasticsearchOperations#count(Query, Class, String, String)
*/
@Override
public Mono<Long> count(Query query, Class<?> entityType, String index, String type) {
// TODO: ES 7.0 has a dedicated CountRequest - use that one once available.
return find(query, entityType, index, type).count();
}
/*
* (non-Javadoc)
@ -443,6 +497,22 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera
return prepareWriteRequest(request);
}
/**
* Customization hook to modify a generated {@link SearchRequest} prior to its execution. Eg. by setting the
* {@link SearchRequest#indicesOptions(IndicesOptions) indices options} if applicable.
*
* @param request the generated {@link CountRequest}.
* @return never {@literal null}.
*/
protected CountRequest prepareCountRequest(CountRequest request) {
if (indicesOptions == null) {
return request;
}
return request.indicesOptions(indicesOptions);
}
/**
* Customization hook to modify a generated {@link SearchRequest} prior to its execution. Eg. by setting the
* {@link SearchRequest#indicesOptions(IndicesOptions) indices options} if applicable.
@ -543,16 +613,32 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera
.onErrorResume(NoSuchIndexException.class, it -> Mono.empty());
}
/**
* Customization hook on the actual execution result {@link Publisher}. <br />
*
* @param request the already prepared {@link CountRequest} ready to be executed.
* @return a {@link Mono} emitting the result of the operation.
*/
protected Mono<Long> doCount(CountRequest request) {
if (QUERY_LOGGER.isDebugEnabled()) {
QUERY_LOGGER.debug("Executing doCount: {}", request);
}
return Mono.from(execute(client -> client.count(request))) //
.onErrorResume(NoSuchIndexException.class, it -> Mono.empty());
}
/**
* Customization hook on the actual execution result {@link Publisher}. <br />
*
* @param request the already prepared {@link SearchRequest} ready to be executed.
* @return a {@link Flux} emitting the result of the operation.
*/
protected Flux<SearchHit> doScan(SearchRequest request) {
protected Flux<SearchHit> doScroll(SearchRequest request) {
if (QUERY_LOGGER.isDebugEnabled()) {
QUERY_LOGGER.debug("Executing doScan: {}", request);
QUERY_LOGGER.debug("Executing doScroll: {}", request);
}
return Flux.from(execute(client -> client.scroll(request))) //