Encapsulate client specific aggregation return types.

Original Pull Request #1921
Closes #1920
This commit is contained in:
Peter-Josef Meisch 2021-09-04 13:20:01 +02:00 committed by GitHub
parent e71758686c
commit 64624aec70
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 218 additions and 48 deletions

View File

@ -30,6 +30,12 @@ Check the sections on <<elasticsearch-migration-guide-4.2-4.3.deprecations>> and
* In the `org.springframework.data.elasticsearch.annotations.Document` annotation the `versionType()` property has changed to `org.springframework.data.elasticsearch.annotations.Document.VersionType`, the available enum values are the same.
* In the `org.springframework.data.elasticsearch.core.query.Query` interface the `searchType()` property has changed to `org.springframework.data.elasticsearch.core.query.Query.SearchType`, the available enum values are the same.
* In the `org.springframework.data.elasticsearch.core.query.Query` interface the return value of `timeout()` was changed to `java.time.Duration`.
* The `SearchHits<T>`class does not contain the `org.elasticsearch.search.aggregations.Aggregations` anymore.
Instead it now contains an instance of the `org.springframework.data.elasticsearch.core.AggregationsContainer<T>` class where `T` is the concrete aggregations type from the underlying client that is used.
Currently this will be a `org
.springframework.data.elasticsearch.core.clients.elasticsearch7.ElasticsearchAggregations` object; later different implementations will be available.
The same change has been done to the `ReactiveSearchOperations.aggregate()` functions, the now return a `Flux<AggregationContainer<?>>`.
Programs using the aggregations need to be changed to cast the returned value to the appropriate class to further proces it.
=== Handling of field and sourceFilter properties of Query

View File

@ -0,0 +1,31 @@
/*
* Copyright 2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.elasticsearch.core;
/**
* Aggregation container used in the Spring Data Elasticsearch API for a single aggregation. The concrete
* implementations must be provided by the code handling the direct communication with Elasticsearch.
*
* @author Peter-Josef Meisch
* @param <T> the aggregation class from the used client implementation.
* @since 4.3
*/
public interface AggregationContainer<T> {
/**
* @return the concrete aggregations implementation
*/
T aggregation();
}

View File

@ -0,0 +1,31 @@
/*
* Copyright 2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.elasticsearch.core;
/**
* Aggregations container used in the Spring Data Elasticsearch API. The concrete implementations must be provided by
* the code handling the direct communication with Elasticsearch.
*
* @author Peter-Josef Meisch
* @param <T> the aggregations class from the used client implementation.
* @since 4.3
*/
public interface AggregationsContainer<T> {
/**
* @return the concrete aggregations implementation
*/
T aggregations();
}

View File

@ -44,7 +44,6 @@ import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.elasticsearch.index.reindex.UpdateByQueryRequest;
import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.suggest.Suggest;
import org.elasticsearch.search.suggest.SuggestBuilder;
import org.reactivestreams.Publisher;
@ -59,6 +58,7 @@ import org.springframework.data.elasticsearch.NoSuchIndexException;
import org.springframework.data.elasticsearch.UncategorizedElasticsearchException;
import org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient;
import org.springframework.data.elasticsearch.core.EntityOperations.AdaptibleEntity;
import org.springframework.data.elasticsearch.core.clients.elasticsearch7.ElasticsearchAggregation;
import org.springframework.data.elasticsearch.core.cluster.DefaultReactiveClusterOperations;
import org.springframework.data.elasticsearch.core.cluster.ReactiveClusterOperations;
import org.springframework.data.elasticsearch.core.convert.ElasticsearchConverter;
@ -782,12 +782,12 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera
}
@Override
public Flux<Aggregation> aggregate(Query query, Class<?> entityType) {
public Flux<AggregationContainer<?>> aggregate(Query query, Class<?> entityType) {
return aggregate(query, entityType, getIndexCoordinatesFor(entityType));
}
@Override
public Flux<Aggregation> aggregate(Query query, Class<?> entityType, IndexCoordinates index) {
public Flux<AggregationContainer<?>> aggregate(Query query, Class<?> entityType, IndexCoordinates index) {
return doAggregate(query, entityType, index);
}
@ -808,7 +808,7 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera
});
}
private Flux<Aggregation> doAggregate(Query query, Class<?> entityType, IndexCoordinates index) {
private Flux<AggregationContainer<?>> doAggregate(Query query, Class<?> entityType, IndexCoordinates index) {
return Flux.defer(() -> {
SearchRequest request = requestFactory.searchRequest(query, entityType, index);
request = prepareSearchRequest(request, false);
@ -872,14 +872,14 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera
* @param request the already prepared {@link SearchRequest} ready to be executed.
* @return a {@link Flux} emitting the result of the operation.
*/
protected Flux<Aggregation> doAggregate(SearchRequest request) {
protected Flux<AggregationContainer<?>> doAggregate(SearchRequest request) {
if (QUERY_LOGGER.isDebugEnabled()) {
QUERY_LOGGER.debug("Executing doCount: {}", request);
}
return Flux.from(execute(client -> client.aggregate(request))) //
.onErrorResume(NoSuchIndexException.class, it -> Flux.empty());
.onErrorResume(NoSuchIndexException.class, it -> Flux.empty()).map(ElasticsearchAggregation::new);
}
/**

View File

@ -20,7 +20,6 @@ import reactor.core.publisher.Mono;
import java.util.List;
import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.suggest.Suggest;
import org.elasticsearch.search.suggest.SuggestBuilder;
import org.springframework.data.domain.Pageable;
@ -185,7 +184,7 @@ public interface ReactiveSearchOperations {
* @return a {@link Flux} emitting matching aggregations one by one.
* @since 4.0
*/
Flux<Aggregation> aggregate(Query query, Class<?> entityType);
Flux<AggregationContainer<?>> aggregate(Query query, Class<?> entityType);
/**
* Perform an aggregation specified by the given {@link Query query}. <br />
@ -196,7 +195,7 @@ public interface ReactiveSearchOperations {
* @return a {@link Flux} emitting matching aggregations one by one.
* @since 4.0
*/
Flux<Aggregation> aggregate(Query query, Class<?> entityType, IndexCoordinates index);
Flux<AggregationContainer<?>> aggregate(Query query, Class<?> entityType, IndexCoordinates index);
/**
* Does a suggest query

View File

@ -22,7 +22,6 @@ import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.elasticsearch.search.aggregations.Aggregations;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.elasticsearch.core.convert.ElasticsearchConverter;
@ -95,7 +94,7 @@ class SearchHitMapping<T> {
SearchHit<T> hit = mapHit(document, content);
searchHits.add(hit);
}
Aggregations aggregations = searchDocumentResponse.getAggregations();
AggregationsContainer<?> aggregations = searchDocumentResponse.getAggregations();
TotalHitsRelation totalHitsRelation = TotalHitsRelation.valueOf(searchDocumentResponse.getTotalHitsRelation());
return new SearchHitsImpl<>(totalHits, totalHitsRelation, maxScore, scrollId, searchHits, aggregations);
@ -225,7 +224,7 @@ class SearchHitMapping<T> {
/**
* find a {@link ElasticsearchPersistentEntity} following the property chain defined by the nested metadata
*
*
* @param persistentEntity base entity
* @param nestedMetaData nested metadata
* @return A {@link ElasticsearchPersistentEntityWithNestedMetaData} containing the found entity or null together with

View File

@ -18,13 +18,12 @@ package org.springframework.data.elasticsearch.core;
import java.util.Iterator;
import java.util.List;
import org.elasticsearch.search.aggregations.Aggregations;
import org.springframework.data.util.Streamable;
import org.springframework.lang.Nullable;
/**
* Encapsulates a list of {@link SearchHit}s with additional information from the search.
*
*
* @param <T> the result data class.
* @author Sascha Woo
* @since 4.0
@ -35,7 +34,7 @@ public interface SearchHits<T> extends Streamable<SearchHit<T>> {
* @return the aggregations.
*/
@Nullable
Aggregations getAggregations();
AggregationsContainer<?> getAggregations();
/**
* @return the maximum score

View File

@ -18,7 +18,6 @@ package org.springframework.data.elasticsearch.core;
import java.util.Collections;
import java.util.List;
import org.elasticsearch.search.aggregations.Aggregations;
import org.springframework.data.util.Lazy;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
@ -39,7 +38,7 @@ public class SearchHitsImpl<T> implements SearchScrollHits<T> {
@Nullable private final String scrollId;
private final List<? extends SearchHit<T>> searchHits;
private final Lazy<List<SearchHit<T>>> unmodifiableSearchHits;
@Nullable private final Aggregations aggregations;
@Nullable private final AggregationsContainer<?> aggregations;
/**
* @param totalHits the number of total hits for the search
@ -50,7 +49,7 @@ public class SearchHitsImpl<T> implements SearchScrollHits<T> {
* @param aggregations the aggregations if available
*/
public SearchHitsImpl(long totalHits, TotalHitsRelation totalHitsRelation, float maxScore, @Nullable String scrollId,
List<? extends SearchHit<T>> searchHits, @Nullable Aggregations aggregations) {
List<? extends SearchHit<T>> searchHits, @Nullable AggregationsContainer<?> aggregations) {
Assert.notNull(searchHits, "searchHits must not be null");
@ -113,7 +112,7 @@ public class SearchHitsImpl<T> implements SearchScrollHits<T> {
// region aggregations
@Override
@Nullable
public Aggregations getAggregations() {
public AggregationsContainer<?> getAggregations() {
return aggregations;
}
// endregion

View File

@ -15,7 +15,6 @@
*/
package org.springframework.data.elasticsearch.core;
import org.elasticsearch.search.aggregations.Aggregations;
import org.springframework.data.util.CloseableIterator;
import org.springframework.lang.Nullable;
@ -33,7 +32,7 @@ public interface SearchHitsIterator<T> extends CloseableIterator<SearchHit<T>> {
* @return the aggregations.
*/
@Nullable
Aggregations getAggregations();
AggregationsContainer<?> getAggregations();
/**
* @return the maximum score

View File

@ -22,7 +22,6 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Function;
import org.elasticsearch.search.aggregations.Aggregations;
import org.springframework.data.elasticsearch.client.util.ScrollState;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
@ -55,7 +54,7 @@ abstract class StreamQueries {
Assert.notNull(continueScrollFunction, "continueScrollFunction must not be null.");
Assert.notNull(clearScrollConsumer, "clearScrollConsumer must not be null.");
Aggregations aggregations = searchHits.getAggregations();
AggregationsContainer<?> aggregations = searchHits.getAggregations();
float maxScore = searchHits.getMaxScore();
long totalHits = searchHits.getTotalHits();
TotalHitsRelation totalHitsRelation = searchHits.getTotalHitsRelation();
@ -78,7 +77,7 @@ abstract class StreamQueries {
@Override
@Nullable
public Aggregations getAggregations() {
public AggregationsContainer<?> getAggregations() {
return aggregations;
}

View File

@ -0,0 +1,41 @@
/*
* Copyright 2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.elasticsearch.core.clients.elasticsearch7;
import org.elasticsearch.search.aggregations.Aggregation;
import org.springframework.data.elasticsearch.core.AggregationContainer;
import org.springframework.lang.NonNull;
/**
* AggregationContainer implementation for an Elasticsearch7 aggregation.
*
* @author Peter-Josef Meisch
* @since 4.3
*/
public class ElasticsearchAggregation implements AggregationContainer<Aggregation> {
private final Aggregation aggregation;
public ElasticsearchAggregation(Aggregation aggregation) {
this.aggregation = aggregation;
}
@NonNull
@Override
public Aggregation aggregation() {
return aggregation;
}
}

View File

@ -0,0 +1,41 @@
/*
* Copyright 2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.elasticsearch.core.clients.elasticsearch7;
import org.elasticsearch.search.aggregations.Aggregations;
import org.springframework.data.elasticsearch.core.AggregationsContainer;
import org.springframework.lang.NonNull;
/**
* AggregationsContainer implementation for the Elasticsearch7 aggregations.
*
* @author Peter-Josef Meisch
* @since 4.3
*/
public class ElasticsearchAggregations implements AggregationsContainer<Aggregations> {
private final Aggregations aggregations;
public ElasticsearchAggregations(Aggregations aggregations) {
this.aggregations = aggregations;
}
@NonNull
@Override
public Aggregations aggregations() {
return aggregations;
}
}

View File

@ -0,0 +1,6 @@
/**
* Classes and interfaces used by the code that uses Elasticsearch 7 client libraries
*/
@org.springframework.lang.NonNullApi
@org.springframework.lang.NonNullFields
package org.springframework.data.elasticsearch.core.clients.elasticsearch7;

View File

@ -23,13 +23,15 @@ import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.aggregations.Aggregations;
import org.springframework.data.elasticsearch.core.AggregationsContainer;
import org.springframework.data.elasticsearch.core.clients.elasticsearch7.ElasticsearchAggregations;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
/**
* This represents the complete search response from Elasticsearch, including the returned documents. Instances must be
* created with the {@link #from(SearchResponse)} method.
*
*
* @author Peter-Josef Meisch
* @since 4.0
*/
@ -40,7 +42,7 @@ public class SearchDocumentResponse {
private final float maxScore;
private final String scrollId;
private final List<SearchDocument> searchDocuments;
private final Aggregations aggregations;
private final AggregationsContainer<?> aggregations;
private SearchDocumentResponse(long totalHits, String totalHitsRelation, float maxScore, String scrollId,
List<SearchDocument> searchDocuments, Aggregations aggregations) {
@ -49,7 +51,7 @@ public class SearchDocumentResponse {
this.maxScore = maxScore;
this.scrollId = scrollId;
this.searchDocuments = searchDocuments;
this.aggregations = aggregations;
this.aggregations = new ElasticsearchAggregations(aggregations);
}
public long getTotalHits() {
@ -72,7 +74,7 @@ public class SearchDocumentResponse {
return searchDocuments;
}
public Aggregations getAggregations() {
public AggregationsContainer<?> getAggregations() {
return aggregations;
}
@ -97,7 +99,7 @@ public class SearchDocumentResponse {
/**
* creates a {@link SearchDocumentResponse} from {@link SearchHits} with the given scrollId and aggregations
*
*
* @param searchHits the {@link SearchHits} to process
* @param scrollId scrollId
* @param aggregations aggregations

View File

@ -42,6 +42,7 @@ import java.util.stream.IntStream;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.index.query.IdsQueryBuilder;
import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.bucket.terms.ParsedStringTerms;
import org.elasticsearch.search.sort.FieldSortBuilder;
@ -70,6 +71,7 @@ import org.springframework.data.elasticsearch.annotations.FieldType;
import org.springframework.data.elasticsearch.annotations.Mapping;
import org.springframework.data.elasticsearch.annotations.Setting;
import org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient;
import org.springframework.data.elasticsearch.core.clients.elasticsearch7.ElasticsearchAggregation;
import org.springframework.data.elasticsearch.core.document.Explanation;
import org.springframework.data.elasticsearch.core.index.AliasAction;
import org.springframework.data.elasticsearch.core.index.AliasActionParameters;
@ -513,7 +515,8 @@ public class ReactiveElasticsearchTemplateIntegrationTests {
operations.aggregate(query, SampleEntity.class) //
.as(StepVerifier::create) //
.consumeNextWith(aggregation -> {
.consumeNextWith(aggregationContainer -> {
Aggregation aggregation = ((ElasticsearchAggregation) aggregationContainer).aggregation();
assertThat(aggregation.getName()).isEqualTo("messages");
assertThat(aggregation instanceof ParsedStringTerms);
ParsedStringTerms parsedStringTerms = (ParsedStringTerms) aggregation;
@ -1214,10 +1217,12 @@ public class ReactiveElasticsearchTemplateIntegrationTests {
@Override
public boolean equals(Object o) {
if (this == o)
if (this == o) {
return true;
if (o == null || getClass() != o.getClass())
}
if (o == null || getClass() != o.getClass()) {
return false;
}
Message message1 = (Message) o;
@ -1274,19 +1279,24 @@ public class ReactiveElasticsearchTemplateIntegrationTests {
@Override
public boolean equals(Object o) {
if (this == o)
if (this == o) {
return true;
if (o == null || getClass() != o.getClass())
}
if (o == null || getClass() != o.getClass()) {
return false;
}
SampleEntity that = (SampleEntity) o;
if (rate != that.rate)
if (rate != that.rate) {
return false;
if (id != null ? !id.equals(that.id) : that.id != null)
}
if (id != null ? !id.equals(that.id) : that.id != null) {
return false;
if (message != null ? !message.equals(that.message) : that.message != null)
}
if (message != null ? !message.equals(that.message) : that.message != null) {
return false;
}
return version != null ? version.equals(that.version) : that.version == null;
}
@ -1445,17 +1455,21 @@ public class ReactiveElasticsearchTemplateIntegrationTests {
@Override
public boolean equals(Object o) {
if (this == o)
if (this == o) {
return true;
if (o == null || getClass() != o.getClass())
}
if (o == null || getClass() != o.getClass()) {
return false;
}
ImmutableEntity that = (ImmutableEntity) o;
if (!id.equals(that.id))
if (!id.equals(that.id)) {
return false;
if (!text.equals(that.text))
}
if (!text.equals(that.text)) {
return false;
}
return seqNoPrimaryTerm != null ? seqNoPrimaryTerm.equals(that.seqNoPrimaryTerm) : that.seqNoPrimaryTerm == null;
}

View File

@ -24,7 +24,6 @@ import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import org.elasticsearch.search.aggregations.Aggregations;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.springframework.data.domain.PageRequest;
@ -80,8 +79,8 @@ class SearchHitSupportTest {
private boolean closed = false;
@Override
public Aggregations getAggregations() {
return mock(Aggregations.class);
public AggregationsContainer<?> getAggregations() {
return mock(AggregationsContainer.class);
}
@Override

View File

@ -44,9 +44,11 @@ import org.springframework.data.elasticsearch.annotations.Document;
import org.springframework.data.elasticsearch.annotations.Field;
import org.springframework.data.elasticsearch.annotations.InnerField;
import org.springframework.data.elasticsearch.annotations.MultiField;
import org.springframework.data.elasticsearch.core.AggregationsContainer;
import org.springframework.data.elasticsearch.core.ElasticsearchOperations;
import org.springframework.data.elasticsearch.core.IndexOperations;
import org.springframework.data.elasticsearch.core.SearchHits;
import org.springframework.data.elasticsearch.core.clients.elasticsearch7.ElasticsearchAggregations;
import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
import org.springframework.data.elasticsearch.core.query.IndexQuery;
import org.springframework.data.elasticsearch.core.query.NativeSearchQuery;
@ -128,10 +130,11 @@ public class ElasticsearchTemplateAggregationTests {
// when
SearchHits<ArticleEntity> searchHits = operations.search(searchQuery, ArticleEntity.class,
IndexCoordinates.of(INDEX_NAME));
Aggregations aggregations = searchHits.getAggregations();
AggregationsContainer<?> aggregationsContainer = searchHits.getAggregations();
// then
assertThat(aggregations).isNotNull();
assertThat(aggregationsContainer).isNotNull();
Aggregations aggregations = ((ElasticsearchAggregations) aggregationsContainer).aggregations();
assertThat(aggregations.asMap().get("subjects")).isNotNull();
assertThat(searchHits.hasSearchHits()).isFalse();
}
@ -160,8 +163,10 @@ public class ElasticsearchTemplateAggregationTests {
SearchHits<PipelineAggsEntity> searchHits = operations.search(searchQuery, PipelineAggsEntity.class);
Aggregations aggregations = searchHits.getAggregations();
assertThat(aggregations).isNotNull();
AggregationsContainer<?> aggregationsContainer = searchHits.getAggregations();
assertThat(aggregationsContainer).isNotNull();
Aggregations aggregations = ((ElasticsearchAggregations) aggregationsContainer).aggregations();
assertThat(aggregations.asMap().get("keyword_aggs")).isNotNull();
Aggregation keyword_bucket_stats = aggregations.asMap().get("keyword_bucket_stats");
assertThat(keyword_bucket_stats).isInstanceOf(StatsBucket.class);