diff --git a/src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClient.java b/src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClient.java index 71936494c..76d4561c6 100644 --- a/src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClient.java +++ b/src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClient.java @@ -410,12 +410,13 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch */ @Override public Flux aggregate(HttpHeaders headers, SearchRequest searchRequest) { - + Assert.notNull(headers, "headers must not be null"); Assert.notNull(searchRequest, "searchRequest must not be null"); - + searchRequest.source().size(0); - + searchRequest.source().trackTotalHits(false); + return sendRequest(searchRequest, requestCreator.search(), SearchResponse.class, headers) // .map(SearchResponse::getAggregations) // .flatMap(Flux::fromIterable); @@ -773,7 +774,8 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch private static XContentParser createParser(String mediaType, String content) throws IOException { return XContentType.fromMediaTypeOrFormat(mediaType) // .xContent() // - .createParser(new NamedXContentRegistry(NamedXContents.getDefaultNamedXContents()), DeprecationHandler.THROW_UNSUPPORTED_OPERATION, content); + .createParser(new NamedXContentRegistry(NamedXContents.getDefaultNamedXContents()), + DeprecationHandler.THROW_UNSUPPORTED_OPERATION, content); } private static Publisher handleServerError(Request request, ClientResponse response) { diff --git a/src/main/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClient.java b/src/main/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClient.java index 708fba9df..89602ef1f 100644 --- a/src/main/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClient.java +++ b/src/main/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClient.java @@ -15,8 +15,6 @@ */ package org.springframework.data.elasticsearch.client.reactive; -import org.elasticsearch.search.aggregations.Aggregation; -import org.springframework.util.Assert; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -49,9 +47,11 @@ import org.elasticsearch.index.get.GetResult; import org.elasticsearch.index.reindex.BulkByScrollResponse; import org.elasticsearch.index.reindex.DeleteByQueryRequest; import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.aggregations.Aggregation; import org.springframework.data.elasticsearch.client.ClientConfiguration; import org.springframework.data.elasticsearch.client.ElasticsearchHost; import org.springframework.http.HttpHeaders; +import org.springframework.util.Assert; import org.springframework.util.CollectionUtils; import org.springframework.web.reactive.function.client.ClientResponse; import org.springframework.web.reactive.function.client.WebClient; @@ -416,14 +416,17 @@ public interface ReactiveElasticsearchClient { /** * Execute the given {@link SearchRequest} with aggregations against the {@literal search} API. * - * @param consumer never {@literal null}. - * @see Search API on - * elastic.co + * @param consumer + * never {@literal null}. * @return the {@link Flux} emitting {@link Aggregation} one by one. + * @see Search API on + * elastic.co * @since 4.0 */ default Flux aggregate(Consumer consumer) { + Assert.notNull(consumer, "consumer must not be null"); + SearchRequest request = new SearchRequest(); consumer.accept(request); return aggregate(request); @@ -438,7 +441,9 @@ public interface ReactiveElasticsearchClient { * @return the {@link Flux} emitting {@link Aggregation} one by one. * @since 4.0 */ - default Flux aggregate(SearchRequest searchRequest) { return aggregate(HttpHeaders.EMPTY, searchRequest); } + default Flux aggregate(SearchRequest searchRequest) { + return aggregate(HttpHeaders.EMPTY, searchRequest); + } /** * Execute the given {@link SearchRequest} with aggregations against the {@literal search} API. diff --git a/src/main/java/org/springframework/data/elasticsearch/client/util/NamedXContents.java b/src/main/java/org/springframework/data/elasticsearch/client/util/NamedXContents.java index edf5f1f41..482a3b76e 100644 --- a/src/main/java/org/springframework/data/elasticsearch/client/util/NamedXContents.java +++ b/src/main/java/org/springframework/data/elasticsearch/client/util/NamedXContents.java @@ -15,6 +15,11 @@ */ package org.springframework.data.elasticsearch.client.util; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + import org.elasticsearch.common.ParseField; import org.elasticsearch.common.xcontent.ContextParser; import org.elasticsearch.common.xcontent.NamedXContentRegistry; @@ -76,15 +81,11 @@ import org.elasticsearch.search.suggest.term.TermSuggestion; import org.elasticsearch.search.suggest.term.TermSuggestionBuilder; import org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.stream.Collectors; - /** *

- * Original implementation source {@link org.elasticsearch.client.RestHighLevelClient#getDefaultNamedXContents()} by {@literal Elasticsearch} - * (https://www.elastic.co) licensed under the Apache License, Version 2.0. + * Original implementation source {@link org.elasticsearch.client.RestHighLevelClient#getDefaultNamedXContents()} by + * {@literal Elasticsearch} (https://www.elastic.co) licensed under the Apache + * License, Version 2.0. *

* Modified for usage with {@link ReactiveElasticsearchClient}. *

@@ -106,8 +107,10 @@ public class NamedXContents { map.put(InternalHDRPercentileRanks.NAME, (p, c) -> ParsedHDRPercentileRanks.fromXContent(p, (String) c)); map.put(InternalTDigestPercentiles.NAME, (p, c) -> ParsedTDigestPercentiles.fromXContent(p, (String) c)); map.put(InternalTDigestPercentileRanks.NAME, (p, c) -> ParsedTDigestPercentileRanks.fromXContent(p, (String) c)); - map.put(PercentilesBucketPipelineAggregationBuilder.NAME, (p, c) -> ParsedPercentilesBucket.fromXContent(p, (String) c)); - map.put(MedianAbsoluteDeviationAggregationBuilder.NAME, (p, c) -> ParsedMedianAbsoluteDeviation.fromXContent(p, (String) c)); + map.put(PercentilesBucketPipelineAggregationBuilder.NAME, + (p, c) -> ParsedPercentilesBucket.fromXContent(p, (String) c)); + map.put(MedianAbsoluteDeviationAggregationBuilder.NAME, + (p, c) -> ParsedMedianAbsoluteDeviation.fromXContent(p, (String) c)); map.put(MinAggregationBuilder.NAME, (p, c) -> ParsedMin.fromXContent(p, (String) c)); map.put(MaxAggregationBuilder.NAME, (p, c) -> ParsedMax.fromXContent(p, (String) c)); map.put(SumAggregationBuilder.NAME, (p, c) -> ParsedSum.fromXContent(p, (String) c)); @@ -149,15 +152,18 @@ public class NamedXContents { map.put(IpRangeAggregationBuilder.NAME, (p, c) -> ParsedBinaryRange.fromXContent(p, (String) c)); map.put(TopHitsAggregationBuilder.NAME, (p, c) -> ParsedTopHits.fromXContent(p, (String) c)); map.put(CompositeAggregationBuilder.NAME, (p, c) -> ParsedComposite.fromXContent(p, (String) c)); - List entries = map.entrySet().stream() - .map(entry -> new NamedXContentRegistry.Entry(Aggregation.class, new ParseField(entry.getKey()), entry.getValue())) + List entries = map.entrySet().stream().map( + entry -> new NamedXContentRegistry.Entry(Aggregation.class, new ParseField(entry.getKey()), entry.getValue())) .collect(Collectors.toList()); - entries.add(new NamedXContentRegistry.Entry(Suggest.Suggestion.class, new ParseField(TermSuggestionBuilder.SUGGESTION_NAME), - (parser, context) -> TermSuggestion.fromXContent(parser, (String)context))); - entries.add(new NamedXContentRegistry.Entry(Suggest.Suggestion.class, new ParseField(PhraseSuggestionBuilder.SUGGESTION_NAME), - (parser, context) -> PhraseSuggestion.fromXContent(parser, (String)context))); - entries.add(new NamedXContentRegistry.Entry(Suggest.Suggestion.class, new ParseField(CompletionSuggestionBuilder.SUGGESTION_NAME), - (parser, context) -> CompletionSuggestion.fromXContent(parser, (String)context))); + entries.add( + new NamedXContentRegistry.Entry(Suggest.Suggestion.class, new ParseField(TermSuggestionBuilder.SUGGESTION_NAME), + (parser, context) -> TermSuggestion.fromXContent(parser, (String) context))); + entries.add(new NamedXContentRegistry.Entry(Suggest.Suggestion.class, + new ParseField(PhraseSuggestionBuilder.SUGGESTION_NAME), + (parser, context) -> PhraseSuggestion.fromXContent(parser, (String) context))); + entries.add(new NamedXContentRegistry.Entry(Suggest.Suggestion.class, + new ParseField(CompletionSuggestionBuilder.SUGGESTION_NAME), + (parser, context) -> CompletionSuggestion.fromXContent(parser, (String) context))); return entries; } } diff --git a/src/main/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplate.java b/src/main/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplate.java index a19090abc..c9f006b7c 100644 --- a/src/main/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplate.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplate.java @@ -17,7 +17,6 @@ package org.springframework.data.elasticsearch.core; import static org.elasticsearch.index.VersionType.*; -import org.elasticsearch.search.aggregations.Aggregation; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -50,6 +49,7 @@ import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.index.query.WrapperQueryBuilder; import org.elasticsearch.index.reindex.BulkByScrollResponse; import org.elasticsearch.index.reindex.DeleteByQueryRequest; +import org.elasticsearch.search.aggregations.Aggregation; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.sort.FieldSortBuilder; import org.elasticsearch.search.sort.SortBuilders; @@ -624,9 +624,6 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera return Flux.defer(() -> { SearchRequest request = requestFactory.searchRequest(query, entityType, index); request = prepareSearchRequest(request); - request.source().size(0); - request.source().trackTotalHits(false); - return doAggregate(request); }); } diff --git a/src/main/java/org/springframework/data/elasticsearch/core/ReactiveSearchOperations.java b/src/main/java/org/springframework/data/elasticsearch/core/ReactiveSearchOperations.java index 32e3e4edc..47eada0de 100644 --- a/src/main/java/org/springframework/data/elasticsearch/core/ReactiveSearchOperations.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/ReactiveSearchOperations.java @@ -15,11 +15,11 @@ */ package org.springframework.data.elasticsearch.core; -import org.elasticsearch.search.aggregations.Aggregation; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.search.aggregations.Aggregation; import org.springframework.data.domain.Pageable; import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates; import org.springframework.data.elasticsearch.core.query.Query; diff --git a/src/test/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClientTests.java b/src/test/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClientTests.java index fb13bb6bc..c737ec6b9 100644 --- a/src/test/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClientTests.java +++ b/src/test/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClientTests.java @@ -18,11 +18,6 @@ package org.springframework.data.elasticsearch.client.reactive; import static org.assertj.core.api.Assertions.*; import lombok.SneakyThrows; -import org.elasticsearch.client.indices.GetIndexRequest; -import org.elasticsearch.client.indices.PutMappingRequest; -import org.elasticsearch.search.aggregations.AggregationBuilders; -import org.elasticsearch.search.aggregations.bucket.terms.StringTerms; -import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder; import reactor.test.StepVerifier; import java.io.IOException; @@ -47,12 +42,16 @@ import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.RestHighLevelClient; import org.elasticsearch.client.indices.CreateIndexRequest; +import org.elasticsearch.client.indices.GetIndexRequest; +import org.elasticsearch.client.indices.PutMappingRequest; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.get.GetResult; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.index.reindex.BulkByScrollResponse; import org.elasticsearch.index.reindex.DeleteByQueryRequest; import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.search.aggregations.AggregationBuilders; +import org.elasticsearch.search.aggregations.bucket.terms.StringTerms; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -218,9 +217,9 @@ public class ReactiveElasticsearchClientTests { String id2 = addSourceDocument().ofType(TYPE_I).to(INDEX_I); MultiGetRequest request = new MultiGetRequest() // - .add(INDEX_I,id1) // - .add(INDEX_I,"this-one-does-not-exist") // - .add(INDEX_I,id2); // + .add(INDEX_I, id1) // + .add(INDEX_I, "this-one-does-not-exist") // + .add(INDEX_I, id2); // client.multiGet(request) // .map(GetResult::getId) // @@ -236,8 +235,7 @@ public class ReactiveElasticsearchClientTests { String id2 = addSourceDocument().ofType(TYPE_I).to(INDEX_I); client.multiGet(new MultiGetRequest() // - .add(INDEX_II, id1) - .add(INDEX_II, id2)) // + .add(INDEX_II, id1).add(INDEX_II, id2)) // .as(StepVerifier::create) // .verifyComplete(); } @@ -661,7 +659,7 @@ public class ReactiveElasticsearchClientTests { }).verifyComplete(); } - @Test //DATAES-567 + @Test // DATAES-567 public void aggregateReturnsAggregationResults() throws IOException { syncClient.indices().create(new CreateIndexRequest(INDEX_I), RequestOptions.DEFAULT); Map jsonMap = Collections.singletonMap("properties", @@ -676,10 +674,8 @@ public class ReactiveElasticsearchClientTests { SearchRequest request = new SearchRequest(INDEX_I) // .source(searchSourceBuilder); - client.aggregate(request) - .as(StepVerifier::create) - .expectNextMatches(aggregation -> aggregation.getType().equals(StringTerms.NAME)) - .verifyComplete(); + client.aggregate(request).as(StepVerifier::create) + .expectNextMatches(aggregation -> aggregation.getType().equals(StringTerms.NAME)).verifyComplete(); } private AddToIndexOfType addSourceDocument() { diff --git a/src/test/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClientUnitTests.java b/src/test/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClientUnitTests.java index feb86a9a6..25b0bdea4 100644 --- a/src/test/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClientUnitTests.java +++ b/src/test/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClientUnitTests.java @@ -19,9 +19,6 @@ import static org.assertj.core.api.Assertions.*; import static org.mockito.Mockito.*; import static org.springframework.data.elasticsearch.client.reactive.ReactiveMockClientTestsUtils.MockWebClientProvider.Receive.*; -import org.elasticsearch.search.aggregations.bucket.terms.ParsedStringTerms; -import org.elasticsearch.search.aggregations.metrics.ParsedMax; -import reactor.core.publisher.Hooks; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; @@ -43,6 +40,8 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.VersionType; import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.search.aggregations.bucket.terms.ParsedStringTerms; +import org.elasticsearch.search.aggregations.metrics.ParsedMax; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.mockito.Mockito; @@ -637,7 +636,8 @@ public class ReactiveElasticsearchClientUnitTests { assertThat(aggregation.getName()).isEqualTo("max_post_date"); assertThat(aggregation instanceof ParsedMax); ParsedMax parsedMax = (ParsedMax) aggregation; - assertThat(Instant.ofEpochMilli((long)parsedMax.getValue())).isEqualTo(Instant.parse("2010-01-15T01:46:38Z")); + assertThat(Instant.ofEpochMilli((long) parsedMax.getValue())) + .isEqualTo(Instant.parse("2010-01-15T01:46:38Z")); }).verifyComplete(); } @@ -648,7 +648,6 @@ public class ReactiveElasticsearchClientUnitTests { .receive(Receive::json) // .body(fromPath("aggregate-ok-no-results")); - client.aggregate(new SearchRequest("twitter")) // .as(StepVerifier::create) // .consumeNextWith(aggregation -> { diff --git a/src/test/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplateTests.java b/src/test/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplateTests.java index f39cbca3f..90d9833d8 100644 --- a/src/test/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplateTests.java +++ b/src/test/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplateTests.java @@ -24,8 +24,6 @@ import lombok.Builder; import lombok.Data; import lombok.EqualsAndHashCode; import lombok.NoArgsConstructor; -import org.elasticsearch.search.aggregations.AggregationBuilders; -import org.elasticsearch.search.aggregations.bucket.terms.ParsedStringTerms; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; @@ -41,6 +39,8 @@ import java.util.UUID; import java.util.stream.Collectors; import java.util.stream.IntStream; +import org.elasticsearch.search.aggregations.AggregationBuilders; +import org.elasticsearch.search.aggregations.bucket.terms.ParsedStringTerms; import org.elasticsearch.search.sort.FieldSortBuilder; import org.elasticsearch.search.sort.SortOrder; import org.junit.jupiter.api.AfterEach; @@ -498,10 +498,8 @@ public class ReactiveElasticsearchTemplateTests { index(sampleEntity1, sampleEntity2, sampleEntity3); - NativeSearchQuery query = new NativeSearchQueryBuilder() - .withQuery(matchAllQuery()) - .addAggregation(AggregationBuilders.terms("messages").field("message")) - .build(); + NativeSearchQuery query = new NativeSearchQueryBuilder().withQuery(matchAllQuery()) + .addAggregation(AggregationBuilders.terms("messages").field("message")).build(); template.aggregate(query, SampleEntity.class) // .as(StepVerifier::create) // @@ -518,8 +516,9 @@ public class ReactiveElasticsearchTemplateTests { @Test // DATAES-567 public void aggregateShouldReturnEmptyWhenIndexDoesNotExist() { - template.aggregate(new CriteriaQuery(Criteria.where("message").is("some message")), SampleEntity.class, - IndexCoordinates.of("no-such-index")) // + template + .aggregate(new CriteriaQuery(Criteria.where("message").is("some message")), SampleEntity.class, + IndexCoordinates.of("no-such-index")) // .as(StepVerifier::create) // .verifyComplete(); }