Add reactive SearchHits implementation.

Original Pull Request #2017
Closes #2015
This commit is contained in:
Peter-Josef Meisch 2021-11-30 20:47:29 +01:00 committed by GitHub
parent 989c2807fb
commit 8fad48b3f9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 332 additions and 24 deletions

View File

@ -144,6 +144,9 @@ Returned by the low level scroll API functions in `ElasticsearchRestTemplate`, i
.SearchHitsIterator<T> .SearchHitsIterator<T>
An Iterator returned by the streaming functions of the `SearchOperations` interface. An Iterator returned by the streaming functions of the `SearchOperations` interface.
.ReactiveSearchHits
`ReactiveSearchOperations` has methods returning a `Mono<ReactiveSearchHits<T>>`, this contains the same information as a `SearchHits<T>` object, but will provide the contained `SearchHit<T>` objects as a `Flux<SearchHit<T>>` and not as a list.
[[elasticsearch.operations.queries]] [[elasticsearch.operations.queries]]
== Queries == Queries

View File

@ -754,6 +754,31 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera
.map(searchHits -> SearchHitSupport.searchPageFor(searchHits, query.getPageable())); .map(searchHits -> SearchHitSupport.searchPageFor(searchHits, query.getPageable()));
} }
@Override
public <T> Mono<ReactiveSearchHits<T>> searchForHits(Query query, Class<?> entityType, Class<T> resultType) {
return searchForHits(query, entityType, resultType, getIndexCoordinatesFor(entityType));
}
@Override
public <T> Mono<ReactiveSearchHits<T>> searchForHits(Query query, Class<?> entityType, Class<T> resultType,
IndexCoordinates index) {
Assert.notNull(query, "query must not be null");
Assert.notNull(entityType, "entityType must not be null");
Assert.notNull(resultType, "resultType must not be null");
Assert.notNull(index, "index must not be null");
SearchDocumentCallback<T> callback = new ReadSearchDocumentCallback<>(resultType, index);
return doFindForResponse(query, entityType, index) //
.flatMap(searchDocumentResponse -> Flux.fromIterable(searchDocumentResponse.getSearchDocuments()) //
.flatMap(callback::toEntity) //
.collectList() //
.map(entities -> SearchHitMapping.mappingFor(resultType, converter) //
.mapHits(searchDocumentResponse, entities))) //
.map(ReactiveSearchHitSupport::searchHitsFor);
}
private Flux<SearchDocument> doFind(Query query, Class<?> clazz, IndexCoordinates index) { private Flux<SearchDocument> doFind(Query query, Class<?> clazz, IndexCoordinates index) {
return Flux.defer(() -> { return Flux.defer(() -> {
@ -777,8 +802,9 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera
request = prepareSearchRequest(request, false); request = prepareSearchRequest(request, false);
SearchDocumentCallback<?> documentCallback = new ReadSearchDocumentCallback<>(clazz, index); SearchDocumentCallback<?> documentCallback = new ReadSearchDocumentCallback<>(clazz, index);
Function<SearchDocument, Object> entityCreator = searchDocument -> documentCallback.toEntity(searchDocument)
return doFindForResponse(request, searchDocument -> documentCallback.toEntity(searchDocument).block()); .block();
return doFindForResponse(request, entityCreator);
}); });
} }
@ -895,19 +921,18 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera
* Customization hook on the actual execution result {@link Mono}. <br /> * Customization hook on the actual execution result {@link Mono}. <br />
* *
* @param request the already prepared {@link SearchRequest} ready to be executed. * @param request the already prepared {@link SearchRequest} ready to be executed.
* @param suggestEntityCreator * @param entityCreator
* @return a {@link Mono} emitting the result of the operation converted to s {@link SearchDocumentResponse}. * @return a {@link Mono} emitting the result of the operation converted to s {@link SearchDocumentResponse}.
*/ */
protected Mono<SearchDocumentResponse> doFindForResponse(SearchRequest request, protected Mono<SearchDocumentResponse> doFindForResponse(SearchRequest request,
Function<SearchDocument, ? extends Object> suggestEntityCreator) { Function<SearchDocument, ? extends Object> entityCreator) {
if (QUERY_LOGGER.isDebugEnabled()) { if (QUERY_LOGGER.isDebugEnabled()) {
QUERY_LOGGER.debug(String.format("Executing doFindForResponse: %s", request)); QUERY_LOGGER.debug(String.format("Executing doFindForResponse: %s", request));
} }
return Mono.from(execute(client1 -> client1.searchForResponse(request))).map(searchResponse -> { return Mono.from(execute(client -> client.searchForResponse(request)))
return SearchDocumentResponse.from(searchResponse, suggestEntityCreator); .map(searchResponse -> SearchDocumentResponse.from(searchResponse, entityCreator));
});
} }
/** /**

View File

@ -99,12 +99,12 @@ public class SearchDocumentResponse {
* creates a SearchDocumentResponse from the {@link SearchResponse} * creates a SearchDocumentResponse from the {@link SearchResponse}
* *
* @param searchResponse must not be {@literal null} * @param searchResponse must not be {@literal null}
* @param suggestEntityCreator function to create an entity from a {@link SearchDocument} * @param entityCreator function to create an entity from a {@link SearchDocument}
* @param <T> entity type * @param <T> entity type
* @return the SearchDocumentResponse * @return the SearchDocumentResponse
*/ */
public static <T> SearchDocumentResponse from(SearchResponse searchResponse, public static <T> SearchDocumentResponse from(SearchResponse searchResponse,
Function<SearchDocument, T> suggestEntityCreator) { Function<SearchDocument, T> entityCreator) {
Assert.notNull(searchResponse, "searchResponse must not be null"); Assert.notNull(searchResponse, "searchResponse must not be null");
@ -113,7 +113,7 @@ public class SearchDocumentResponse {
Aggregations aggregations = searchResponse.getAggregations(); Aggregations aggregations = searchResponse.getAggregations();
org.elasticsearch.search.suggest.Suggest suggest = searchResponse.getSuggest(); org.elasticsearch.search.suggest.Suggest suggest = searchResponse.getSuggest();
return from(searchHits, scrollId, aggregations, suggest, suggestEntityCreator); return from(searchHits, scrollId, aggregations, suggest, entityCreator);
} }
/** /**
@ -123,14 +123,14 @@ public class SearchDocumentResponse {
* @param scrollId scrollId * @param scrollId scrollId
* @param aggregations aggregations * @param aggregations aggregations
* @param suggestES the suggestion response from Elasticsearch * @param suggestES the suggestion response from Elasticsearch
* @param suggestEntityCreator function to create an entity from a {@link SearchDocument} * @param entityCreator function to create an entity from a {@link SearchDocument}
* @param <T> entity type * @param <T> entity type
* @return the {@link SearchDocumentResponse} * @return the {@link SearchDocumentResponse}
* @since 4.3 * @since 4.3
*/ */
public static <T> SearchDocumentResponse from(SearchHits searchHits, @Nullable String scrollId, public static <T> SearchDocumentResponse from(SearchHits searchHits, @Nullable String scrollId,
@Nullable Aggregations aggregations, @Nullable org.elasticsearch.search.suggest.Suggest suggestES, @Nullable Aggregations aggregations, @Nullable org.elasticsearch.search.suggest.Suggest suggestES,
Function<SearchDocument, T> suggestEntityCreator) { Function<SearchDocument, T> entityCreator) {
TotalHits responseTotalHits = searchHits.getTotalHits(); TotalHits responseTotalHits = searchHits.getTotalHits();
@ -154,7 +154,7 @@ public class SearchDocumentResponse {
} }
} }
Suggest suggest = suggestFrom(suggestES, suggestEntityCreator); Suggest suggest = suggestFrom(suggestES, entityCreator);
return new SearchDocumentResponse(totalHits, totalHitsRelation, maxScore, scrollId, searchDocuments, aggregations, return new SearchDocumentResponse(totalHits, totalHitsRelation, maxScore, scrollId, searchDocuments, aggregations,
suggest); suggest);
} }

View File

@ -0,0 +1,33 @@
/*
* Copyright 2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.elasticsearch.core;
import org.springframework.util.Assert;
/**
* @author Peter-Josef Meisch
* @since 4.4
*/
public final class ReactiveSearchHitSupport {
private ReactiveSearchHitSupport() {}
public static <T> ReactiveSearchHits<T> searchHitsFor(SearchHits<T> searchHits) {
Assert.notNull(searchHits, "searchHits must not be null");
return new ReactiveSearchHitsImpl<>(searchHits);
}
}

View File

@ -0,0 +1,75 @@
/*
* Copyright 2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.elasticsearch.core;
import reactor.core.publisher.Flux;
import org.springframework.data.elasticsearch.core.suggest.response.Suggest;
import org.springframework.lang.Nullable;
/**
* Encapsulates a Flux of {@link SearchHit}s with additional information from the search.
*
* @param <T> the result data class.
* @author Peter-Josef Meisch
* @since 4.4
*/
public interface ReactiveSearchHits<T> {
/**
* @return the aggregations.
*/
@Nullable
AggregationsContainer<?> getAggregations();
float getMaxScore();
/**
* @return the {@link SearchHit}s from the search result.
*/
Flux<SearchHit<T>> getSearchHits();
/**
* @return the number of total hits.
*/
long getTotalHits();
/**
* @return the relation for the total hits
*/
TotalHitsRelation getTotalHitsRelation();
/**
* @return true if aggregations are available
*/
boolean hasAggregations();
/**
* @return whether the {@link SearchHits} has search hits.
*/
boolean hasSearchHits();
/**
* @return the suggest response
*/
@Nullable
Suggest getSuggest();
/**
* @return wether the {@link SearchHits} has a suggest response.
*/
boolean hasSuggest();
}

View File

@ -0,0 +1,81 @@
/*
* Copyright 2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.elasticsearch.core;
import reactor.core.publisher.Flux;
import org.springframework.data.elasticsearch.core.suggest.response.Suggest;
import org.springframework.lang.Nullable;
/**
* @author Peter-Josef Meisch
* @since 4.4
*/
public class ReactiveSearchHitsImpl<T> implements ReactiveSearchHits<T> {
protected final SearchHits<T> delegate;
public ReactiveSearchHitsImpl(SearchHits<T> delegate) {
this.delegate = delegate;
}
@Override
public long getTotalHits() {
return delegate.getTotalHits();
}
@Override
public TotalHitsRelation getTotalHitsRelation() {
return delegate.getTotalHitsRelation();
}
@Override
public boolean hasAggregations() {
return delegate.hasAggregations();
}
@Override
@Nullable
public AggregationsContainer<?> getAggregations() {
return delegate.getAggregations();
}
@Override
public float getMaxScore() {
return delegate.getMaxScore();
}
@Override
public boolean hasSearchHits() {
return delegate.hasSearchHits();
}
@Override
public Flux<SearchHit<T>> getSearchHits() {
return Flux.defer(() -> Flux.fromIterable(delegate.getSearchHits()));
}
@Override
@Nullable
public Suggest getSuggest() {
return delegate.getSuggest();
}
@Override
public boolean hasSuggest() {
return delegate.hasSuggest();
}
}

View File

@ -15,7 +15,6 @@
*/ */
package org.springframework.data.elasticsearch.core; package org.springframework.data.elasticsearch.core;
import org.springframework.data.elasticsearch.backend.elasticsearch7.query.NativeSearchQuery;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
@ -23,6 +22,7 @@ import java.util.List;
import org.elasticsearch.search.suggest.SuggestBuilder; import org.elasticsearch.search.suggest.SuggestBuilder;
import org.springframework.data.domain.Pageable; import org.springframework.data.domain.Pageable;
import org.springframework.data.elasticsearch.backend.elasticsearch7.query.NativeSearchQuery;
import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates; import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
import org.springframework.data.elasticsearch.core.query.Query; import org.springframework.data.elasticsearch.core.query.Query;
import org.springframework.data.elasticsearch.core.suggest.response.Suggest; import org.springframework.data.elasticsearch.core.suggest.response.Suggest;
@ -171,12 +171,68 @@ public interface ReactiveSearchOperations {
* @param entityType must not be {@literal null}. * @param entityType must not be {@literal null}.
* @param resultType the projection result type. * @param resultType the projection result type.
* @param index the target index, must not be {@literal null} * @param index the target index, must not be {@literal null}
* @param <T>
* @return a {@link Mono} emitting matching entities in a {@link SearchHits}. * @return a {@link Mono} emitting matching entities in a {@link SearchHits}.
* @since 4.1 * @since 4.1
*/ */
<T> Mono<SearchPage<T>> searchForPage(Query query, Class<?> entityType, Class<T> resultType, IndexCoordinates index); <T> Mono<SearchPage<T>> searchForPage(Query query, Class<?> entityType, Class<T> resultType, IndexCoordinates index);
/**
* Perform a search and return the {@link ReactiveSearchHits} which contains information about the search results and
* which will provide the documents by the {@link ReactiveSearchHits#getSearchHits()} method.
*
* @param <T> the result type class
* @param query must not be {@literal null}.
* @param entityType must not be {@literal null}.
* @return a {@link Mono} emitting the {@link ReactiveSearchHits} that contains the search result information
* @since 4.4
*/
default <T> Mono<ReactiveSearchHits<T>> searchForHits(Query query, Class<T> entityType) {
return searchForHits(query, entityType, entityType);
}
/**
* Perform a search and return the {@link ReactiveSearchHits} which contains information about the search results and
* which will provide the documents by the {@link ReactiveSearchHits#getSearchHits()} method.
*
* @param <T> the result type class
* @param query must not be {@literal null}.
* @param entityType must not be {@literal null}.
* @param resultType the projection result type.
* @return a {@link Mono} emitting the {@link ReactiveSearchHits} that contains the search result information
* @since 4.4
*/
<T> Mono<ReactiveSearchHits<T>> searchForHits(Query query, Class<?> entityType, Class<T> resultType);
/**
* Perform a search and return the {@link ReactiveSearchHits} which contains information about the search results and
* which will provide the documents by the {@link ReactiveSearchHits#getSearchHits()} method.
*
* @param <T> the result type class
* @param query must not be {@literal null}.
* @param entityType must not be {@literal null}.
* @param index the target index, must not be {@literal null}
* @return a {@link Mono} emitting the {@link ReactiveSearchHits} that contains the search result information
* @since 4.4
*/
default <T> Mono<ReactiveSearchHits<T>> searchForHits(Query query, Class<T> entityType, IndexCoordinates index) {
return searchForHits(query, entityType, entityType, index);
}
/**
* Perform a search and return the {@link ReactiveSearchHits} which contains information about the search results and
* which will provide the documents by the {@link ReactiveSearchHits#getSearchHits()} method.
*
* @param <T> the result type class
* @param query must not be {@literal null}.
* @param entityType must not be {@literal null}.
* @param resultType the projection result type.
* @param index the target index, must not be {@literal null}
* @return a {@link Mono} emitting the {@link ReactiveSearchHits} that contains the search result information
* @since 4.4
*/
<T> Mono<ReactiveSearchHits<T>> searchForHits(Query query, Class<?> entityType, Class<T> resultType,
IndexCoordinates index);
/** /**
* Perform an aggregation specified by the given {@link Query query}. <br /> * Perform an aggregation specified by the given {@link Query query}. <br />
* *

View File

@ -110,6 +110,7 @@ import org.springframework.util.StringUtils;
* @author Roman Puchkovskiy * @author Roman Puchkovskiy
* @author George Popides * @author George Popides
*/ */
@SuppressWarnings("SpringJavaAutowiredMembersInspection")
@SpringIntegrationTest @SpringIntegrationTest
public class ReactiveElasticsearchTemplateIntegrationTests { public class ReactiveElasticsearchTemplateIntegrationTests {
@ -1175,6 +1176,31 @@ public class ReactiveElasticsearchTemplateIntegrationTests {
assertThat(retrieved).isEqualTo(savedEntity.get()); assertThat(retrieved).isEqualTo(savedEntity.get());
}).verifyComplete(); }).verifyComplete();
} }
@Test // #2015
@DisplayName("should return Mono of ReactiveSearchHits")
void shouldReturnMonoOfReactiveSearchHits() {
List<SampleEntity> entities = new ArrayList<>();
for (int i = 1; i <= 20; i++) {
entities.add(randomEntity("message " + i));
}
Query query = Query.findAll().setPageable(PageRequest.of(0, 7));
operations.saveAll(Mono.just(entities), SampleEntity.class).then().block();
Mono<ReactiveSearchHits<SampleEntity>> searchHitsMono = operations.searchForHits(query, SampleEntity.class);
searchHitsMono.as(StepVerifier::create) //
.consumeNextWith(reactiveSearchHits -> {
assertThat(reactiveSearchHits.getTotalHits()).isEqualTo(20);
reactiveSearchHits.getSearchHits().as(StepVerifier::create) //
.expectNextCount(7) //
.verifyComplete(); //
}) //
.verifyComplete();
}
// endregion // endregion
// region Helper functions // region Helper functions
@ -1249,10 +1275,13 @@ public class ReactiveElasticsearchTemplateIntegrationTests {
@Document(indexName = "#{@indexNameProvider.indexName()}") @Document(indexName = "#{@indexNameProvider.indexName()}")
static class SampleEntity { static class SampleEntity {
@Nullable @Id private String id; @Nullable
@Nullable @Field(type = Text, store = true, fielddata = true) private String message; @Id private String id;
@Nullable
@Field(type = Text, store = true, fielddata = true) private String message;
@Nullable private int rate; @Nullable private int rate;
@Nullable @Version private Long version; @Nullable
@Version private Long version;
@Nullable @Nullable
public String getId() { public String getId() {
@ -1324,7 +1353,8 @@ public class ReactiveElasticsearchTemplateIntegrationTests {
@Document(indexName = "#{@indexNameProvider.indexName()}") @Document(indexName = "#{@indexNameProvider.indexName()}")
static class OptimisticEntity { static class OptimisticEntity {
@Nullable @Id private String id; @Nullable
@Id private String id;
@Nullable private String message; @Nullable private String message;
@Nullable private SeqNoPrimaryTerm seqNoPrimaryTerm; @Nullable private SeqNoPrimaryTerm seqNoPrimaryTerm;
@ -1358,10 +1388,12 @@ public class ReactiveElasticsearchTemplateIntegrationTests {
@Document(indexName = "#{@indexNameProvider.indexName()}") @Document(indexName = "#{@indexNameProvider.indexName()}")
static class OptimisticAndVersionedEntity { static class OptimisticAndVersionedEntity {
@Nullable @Id private String id; @Nullable
@Id private String id;
@Nullable private String message; @Nullable private String message;
@Nullable private SeqNoPrimaryTerm seqNoPrimaryTerm; @Nullable private SeqNoPrimaryTerm seqNoPrimaryTerm;
@Nullable @Version private Long version; @Nullable
@Version private Long version;
@Nullable @Nullable
public String getId() { public String getId() {
@ -1402,8 +1434,10 @@ public class ReactiveElasticsearchTemplateIntegrationTests {
@Document(indexName = "#{@indexNameProvider.indexName()}") @Document(indexName = "#{@indexNameProvider.indexName()}")
static class VersionedEntity { static class VersionedEntity {
@Nullable @Id private String id; @Nullable
@Nullable @Version private Long version; @Id private String id;
@Nullable
@Version private Long version;
@Nullable @Nullable
public String getId() { public String getId() {
@ -1428,7 +1462,8 @@ public class ReactiveElasticsearchTemplateIntegrationTests {
@Setting(settingPath = "settings/test-settings.json") @Setting(settingPath = "settings/test-settings.json")
@Mapping(mappingPath = "mappings/test-mappings.json") @Mapping(mappingPath = "mappings/test-mappings.json")
private static class EntityWithSettingsAndMappingsReactive { private static class EntityWithSettingsAndMappingsReactive {
@Nullable @Id String id; @Nullable
@Id String id;
@Nullable @Nullable
public String getId() { public String getId() {