DATAES-567 - Add aggregation support to reactive client. (#430)

Original PR: #430
amordleq 2020-04-20 12:33:35 -04:00 committed by GitHub
parent 0afa37c8ea
commit c2eec8c74a
11 changed files with 521 additions and 2 deletions
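
For orientation, a minimal usage sketch of the API this commit adds (a sketch only: it assumes a configured ReactiveElasticsearchTemplate named template; the Person entity and its keyword field "lastname" are hypothetical):

NativeSearchQuery query = new NativeSearchQueryBuilder()
		.withQuery(QueryBuilders.matchAllQuery())
		.addAggregation(AggregationBuilders.terms("lastnames").field("lastname"))
		.build();

// ReactiveSearchOperations#aggregate is the new entry point introduced by this commit
Flux<Aggregation> aggregations = template.aggregate(query, Person.class);

aggregations.ofType(ParsedStringTerms.class)
		.flatMapIterable(ParsedStringTerms::getBuckets)
		.subscribe(bucket -> System.out.println(bucket.getKeyAsString() + ": " + bucket.getDocCount()));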

View File: DefaultReactiveElasticsearchClient.java

@@ -94,6 +94,7 @@ import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.Scroll;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.aggregations.Aggregation;
import org.reactivestreams.Publisher;
import org.springframework.data.elasticsearch.client.ClientConfiguration;
import org.springframework.data.elasticsearch.client.ClientLogger;
@@ -101,6 +102,7 @@ import org.springframework.data.elasticsearch.client.ElasticsearchHost;
import org.springframework.data.elasticsearch.client.NoReachableHostException;
import org.springframework.data.elasticsearch.client.reactive.HostProvider.Verification;
import org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient.Indices;
import org.springframework.data.elasticsearch.client.util.NamedXContents;
import org.springframework.data.util.Lazy;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
@@ -128,6 +130,7 @@ import org.springframework.web.reactive.function.client.WebClient.RequestBodySpe
* @author Huw Ayling-Miller
* @author Henrique Amaral
* @author Roman Puchkovskiy
* @author Russell Parry
* @since 3.2
* @see ClientConfiguration
* @see ReactiveRestClients
@@ -401,6 +404,23 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
.flatMap(Flux::fromIterable);
}
/*
* (non-Javadoc)
* @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient#aggregate(org.springframework.http.HttpHeaders, org.elasticsearch.action.search.SearchRequest)
*/
@Override
public Flux<Aggregation> aggregate(HttpHeaders headers, SearchRequest searchRequest) {
Assert.notNull(headers, "headers must not be null");
Assert.notNull(searchRequest, "searchRequest must not be null");
searchRequest.source().size(0);
return sendRequest(searchRequest, requestCreator.search(), SearchResponse.class, headers) //
.map(SearchResponse::getAggregations) //
.flatMap(Flux::fromIterable);
}
/*
* (non-Javadoc)
* @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient#scroll(org.springframework.http.HttpHeaders, org.elasticsearch.action.search.SearchRequest)
@@ -751,10 +771,9 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
}
private static XContentParser createParser(String mediaType, String content) throws IOException {
return XContentType.fromMediaTypeOrFormat(mediaType) //
.xContent() //
.createParser(NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION, content);
.createParser(new NamedXContentRegistry(NamedXContents.getDefaultNamedXContents()), DeprecationHandler.THROW_UNSUPPORTED_OPERATION, content);
}
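/*
 * Illustration only (a sketch, not part of this change): with the parsers registered from
 * NamedXContents.getDefaultNamedXContents(), typed aggregation keys such as "sterms#users" in a
 * raw search response resolve to concrete Aggregation implementations (e.g. ParsedStringTerms);
 * with NamedXContentRegistry.EMPTY they could not be resolved. The hypothetical helper below
 * builds on the createParser method above.
 */
private static SearchResponse parseAggregatedResponse(String json) throws IOException {
	try (XContentParser parser = createParser("application/json", json)) {
		// typed aggregation and suggestion entries are looked up in the registry configured above
		return SearchResponse.fromXContent(parser);
	}
}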
private static <T> Publisher<? extends T> handleServerError(Request request, ClientResponse response) {

View File: ReactiveElasticsearchClient.java

@@ -15,6 +15,8 @@
*/
package org.springframework.data.elasticsearch.client.reactive;
import org.elasticsearch.search.aggregations.Aggregation;
import org.springframework.util.Assert;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@@ -411,6 +413,45 @@ public interface ReactiveElasticsearchClient {
*/
Flux<SearchHit> search(HttpHeaders headers, SearchRequest searchRequest);
/**
* Execute the given {@link SearchRequest} with aggregations against the {@literal search} API.
*
* @param consumer never {@literal null}.
* @see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/search-search.html">Search API on
* elastic.co</a>
* @return the {@link Flux} emitting {@link Aggregation} one by one.
* @since 4.0
*/
default Flux<Aggregation> aggregate(Consumer<SearchRequest> consumer) {
Assert.notNull(consumer, "consumer must not be null");
SearchRequest request = new SearchRequest();
consumer.accept(request);
return aggregate(request);
}
/**
* Execute the given {@link SearchRequest} with aggregations against the {@literal search} API.
*
* @param searchRequest must not be {@literal null}.
* @see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/search-search.html">Search API on
* elastic.co</a>
* @return the {@link Flux} emitting {@link Aggregation} one by one.
* @since 4.0
*/
default Flux<Aggregation> aggregate(SearchRequest searchRequest) {
	return aggregate(HttpHeaders.EMPTY, searchRequest);
}
/**
* Execute the given {@link SearchRequest} with aggregations against the {@literal search} API.
*
* @param headers Use {@link HttpHeaders} to provide e.g. authentication data. Must not be {@literal null}.
* @param searchRequest must not be {@literal null}.
* @see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/search-search.html">Search API on
* elastic.co</a>
* @return the {@link Flux} emitting {@link Aggregation} one by one.
* @since 4.0
*/
Flux<Aggregation> aggregate(HttpHeaders headers, SearchRequest searchRequest);
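/*
 * Usage sketch (illustration only; the index, field and aggregation names are examples):
 *
 * Flux<Aggregation> userCounts = client.aggregate(request -> request.indices("twitter")
 *         .source(new SearchSourceBuilder()
 *                 .query(QueryBuilders.matchAllQuery())
 *                 .aggregation(AggregationBuilders.terms("users").field("user"))));
 */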
/**
* Execute the given {@link SearchRequest} against the {@literal search scroll} API.
*

View File: NamedXContents.java

@@ -0,0 +1,163 @@
/*
* Copyright 2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.elasticsearch.client.util;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.xcontent.ContextParser;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.aggregations.bucket.adjacency.AdjacencyMatrixAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.adjacency.ParsedAdjacencyMatrix;
import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.composite.ParsedComposite;
import org.elasticsearch.search.aggregations.bucket.filter.FilterAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.filter.FiltersAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.filter.ParsedFilter;
import org.elasticsearch.search.aggregations.bucket.filter.ParsedFilters;
import org.elasticsearch.search.aggregations.bucket.geogrid.GeoHashGridAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.geogrid.GeoTileGridAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.geogrid.ParsedGeoHashGrid;
import org.elasticsearch.search.aggregations.bucket.geogrid.ParsedGeoTileGrid;
import org.elasticsearch.search.aggregations.bucket.global.GlobalAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.global.ParsedGlobal;
import org.elasticsearch.search.aggregations.bucket.histogram.AutoDateHistogramAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.histogram.HistogramAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.histogram.ParsedAutoDateHistogram;
import org.elasticsearch.search.aggregations.bucket.histogram.ParsedDateHistogram;
import org.elasticsearch.search.aggregations.bucket.histogram.ParsedHistogram;
import org.elasticsearch.search.aggregations.bucket.missing.MissingAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.missing.ParsedMissing;
import org.elasticsearch.search.aggregations.bucket.nested.NestedAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.nested.ParsedNested;
import org.elasticsearch.search.aggregations.bucket.nested.ParsedReverseNested;
import org.elasticsearch.search.aggregations.bucket.nested.ReverseNestedAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.range.DateRangeAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.range.GeoDistanceAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.range.IpRangeAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.range.ParsedBinaryRange;
import org.elasticsearch.search.aggregations.bucket.range.ParsedDateRange;
import org.elasticsearch.search.aggregations.bucket.range.ParsedGeoDistance;
import org.elasticsearch.search.aggregations.bucket.range.ParsedRange;
import org.elasticsearch.search.aggregations.bucket.range.RangeAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.sampler.InternalSampler;
import org.elasticsearch.search.aggregations.bucket.sampler.ParsedSampler;
import org.elasticsearch.search.aggregations.bucket.significant.ParsedSignificantLongTerms;
import org.elasticsearch.search.aggregations.bucket.significant.ParsedSignificantStringTerms;
import org.elasticsearch.search.aggregations.bucket.significant.SignificantLongTerms;
import org.elasticsearch.search.aggregations.bucket.significant.SignificantStringTerms;
import org.elasticsearch.search.aggregations.bucket.terms.DoubleTerms;
import org.elasticsearch.search.aggregations.bucket.terms.LongTerms;
import org.elasticsearch.search.aggregations.bucket.terms.ParsedDoubleTerms;
import org.elasticsearch.search.aggregations.bucket.terms.ParsedLongTerms;
import org.elasticsearch.search.aggregations.bucket.terms.ParsedStringTerms;
import org.elasticsearch.search.aggregations.bucket.terms.StringTerms;
import org.elasticsearch.search.aggregations.metrics.*;
import org.elasticsearch.search.aggregations.pipeline.*;
import org.elasticsearch.search.suggest.Suggest;
import org.elasticsearch.search.suggest.completion.CompletionSuggestion;
import org.elasticsearch.search.suggest.completion.CompletionSuggestionBuilder;
import org.elasticsearch.search.suggest.phrase.PhraseSuggestion;
import org.elasticsearch.search.suggest.phrase.PhraseSuggestionBuilder;
import org.elasticsearch.search.suggest.term.TermSuggestion;
import org.elasticsearch.search.suggest.term.TermSuggestionBuilder;
import org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
* <p>
* Original implementation source {@link org.elasticsearch.client.RestHighLevelClient#getDefaultNamedXContents()} by {@literal Elasticsearch}
* (<a href="https://www.elastic.co">https://www.elastic.co</a>) licensed under the Apache License, Version 2.0.
* </p>
* Modified for usage with {@link ReactiveElasticsearchClient}.
* <p>
* Only intended for internal use.
*
* @author Russell Parry
* @since 4.0
*/
public class NamedXContents {
private NamedXContents() {
// contains only utility methods
}
public static List<NamedXContentRegistry.Entry> getDefaultNamedXContents() {
Map<String, ContextParser<Object, ? extends Aggregation>> map = new HashMap<>();
map.put(CardinalityAggregationBuilder.NAME, (p, c) -> ParsedCardinality.fromXContent(p, (String) c));
map.put(InternalHDRPercentiles.NAME, (p, c) -> ParsedHDRPercentiles.fromXContent(p, (String) c));
map.put(InternalHDRPercentileRanks.NAME, (p, c) -> ParsedHDRPercentileRanks.fromXContent(p, (String) c));
map.put(InternalTDigestPercentiles.NAME, (p, c) -> ParsedTDigestPercentiles.fromXContent(p, (String) c));
map.put(InternalTDigestPercentileRanks.NAME, (p, c) -> ParsedTDigestPercentileRanks.fromXContent(p, (String) c));
map.put(PercentilesBucketPipelineAggregationBuilder.NAME, (p, c) -> ParsedPercentilesBucket.fromXContent(p, (String) c));
map.put(MedianAbsoluteDeviationAggregationBuilder.NAME, (p, c) -> ParsedMedianAbsoluteDeviation.fromXContent(p, (String) c));
map.put(MinAggregationBuilder.NAME, (p, c) -> ParsedMin.fromXContent(p, (String) c));
map.put(MaxAggregationBuilder.NAME, (p, c) -> ParsedMax.fromXContent(p, (String) c));
map.put(SumAggregationBuilder.NAME, (p, c) -> ParsedSum.fromXContent(p, (String) c));
map.put(AvgAggregationBuilder.NAME, (p, c) -> ParsedAvg.fromXContent(p, (String) c));
map.put(WeightedAvgAggregationBuilder.NAME, (p, c) -> ParsedWeightedAvg.fromXContent(p, (String) c));
map.put(ValueCountAggregationBuilder.NAME, (p, c) -> ParsedValueCount.fromXContent(p, (String) c));
map.put(InternalSimpleValue.NAME, (p, c) -> ParsedSimpleValue.fromXContent(p, (String) c));
map.put(DerivativePipelineAggregationBuilder.NAME, (p, c) -> ParsedDerivative.fromXContent(p, (String) c));
map.put(InternalBucketMetricValue.NAME, (p, c) -> ParsedBucketMetricValue.fromXContent(p, (String) c));
map.put(StatsAggregationBuilder.NAME, (p, c) -> ParsedStats.fromXContent(p, (String) c));
map.put(StatsBucketPipelineAggregationBuilder.NAME, (p, c) -> ParsedStatsBucket.fromXContent(p, (String) c));
map.put(ExtendedStatsAggregationBuilder.NAME, (p, c) -> ParsedExtendedStats.fromXContent(p, (String) c));
map.put(ExtendedStatsBucketPipelineAggregationBuilder.NAME,
(p, c) -> ParsedExtendedStatsBucket.fromXContent(p, (String) c));
map.put(GeoBoundsAggregationBuilder.NAME, (p, c) -> ParsedGeoBounds.fromXContent(p, (String) c));
map.put(GeoCentroidAggregationBuilder.NAME, (p, c) -> ParsedGeoCentroid.fromXContent(p, (String) c));
map.put(HistogramAggregationBuilder.NAME, (p, c) -> ParsedHistogram.fromXContent(p, (String) c));
map.put(DateHistogramAggregationBuilder.NAME, (p, c) -> ParsedDateHistogram.fromXContent(p, (String) c));
map.put(AutoDateHistogramAggregationBuilder.NAME, (p, c) -> ParsedAutoDateHistogram.fromXContent(p, (String) c));
map.put(StringTerms.NAME, (p, c) -> ParsedStringTerms.fromXContent(p, (String) c));
map.put(LongTerms.NAME, (p, c) -> ParsedLongTerms.fromXContent(p, (String) c));
map.put(DoubleTerms.NAME, (p, c) -> ParsedDoubleTerms.fromXContent(p, (String) c));
map.put(MissingAggregationBuilder.NAME, (p, c) -> ParsedMissing.fromXContent(p, (String) c));
map.put(NestedAggregationBuilder.NAME, (p, c) -> ParsedNested.fromXContent(p, (String) c));
map.put(ReverseNestedAggregationBuilder.NAME, (p, c) -> ParsedReverseNested.fromXContent(p, (String) c));
map.put(GlobalAggregationBuilder.NAME, (p, c) -> ParsedGlobal.fromXContent(p, (String) c));
map.put(FilterAggregationBuilder.NAME, (p, c) -> ParsedFilter.fromXContent(p, (String) c));
map.put(InternalSampler.PARSER_NAME, (p, c) -> ParsedSampler.fromXContent(p, (String) c));
map.put(GeoHashGridAggregationBuilder.NAME, (p, c) -> ParsedGeoHashGrid.fromXContent(p, (String) c));
map.put(GeoTileGridAggregationBuilder.NAME, (p, c) -> ParsedGeoTileGrid.fromXContent(p, (String) c));
map.put(RangeAggregationBuilder.NAME, (p, c) -> ParsedRange.fromXContent(p, (String) c));
map.put(DateRangeAggregationBuilder.NAME, (p, c) -> ParsedDateRange.fromXContent(p, (String) c));
map.put(GeoDistanceAggregationBuilder.NAME, (p, c) -> ParsedGeoDistance.fromXContent(p, (String) c));
map.put(FiltersAggregationBuilder.NAME, (p, c) -> ParsedFilters.fromXContent(p, (String) c));
map.put(AdjacencyMatrixAggregationBuilder.NAME, (p, c) -> ParsedAdjacencyMatrix.fromXContent(p, (String) c));
map.put(SignificantLongTerms.NAME, (p, c) -> ParsedSignificantLongTerms.fromXContent(p, (String) c));
map.put(SignificantStringTerms.NAME, (p, c) -> ParsedSignificantStringTerms.fromXContent(p, (String) c));
map.put(ScriptedMetricAggregationBuilder.NAME, (p, c) -> ParsedScriptedMetric.fromXContent(p, (String) c));
map.put(IpRangeAggregationBuilder.NAME, (p, c) -> ParsedBinaryRange.fromXContent(p, (String) c));
map.put(TopHitsAggregationBuilder.NAME, (p, c) -> ParsedTopHits.fromXContent(p, (String) c));
map.put(CompositeAggregationBuilder.NAME, (p, c) -> ParsedComposite.fromXContent(p, (String) c));
List<NamedXContentRegistry.Entry> entries = map.entrySet().stream()
.map(entry -> new NamedXContentRegistry.Entry(Aggregation.class, new ParseField(entry.getKey()), entry.getValue()))
.collect(Collectors.toList());
entries.add(new NamedXContentRegistry.Entry(Suggest.Suggestion.class, new ParseField(TermSuggestionBuilder.SUGGESTION_NAME),
(parser, context) -> TermSuggestion.fromXContent(parser, (String) context)));
entries.add(new NamedXContentRegistry.Entry(Suggest.Suggestion.class, new ParseField(PhraseSuggestionBuilder.SUGGESTION_NAME),
(parser, context) -> PhraseSuggestion.fromXContent(parser, (String) context)));
entries.add(new NamedXContentRegistry.Entry(Suggest.Suggestion.class, new ParseField(CompletionSuggestionBuilder.SUGGESTION_NAME),
(parser, context) -> CompletionSuggestion.fromXContent(parser, (String) context)));
return entries;
}
}

View File: ReactiveElasticsearchTemplate.java

@@ -17,6 +17,7 @@ package org.springframework.data.elasticsearch.core;
import static org.elasticsearch.index.VersionType.*;
import org.elasticsearch.search.aggregations.Aggregation;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@@ -102,6 +103,7 @@ import org.springframework.util.Assert;
* @author Mathias Teier
* @author Aleksei Arsenev
* @author Roman Puchkovskiy
* @author Russell Parry
* @since 3.2
*/
public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOperations, ApplicationContextAware {
@@ -608,6 +610,27 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera
});
}
@Override
public Flux<Aggregation> aggregate(Query query, Class<?> entityType) {
return aggregate(query, entityType, getIndexCoordinatesFor(entityType));
}
@Override
public Flux<Aggregation> aggregate(Query query, Class<?> entityType, IndexCoordinates index) {
return doAggregate(query, entityType, index);
}
private Flux<Aggregation> doAggregate(Query query, Class<?> entityType, IndexCoordinates index) {
return Flux.defer(() -> {
SearchRequest request = requestFactory.searchRequest(query, entityType, index);
request = prepareSearchRequest(request);
request.source().size(0);
request.source().trackTotalHits(false);
return doAggregate(request);
});
}
@Override
public Mono<Long> count(Query query, Class<?> entityType) {
return count(query, entityType, getIndexCoordinatesFor(entityType));
@@ -682,6 +705,22 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera
.onErrorResume(NoSuchIndexException.class, it -> Mono.empty());
}
/**
* Customization hook on the actual execution result {@link Publisher}. <br />
*
* @param request the already prepared {@link SearchRequest} ready to be executed.
* @return a {@link Flux} emitting the result of the operation.
*/
protected Flux<Aggregation> doAggregate(SearchRequest request) {
if (QUERY_LOGGER.isDebugEnabled()) {
QUERY_LOGGER.debug("Executing doAggregate: {}", request);
}
return Flux.from(execute(client -> client.aggregate(request))) //
.onErrorResume(NoSuchIndexException.class, it -> Flux.empty());
}
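/*
 * Sketch (illustration only): doAggregate(SearchRequest) is a protected customization hook, so a
 * subclass could decorate the emitted aggregations; the class below is hypothetical.
 *
 * public class TracingReactiveElasticsearchTemplate extends ReactiveElasticsearchTemplate {
 *
 *     public TracingReactiveElasticsearchTemplate(ReactiveElasticsearchClient client) {
 *         super(client);
 *     }
 *
 *     @Override
 *     protected Flux<Aggregation> doAggregate(SearchRequest request) {
 *         // e.g. trace the name of every aggregation before handing it to callers
 *         return super.doAggregate(request).doOnNext(aggregation -> System.out.println(aggregation.getName()));
 *     }
 * }
 */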
/**
* Customization hook on the actual execution result {@link Publisher}. <br />
*

View File: ReactiveSearchOperations.java

@@ -15,6 +15,7 @@
*/
package org.springframework.data.elasticsearch.core;
import org.elasticsearch.search.aggregations.Aggregation;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@@ -30,6 +31,7 @@ import org.springframework.data.elasticsearch.core.query.StringQuery;
* APIs</a>.
*
* @author Peter-Josef Meisch
* @author Russell Parry
* @since 4.0
*/
public interface ReactiveSearchOperations {
@@ -183,4 +185,25 @@ public interface ReactiveSearchOperations {
default <T> Flux<SearchHit<T>> search(Query query, Class<T> entityType, IndexCoordinates index) {
return search(query, entityType, entityType, index);
}
/**
* Perform an aggregation specified by the given {@link Query query}. <br />
*
* @param query must not be {@literal null}.
* @param entityType must not be {@literal null}.
* @return a {@link Flux} emitting matching aggregations one by one.
* @since 4.0
*/
Flux<Aggregation> aggregate(Query query, Class<?> entityType);
/**
* Perform an aggregation specified by the given {@link Query query}. <br />
*
* @param query must not be {@literal null}.
* @param entityType must not be {@literal null}.
* @param index the target index, must not be {@literal null}
* @return a {@link Flux} emitting matching aggregations one by one.
* @since 4.0
*/
Flux<Aggregation> aggregate(Query query, Class<?> entityType, IndexCoordinates index);
}

View File: ReactiveElasticsearchClientTests.java

@@ -19,6 +19,10 @@ import static org.assertj.core.api.Assertions.*;
import lombok.SneakyThrows;
import org.elasticsearch.client.indices.GetIndexRequest;
import org.elasticsearch.client.indices.PutMappingRequest;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.bucket.terms.StringTerms;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import reactor.test.StepVerifier;
import java.io.IOException;
@@ -67,6 +71,7 @@ import org.springframework.test.context.ContextConfiguration;
* @author Mark Paluch
* @author Peter-Josef Meisch
* @author Henrique Amaral
* @author Russell Parry
*/
@SpringIntegrationTest
@ContextConfiguration(classes = { ElasticsearchRestTemplateConfiguration.class })
@@ -656,6 +661,27 @@ public class ReactiveElasticsearchClientTests {
}).verifyComplete();
}
@Test // DATAES-567
public void aggregateReturnsAggregationResults() throws IOException {
syncClient.indices().create(new CreateIndexRequest(INDEX_I), RequestOptions.DEFAULT);
Map<String, Object> jsonMap = Collections.singletonMap("properties",
Collections.singletonMap("firstname", Collections.singletonMap("type", "keyword")));
syncClient.indices().putMapping(new PutMappingRequest(INDEX_I).source(jsonMap), RequestOptions.DEFAULT);
addSourceDocument().ofType(TYPE_I).to(INDEX_I);
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder().query(QueryBuilders.matchAllQuery());
searchSourceBuilder.aggregation(AggregationBuilders.terms("terms").field("firstname"));
SearchRequest request = new SearchRequest(INDEX_I) //
.source(searchSourceBuilder);
client.aggregate(request)
.as(StepVerifier::create)
.expectNextMatches(aggregation -> aggregation.getType().equals(StringTerms.NAME))
.verifyComplete();
}
private AddToIndexOfType addSourceDocument() {
return add(DOC_SOURCE);
}

View File: ReactiveElasticsearchClientUnitTests.java

@@ -19,11 +19,15 @@ import static org.assertj.core.api.Assertions.*;
import static org.mockito.Mockito.*;
import static org.springframework.data.elasticsearch.client.reactive.ReactiveMockClientTestsUtils.MockWebClientProvider.Receive.*;
import org.elasticsearch.search.aggregations.bucket.terms.ParsedStringTerms;
import org.elasticsearch.search.aggregations.metrics.ParsedMax;
import reactor.core.publisher.Hooks;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
import java.io.IOException;
import java.net.URI;
import java.time.Instant;
import java.util.Collections;
import org.elasticsearch.ElasticsearchStatusException;
@@ -53,6 +57,7 @@ import org.springframework.util.StreamUtils;
/**
* @author Christoph Strobl
* @author Henrique Amaral
* @author Russell Parry
*/
public class ReactiveElasticsearchClientUnitTests {
@@ -577,6 +582,83 @@ public class ReactiveElasticsearchClientUnitTests {
.verifyComplete();
}
// --> AGGREGATE
@Test // DATAES-567
public void aggregateShouldHitSearchEndpoint() {
hostProvider.when(HOST) //
.receive(Receive::json) //
.body(fromPath("aggregate-ok-no-results"));
client.aggregate(new SearchRequest("twitter")).as(StepVerifier::create).expectNextCount(1).verifyComplete();
verify(hostProvider.client(HOST)).method(HttpMethod.POST);
URI uri = hostProvider.when(HOST).captureUri();
assertThat(uri.getRawPath()).isEqualTo("/twitter/_search");
}
@Test // DATAES-567
public void aggregateShouldReturnSingleResultCorrectly() {
hostProvider.when(HOST) //
.receive(Receive::json) //
.body(fromPath("aggregate-ok-single-result"));
client.aggregate(new SearchRequest("twitter")) //
.as(StepVerifier::create) //
.consumeNextWith(aggregation -> {
assertThat(aggregation.getName()).isEqualTo("users");
assertThat(aggregation).isInstanceOf(ParsedStringTerms.class);
ParsedStringTerms parsedStringTerms = (ParsedStringTerms) aggregation;
assertThat(parsedStringTerms.getBuckets().size()).isEqualTo(2);
assertThat(parsedStringTerms.getBucketByKey("kimchy").getDocCount()).isEqualTo(2);
assertThat(parsedStringTerms.getBucketByKey("elastic").getDocCount()).isEqualTo(1);
}).verifyComplete();
}
@Test // DATAES-567
public void aggregateShouldReturnMultipleResultsCorrectly() {
hostProvider.when(HOST) //
.receive(Receive::json) //
.body(fromPath("aggregate-ok-multiple-results"));
client.aggregate(new SearchRequest("twitter")) //
.as(StepVerifier::create) //
.consumeNextWith(aggregation -> {
assertThat(aggregation.getName()).isEqualTo("users");
assertThat(aggregation).isInstanceOf(ParsedStringTerms.class);
ParsedStringTerms parsedStringTerms = (ParsedStringTerms) aggregation;
assertThat(parsedStringTerms.getBuckets().size()).isEqualTo(2);
assertThat(parsedStringTerms.getBucketByKey("kimchy").getDocCount()).isEqualTo(2);
assertThat(parsedStringTerms.getBucketByKey("elastic").getDocCount()).isEqualTo(1);
}) //
.consumeNextWith(aggregation -> {
assertThat(aggregation.getName()).isEqualTo("max_post_date");
assertThat(aggregation).isInstanceOf(ParsedMax.class);
ParsedMax parsedMax = (ParsedMax) aggregation;
assertThat(Instant.ofEpochMilli((long) parsedMax.getValue())).isEqualTo(Instant.parse("2010-01-15T01:46:38Z"));
}).verifyComplete();
}
@Test // DATAES-567
public void aggregateShouldReturnAggregationWithNoValuesWhenNoResultsFound() {
hostProvider.when(HOST) //
.receive(Receive::json) //
.body(fromPath("aggregate-ok-no-results"));
client.aggregate(new SearchRequest("twitter")) //
.as(StepVerifier::create) //
.consumeNextWith(aggregation -> {
assertThat(aggregation.getName()).isEqualTo("users");
assertThat(aggregation).isInstanceOf(ParsedStringTerms.class);
ParsedStringTerms parsedStringTerms = (ParsedStringTerms) aggregation;
assertThat(parsedStringTerms.getBuckets().size()).isEqualTo(0);
}).verifyComplete();
}
// --> SCROLL
@Test // DATAES-510

View File: ReactiveElasticsearchTemplateTests.java

@@ -24,6 +24,8 @@ import lombok.Builder;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.bucket.terms.ParsedStringTerms;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
@@ -78,6 +80,7 @@ import org.springframework.util.StringUtils;
* @author Farid Azaza
* @author Martin Choraine
* @author Aleksei Arsenev
* @author Russell Parry
*/
@SpringIntegrationTest
public class ReactiveElasticsearchTemplateTests {
@@ -486,6 +489,41 @@ public class ReactiveElasticsearchTemplateTests {
.verifyComplete();
}
@Test // DATAES-567
public void aggregateShouldReturnAggregations() {
SampleEntity sampleEntity1 = randomEntity("some message");
SampleEntity sampleEntity2 = randomEntity("some message");
SampleEntity sampleEntity3 = randomEntity("other message");
index(sampleEntity1, sampleEntity2, sampleEntity3);
NativeSearchQuery query = new NativeSearchQueryBuilder()
.withQuery(matchAllQuery())
.addAggregation(AggregationBuilders.terms("messages").field("message"))
.build();
template.aggregate(query, SampleEntity.class) //
.as(StepVerifier::create) //
.consumeNextWith(aggregation -> {
assertThat(aggregation.getName()).isEqualTo("messages");
assertThat(aggregation).isInstanceOf(ParsedStringTerms.class);
ParsedStringTerms parsedStringTerms = (ParsedStringTerms) aggregation;
assertThat(parsedStringTerms.getBuckets().size()).isEqualTo(3);
assertThat(parsedStringTerms.getBucketByKey("message").getDocCount()).isEqualTo(3);
assertThat(parsedStringTerms.getBucketByKey("some").getDocCount()).isEqualTo(2);
assertThat(parsedStringTerms.getBucketByKey("other").getDocCount()).isEqualTo(1);
}).verifyComplete();
}
@Test // DATAES-567
public void aggregateShouldReturnEmptyWhenIndexDoesNotExist() {
template.aggregate(new CriteriaQuery(Criteria.where("message").is("some message")), SampleEntity.class,
IndexCoordinates.of("no-such-index")) //
.as(StepVerifier::create) //
.verifyComplete();
}
@Test // DATAES-519
public void countShouldReturnZeroWhenIndexDoesNotExist() {

View File: aggregate-ok-multiple-results.json

@@ -0,0 +1,35 @@
{
"took": 52,
"timed_out": false,
"_shards": {
"total": 5,
"successful": 5,
"skipped": 0,
"failed": 0
},
"hits": {
"total": 0,
"max_score": null,
"hits": [ ]
},
"aggregations": {
"sterms#users": {
"doc_count_error_upper_bound": 0,
"sum_other_doc_count": 0,
"buckets": [
{
"key": "kimchy",
"doc_count": 2
},
{
"key": "elastic",
"doc_count": 1
}
]
},
"max#max_post_date": {
"value" : 1.263519998E12,
"value_as_string" : "2010-01-15T01:46:38.000Z"
}
}
}

View File: aggregate-ok-no-results.json

@@ -0,0 +1,22 @@
{
"took" : 226,
"timed_out" : false,
"_shards" : {
"total" : 5,
"successful" : 5,
"skipped" : 0,
"failed" : 0
},
"hits" : {
"total" : 0,
"max_score" : null,
"hits" : [ ]
},
"aggregations" : {
"sterms#users" : {
"doc_count_error_upper_bound" : 0,
"sum_other_doc_count" : 0,
"buckets" : [ ]
}
}
}

View File: aggregate-ok-single-result.json

@@ -0,0 +1,31 @@
{
"took": 52,
"timed_out": false,
"_shards": {
"total": 5,
"successful": 5,
"skipped": 0,
"failed": 0
},
"hits": {
"total": 0,
"max_score": null,
"hits": [ ]
},
"aggregations": {
"sterms#users": {
"doc_count_error_upper_bound": 0,
"sum_other_doc_count": 0,
"buckets": [
{
"key": "kimchy",
"doc_count": 2
},
{
"key": "elastic",
"doc_count": 1
}
]
}
}
}