DATAES-567 - Polishing.

Peter-Josef Meisch 2020-04-20 18:56:11 +02:00
parent c2eec8c74a
commit 91f442bd2f
8 changed files with 64 additions and 60 deletions

View File

@@ -410,12 +410,13 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
*/
@Override
public Flux<Aggregation> aggregate(HttpHeaders headers, SearchRequest searchRequest) {
Assert.notNull(headers, "headers must not be null");
Assert.notNull(searchRequest, "searchRequest must not be null");
searchRequest.source().size(0);
searchRequest.source().trackTotalHits(false);
return sendRequest(searchRequest, requestCreator.search(), SearchResponse.class, headers) //
.map(SearchResponse::getAggregations) //
.flatMap(Flux::fromIterable);
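
Not part of the commit, for orientation only: a minimal caller-side sketch of the aggregate API polished above, assuming a reachable node at localhost:9200 and an index named "twitter" with a "user" field; host, index and field names are illustrative assumptions.

import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.springframework.data.elasticsearch.client.ClientConfiguration;
import org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient;
import org.springframework.data.elasticsearch.client.reactive.ReactiveRestClients;
import reactor.core.publisher.Flux;

public class AggregateClientSketch {

	public static void main(String[] args) {

		// hypothetical connection settings
		ReactiveElasticsearchClient client = ReactiveRestClients
				.create(ClientConfiguration.create("localhost:9200"));

		// the caller only describes the aggregation; size(0) and trackTotalHits(false)
		// are applied inside the client, as shown in the hunk above
		SearchRequest request = new SearchRequest("twitter")
				.source(new SearchSourceBuilder()
						.aggregation(AggregationBuilders.terms("users").field("user")));

		// the client emits the parsed Aggregation objects one by one
		Flux<Aggregation> aggregations = client.aggregate(request);

		aggregations.map(Aggregation::getName).doOnNext(System.out::println).blockLast();
	}
}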
@@ -773,7 +774,8 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
private static XContentParser createParser(String mediaType, String content) throws IOException {
return XContentType.fromMediaTypeOrFormat(mediaType) //
.xContent() //
.createParser(new NamedXContentRegistry(NamedXContents.getDefaultNamedXContents()), DeprecationHandler.THROW_UNSUPPORTED_OPERATION, content);
.createParser(new NamedXContentRegistry(NamedXContents.getDefaultNamedXContents()),
DeprecationHandler.THROW_UNSUPPORTED_OPERATION, content);
}
private static <T> Publisher<? extends T> handleServerError(Request request, ClientResponse response) {

View File

@@ -15,8 +15,6 @@
*/
package org.springframework.data.elasticsearch.client.reactive;
import org.elasticsearch.search.aggregations.Aggregation;
import org.springframework.util.Assert;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@@ -49,9 +47,11 @@ import org.elasticsearch.index.get.GetResult;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.aggregations.Aggregation;
import org.springframework.data.elasticsearch.client.ClientConfiguration;
import org.springframework.data.elasticsearch.client.ElasticsearchHost;
import org.springframework.http.HttpHeaders;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;
import org.springframework.web.reactive.function.client.ClientResponse;
import org.springframework.web.reactive.function.client.WebClient;
@@ -416,14 +416,17 @@ public interface ReactiveElasticsearchClient {
/**
* Execute the given {@link SearchRequest} with aggregations against the {@literal search} API.
*
* @param consumer never {@literal null}.
* @see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/search-search.html">Search API on
* elastic.co</a>
* @param consumer
* never {@literal null}.
* @return the {@link Flux} emitting {@link Aggregation} one by one.
* @see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/search-search.html">Search API on
* elastic.co</a>
* @since 4.0
*/
default Flux<Aggregation> aggregate(Consumer<SearchRequest> consumer) {
Assert.notNull(consumer, "consumer must not be null");
SearchRequest request = new SearchRequest();
consumer.accept(request);
return aggregate(request);
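
A minimal sketch of the Consumer-based variant above, not part of the commit, assuming a configured ReactiveElasticsearchClient named client (for example created via ReactiveRestClients.create(...)); the index and field names are made up.

// the lambda customizes the SearchRequest that the default method creates before it is executed
Flux<Aggregation> byUser = client.aggregate(request -> request
		.indices("twitter")
		.source(new SearchSourceBuilder()
				.aggregation(AggregationBuilders.terms("users").field("user"))));

byUser.map(Aggregation::getName).subscribe(System.out::println);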
@@ -438,7 +441,9 @@ public interface ReactiveElasticsearchClient {
* @return the {@link Flux} emitting {@link Aggregation} one by one.
* @since 4.0
*/
default Flux<Aggregation> aggregate(SearchRequest searchRequest) { return aggregate(HttpHeaders.EMPTY, searchRequest); }
default Flux<Aggregation> aggregate(SearchRequest searchRequest) {
return aggregate(HttpHeaders.EMPTY, searchRequest);
}
/**
* Execute the given {@link SearchRequest} with aggregations against the {@literal search} API.

View File

@@ -15,6 +15,11 @@
*/
package org.springframework.data.elasticsearch.client.util;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.xcontent.ContextParser;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
@@ -76,15 +81,11 @@ import org.elasticsearch.search.suggest.term.TermSuggestion;
import org.elasticsearch.search.suggest.term.TermSuggestionBuilder;
import org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
* <p>
* Original implementation source {@link org.elasticsearch.client.RestHighLevelClient#getDefaultNamedXContents()} by {@literal Elasticsearch}
* (<a href="https://www.elastic.co">https://www.elastic.co</a>) licensed under the Apache License, Version 2.0.
* Original implementation source {@link org.elasticsearch.client.RestHighLevelClient#getDefaultNamedXContents()} by
* {@literal Elasticsearch} (<a href="https://www.elastic.co">https://www.elastic.co</a>) licensed under the Apache
* License, Version 2.0.
* </p>
* Modified for usage with {@link ReactiveElasticsearchClient}.
* <p>
@@ -106,8 +107,10 @@ public class NamedXContents {
map.put(InternalHDRPercentileRanks.NAME, (p, c) -> ParsedHDRPercentileRanks.fromXContent(p, (String) c));
map.put(InternalTDigestPercentiles.NAME, (p, c) -> ParsedTDigestPercentiles.fromXContent(p, (String) c));
map.put(InternalTDigestPercentileRanks.NAME, (p, c) -> ParsedTDigestPercentileRanks.fromXContent(p, (String) c));
map.put(PercentilesBucketPipelineAggregationBuilder.NAME, (p, c) -> ParsedPercentilesBucket.fromXContent(p, (String) c));
map.put(MedianAbsoluteDeviationAggregationBuilder.NAME, (p, c) -> ParsedMedianAbsoluteDeviation.fromXContent(p, (String) c));
map.put(PercentilesBucketPipelineAggregationBuilder.NAME,
(p, c) -> ParsedPercentilesBucket.fromXContent(p, (String) c));
map.put(MedianAbsoluteDeviationAggregationBuilder.NAME,
(p, c) -> ParsedMedianAbsoluteDeviation.fromXContent(p, (String) c));
map.put(MinAggregationBuilder.NAME, (p, c) -> ParsedMin.fromXContent(p, (String) c));
map.put(MaxAggregationBuilder.NAME, (p, c) -> ParsedMax.fromXContent(p, (String) c));
map.put(SumAggregationBuilder.NAME, (p, c) -> ParsedSum.fromXContent(p, (String) c));
@@ -149,15 +152,18 @@ public class NamedXContents {
map.put(IpRangeAggregationBuilder.NAME, (p, c) -> ParsedBinaryRange.fromXContent(p, (String) c));
map.put(TopHitsAggregationBuilder.NAME, (p, c) -> ParsedTopHits.fromXContent(p, (String) c));
map.put(CompositeAggregationBuilder.NAME, (p, c) -> ParsedComposite.fromXContent(p, (String) c));
List<NamedXContentRegistry.Entry> entries = map.entrySet().stream()
.map(entry -> new NamedXContentRegistry.Entry(Aggregation.class, new ParseField(entry.getKey()), entry.getValue()))
List<NamedXContentRegistry.Entry> entries = map.entrySet().stream().map(
entry -> new NamedXContentRegistry.Entry(Aggregation.class, new ParseField(entry.getKey()), entry.getValue()))
.collect(Collectors.toList());
entries.add(new NamedXContentRegistry.Entry(Suggest.Suggestion.class, new ParseField(TermSuggestionBuilder.SUGGESTION_NAME),
(parser, context) -> TermSuggestion.fromXContent(parser, (String)context)));
entries.add(new NamedXContentRegistry.Entry(Suggest.Suggestion.class, new ParseField(PhraseSuggestionBuilder.SUGGESTION_NAME),
(parser, context) -> PhraseSuggestion.fromXContent(parser, (String)context)));
entries.add(new NamedXContentRegistry.Entry(Suggest.Suggestion.class, new ParseField(CompletionSuggestionBuilder.SUGGESTION_NAME),
(parser, context) -> CompletionSuggestion.fromXContent(parser, (String)context)));
entries.add(
new NamedXContentRegistry.Entry(Suggest.Suggestion.class, new ParseField(TermSuggestionBuilder.SUGGESTION_NAME),
(parser, context) -> TermSuggestion.fromXContent(parser, (String) context)));
entries.add(new NamedXContentRegistry.Entry(Suggest.Suggestion.class,
new ParseField(PhraseSuggestionBuilder.SUGGESTION_NAME),
(parser, context) -> PhraseSuggestion.fromXContent(parser, (String) context)));
entries.add(new NamedXContentRegistry.Entry(Suggest.Suggestion.class,
new ParseField(CompletionSuggestionBuilder.SUGGESTION_NAME),
(parser, context) -> CompletionSuggestion.fromXContent(parser, (String) context)));
return entries;
}
}
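
Not part of the commit: a small sketch of how the entries built here are consumed, mirroring createParser(...) in DefaultReactiveElasticsearchClient above; the json variable stands for an assumed aggregation response fragment.

// wrap the default entries in a registry and hand it to the XContent parser (throws IOException)
NamedXContentRegistry registry = new NamedXContentRegistry(NamedXContents.getDefaultNamedXContents());
XContentParser parser = XContentType.JSON.xContent()
		.createParser(registry, DeprecationHandler.THROW_UNSUPPORTED_OPERATION, json);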

View File

@@ -17,7 +17,6 @@ package org.springframework.data.elasticsearch.core;
import static org.elasticsearch.index.VersionType.*;
import org.elasticsearch.search.aggregations.Aggregation;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@@ -50,6 +49,7 @@ import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.WrapperQueryBuilder;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.FieldSortBuilder;
import org.elasticsearch.search.sort.SortBuilders;
@@ -624,9 +624,6 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera
return Flux.defer(() -> {
SearchRequest request = requestFactory.searchRequest(query, entityType, index);
request = prepareSearchRequest(request);
request.source().size(0);
request.source().trackTotalHits(false);
return doAggregate(request);
});
}
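
Not part of the commit: a minimal sketch of the template-level entry point, assuming a configured ReactiveElasticsearchTemplate named template and an indexed SampleEntity as used in the tests further down; field names are illustrative.

NativeSearchQuery query = new NativeSearchQueryBuilder()
		.withQuery(QueryBuilders.matchAllQuery())
		.addAggregation(AggregationBuilders.terms("messages").field("message"))
		.build();

// emits the parsed Aggregation objects one by one; size(0)/trackTotalHits(false) are now set by the client
template.aggregate(query, SampleEntity.class)
		.map(Aggregation::getName)
		.subscribe(System.out::println);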

View File

@@ -15,11 +15,11 @@
*/
package org.springframework.data.elasticsearch.core;
import org.elasticsearch.search.aggregations.Aggregation;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.aggregations.Aggregation;
import org.springframework.data.domain.Pageable;
import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
import org.springframework.data.elasticsearch.core.query.Query;

View File

@@ -18,11 +18,6 @@ package org.springframework.data.elasticsearch.client.reactive;
import static org.assertj.core.api.Assertions.*;
import lombok.SneakyThrows;
import org.elasticsearch.client.indices.GetIndexRequest;
import org.elasticsearch.client.indices.PutMappingRequest;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.bucket.terms.StringTerms;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import reactor.test.StepVerifier;
import java.io.IOException;
@@ -47,12 +42,16 @@ import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.GetIndexRequest;
import org.elasticsearch.client.indices.PutMappingRequest;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.get.GetResult;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.bucket.terms.StringTerms;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
@@ -218,9 +217,9 @@ public class ReactiveElasticsearchClientTests {
String id2 = addSourceDocument().ofType(TYPE_I).to(INDEX_I);
MultiGetRequest request = new MultiGetRequest() //
.add(INDEX_I,id1) //
.add(INDEX_I,"this-one-does-not-exist") //
.add(INDEX_I,id2); //
.add(INDEX_I, id1) //
.add(INDEX_I, "this-one-does-not-exist") //
.add(INDEX_I, id2); //
client.multiGet(request) //
.map(GetResult::getId) //
@@ -236,8 +235,7 @@ public class ReactiveElasticsearchClientTests {
String id2 = addSourceDocument().ofType(TYPE_I).to(INDEX_I);
client.multiGet(new MultiGetRequest() //
.add(INDEX_II, id1)
.add(INDEX_II, id2)) //
.add(INDEX_II, id1).add(INDEX_II, id2)) //
.as(StepVerifier::create) //
.verifyComplete();
}
@@ -661,7 +659,7 @@ public class ReactiveElasticsearchClientTests {
}).verifyComplete();
}
@Test //DATAES-567
@Test // DATAES-567
public void aggregateReturnsAggregationResults() throws IOException {
syncClient.indices().create(new CreateIndexRequest(INDEX_I), RequestOptions.DEFAULT);
Map<String, Object> jsonMap = Collections.singletonMap("properties",
@@ -676,10 +674,8 @@ public class ReactiveElasticsearchClientTests {
SearchRequest request = new SearchRequest(INDEX_I) //
.source(searchSourceBuilder);
client.aggregate(request)
.as(StepVerifier::create)
.expectNextMatches(aggregation -> aggregation.getType().equals(StringTerms.NAME))
.verifyComplete();
client.aggregate(request).as(StepVerifier::create)
.expectNextMatches(aggregation -> aggregation.getType().equals(StringTerms.NAME)).verifyComplete();
}
private AddToIndexOfType addSourceDocument() {

View File

@@ -19,9 +19,6 @@ import static org.assertj.core.api.Assertions.*;
import static org.mockito.Mockito.*;
import static org.springframework.data.elasticsearch.client.reactive.ReactiveMockClientTestsUtils.MockWebClientProvider.Receive.*;
import org.elasticsearch.search.aggregations.bucket.terms.ParsedStringTerms;
import org.elasticsearch.search.aggregations.metrics.ParsedMax;
import reactor.core.publisher.Hooks;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
@@ -43,6 +40,8 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.aggregations.bucket.terms.ParsedStringTerms;
import org.elasticsearch.search.aggregations.metrics.ParsedMax;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
@@ -637,7 +636,8 @@ public class ReactiveElasticsearchClientUnitTests {
assertThat(aggregation.getName()).isEqualTo("max_post_date");
assertThat(aggregation instanceof ParsedMax);
ParsedMax parsedMax = (ParsedMax) aggregation;
assertThat(Instant.ofEpochMilli((long)parsedMax.getValue())).isEqualTo(Instant.parse("2010-01-15T01:46:38Z"));
assertThat(Instant.ofEpochMilli((long) parsedMax.getValue()))
.isEqualTo(Instant.parse("2010-01-15T01:46:38Z"));
}).verifyComplete();
}
@@ -648,7 +648,6 @@ public class ReactiveElasticsearchClientUnitTests {
.receive(Receive::json) //
.body(fromPath("aggregate-ok-no-results"));
client.aggregate(new SearchRequest("twitter")) //
.as(StepVerifier::create) //
.consumeNextWith(aggregation -> {

View File

@@ -24,8 +24,6 @@ import lombok.Builder;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.bucket.terms.ParsedStringTerms;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
@@ -41,6 +39,8 @@ import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.bucket.terms.ParsedStringTerms;
import org.elasticsearch.search.sort.FieldSortBuilder;
import org.elasticsearch.search.sort.SortOrder;
import org.junit.jupiter.api.AfterEach;
@@ -498,10 +498,8 @@ public class ReactiveElasticsearchTemplateTests {
index(sampleEntity1, sampleEntity2, sampleEntity3);
NativeSearchQuery query = new NativeSearchQueryBuilder()
.withQuery(matchAllQuery())
.addAggregation(AggregationBuilders.terms("messages").field("message"))
.build();
NativeSearchQuery query = new NativeSearchQueryBuilder().withQuery(matchAllQuery())
.addAggregation(AggregationBuilders.terms("messages").field("message")).build();
template.aggregate(query, SampleEntity.class) //
.as(StepVerifier::create) //
@@ -518,8 +516,9 @@ public class ReactiveElasticsearchTemplateTests {
@Test // DATAES-567
public void aggregateShouldReturnEmptyWhenIndexDoesNotExist() {
template.aggregate(new CriteriaQuery(Criteria.where("message").is("some message")), SampleEntity.class,
IndexCoordinates.of("no-such-index")) //
template
.aggregate(new CriteriaQuery(Criteria.where("message").is("some message")), SampleEntity.class,
IndexCoordinates.of("no-such-index")) //
.as(StepVerifier::create) //
.verifyComplete();
}