From 8a6e1254bb4b07b8295ef065207cb29765a09487 Mon Sep 17 00:00:00 2001 From: Peter-Josef Meisch Date: Sat, 17 Oct 2020 18:25:35 +0200 Subject: [PATCH] DATAES-796 - Add method returning Mono>. Original PR: #540 --- .../DefaultReactiveElasticsearchClient.java | 65 ++++--------- .../reactive/ReactiveElasticsearchClient.java | 25 +++++ .../core/ReactiveElasticsearchTemplate.java | 70 ++++++++++++-- .../core/ReactiveSearchOperations.java | 95 +++++++++++++++---- .../core/document/SearchDocumentResponse.java | 15 +-- ...eElasticsearchClientIntegrationTests.java} | 24 ++++- ...iveElasticsearchTemplateCallbackTests.java | 22 +++++ ...lasticsearchTemplateIntegrationTests.java} | 66 +++++++++---- 8 files changed, 280 insertions(+), 102 deletions(-) rename src/test/java/org/springframework/data/elasticsearch/client/reactive/{ReactiveElasticsearchClientTests.java => ReactiveElasticsearchClientIntegrationTests.java} (96%) rename src/test/java/org/springframework/data/elasticsearch/core/{ReactiveElasticsearchTemplateTests.java => ReactiveElasticsearchTemplateIntegrationTests.java} (97%) diff --git a/src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClient.java b/src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClient.java index 67034c1a8..c92383346 100644 --- a/src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClient.java +++ b/src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClient.java @@ -24,7 +24,6 @@ import io.netty.handler.timeout.ReadTimeoutHandler; import io.netty.handler.timeout.WriteTimeoutHandler; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import reactor.core.publisher.Sinks; import reactor.netty.http.client.HttpClient; import reactor.netty.transport.ProxyProvider; @@ -104,9 +103,7 @@ import org.elasticsearch.search.SearchHits; import org.elasticsearch.search.aggregations.Aggregation; import org.elasticsearch.search.suggest.Suggest; import org.reactivestreams.Publisher; -import org.reactivestreams.Subscriber; -import org.reactivestreams.Subscription; - +import org.springframework.data.elasticsearch.UncategorizedElasticsearchException; import org.springframework.data.elasticsearch.client.ClientConfiguration; import org.springframework.data.elasticsearch.client.ClientLogger; import org.springframework.data.elasticsearch.client.ElasticsearchHost; @@ -428,6 +425,11 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch .flatMap(Flux::fromIterable); } + @Override + public Mono searchForResponse(HttpHeaders headers, SearchRequest searchRequest) { + return sendRequest(searchRequest, requestCreator.search(), SearchResponse.class, headers).next(); + } + @Override public Flux suggest(HttpHeaders headers, SearchRequest searchRequest) { return sendRequest(searchRequest, requestCreator.search(), SearchResponse.class, headers) // @@ -468,21 +470,19 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch return Flux.usingWhen(Mono.fromSupplier(ScrollState::new), - state -> { + state -> sendRequest(searchRequest, requestCreator.search(), SearchResponse.class, headers) + .expand(searchResponse -> { - return sendRequest(searchRequest, requestCreator.search(), SearchResponse.class, headers) - .expand(searchResponse -> { + state.updateScrollId(searchResponse.getScrollId()); + if (isEmpty(searchResponse.getHits())) { + return Mono.empty(); + } - state.updateScrollId(searchResponse.getScrollId()); - if (isEmpty(searchResponse.getHits())) { - return Mono.empty(); - } + return sendRequest(new SearchScrollRequest(searchResponse.getScrollId()).scroll(scrollTimeout), + requestCreator.scroll(), SearchResponse.class, headers); - return sendRequest(new SearchScrollRequest(searchResponse.getScrollId()).scroll(scrollTimeout), - requestCreator.scroll(), SearchResponse.class, headers); - - }); - }, state -> cleanupScroll(headers, state), // + }), + state -> cleanupScroll(headers, state), // (state, ex) -> cleanupScroll(headers, state), // state -> cleanupScroll(headers, state)) // .filter(it -> !isEmpty(it.getHits())) // @@ -776,6 +776,10 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch Method fromXContent = ReflectionUtils.findMethod(responseType, "fromXContent", XContentParser.class); + if (fromXContent == null) { + return Mono.error(new UncategorizedElasticsearchException( + "No method named fromXContent found in " + responseType.getCanonicalName())); + } return Mono.justOrEmpty(responseType .cast(ReflectionUtils.invokeMethod(fromXContent, responseType, createParser(mediaType, content)))); @@ -925,34 +929,5 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch } } - private static class SinkSubscriber implements Subscriber { - - private final Sinks.Many inbound; - - public SinkSubscriber(Sinks.Many inbound) { - this.inbound = inbound; - } - - @Override - public void onSubscribe(Subscription s) { - s.request(Long.MAX_VALUE); - } - - @Override - public void onNext(SearchResponse searchResponse) { - inbound.emitNext(searchResponse, Sinks.EmitFailureHandler.FAIL_FAST); - } - - @Override - public void onError(Throwable t) { - inbound.emitError(t, Sinks.EmitFailureHandler.FAIL_FAST); - } - - @Override - public void onComplete() { - inbound.emitComplete(Sinks.EmitFailureHandler.FAIL_FAST); - } - } - // endregion } diff --git a/src/main/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClient.java b/src/main/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClient.java index a85d9283d..2d8a08ea0 100644 --- a/src/main/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClient.java +++ b/src/main/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClient.java @@ -427,6 +427,31 @@ public interface ReactiveElasticsearchClient { */ Flux search(HttpHeaders headers, SearchRequest searchRequest); + /** + * Execute the given {@link SearchRequest} against the {@literal search} API returning the whole response in one Mono. + * + * @param searchRequest must not be {@literal null}. + * @see Search API on + * elastic.co + * @return the {@link Mono} emitting the whole {@link SearchResponse}. + * @since 4.1 + */ + default Mono searchForResponse(SearchRequest searchRequest) { + return searchForResponse(HttpHeaders.EMPTY, searchRequest); + } + + /** + * Execute the given {@link SearchRequest} against the {@literal search} API returning the whole response in one Mono. + * + * @param headers Use {@link HttpHeaders} to provide eg. authentication data. Must not be {@literal null}. + * @param searchRequest must not be {@literal null}. + * @see Search API on + * elastic.co + * @return the {@link Mono} emitting the whole {@link SearchResponse}. + * @since 4.1 + */ + Mono searchForResponse(HttpHeaders headers, SearchRequest searchRequest); + /** * Execute the given {@link SearchRequest} against the {@literal search} API. * diff --git a/src/main/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplate.java b/src/main/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplate.java index 39a6ef2bb..020723562 100644 --- a/src/main/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplate.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplate.java @@ -62,6 +62,7 @@ import org.springframework.data.elasticsearch.core.convert.MappingElasticsearchC import org.springframework.data.elasticsearch.core.document.Document; import org.springframework.data.elasticsearch.core.document.DocumentAdapters; import org.springframework.data.elasticsearch.core.document.SearchDocument; +import org.springframework.data.elasticsearch.core.document.SearchDocumentResponse; import org.springframework.data.elasticsearch.core.event.ReactiveAfterConvertCallback; import org.springframework.data.elasticsearch.core.event.ReactiveAfterSaveCallback; import org.springframework.data.elasticsearch.core.event.ReactiveBeforeConvertCallback; @@ -296,7 +297,7 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera MultiGetRequest request = requestFactory.multiGetRequest(query, clazz, index); return Flux.from(execute(client -> client.multiGet(request))) // - .concatMap(result -> callback.doWith(DocumentAdapters.from(result))); + .concatMap(result -> callback.toEntity(DocumentAdapters.from(result))); } @Override @@ -444,7 +445,7 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera DocumentCallback callback = new ReadDocumentCallback<>(converter, entityType, index); - return doGet(id, index).flatMap(it -> callback.doWith(DocumentAdapters.from(it))); + return doGet(id, index).flatMap(it -> callback.toEntity(DocumentAdapters.from(it))); } private Mono doGet(String id, IndexCoordinates index) { @@ -656,7 +657,7 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera @Override public Flux> search(Query query, Class entityType, Class resultType, IndexCoordinates index) { SearchDocumentCallback callback = new ReadSearchDocumentCallback<>(resultType, index); - return doFind(query, entityType, index).concatMap(callback::doWith); + return doFind(query, entityType, index).concatMap(callback::toSearchHit); } @Override @@ -664,6 +665,26 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera return search(query, entityType, returnType, getIndexCoordinatesFor(entityType)); } + @Override + public Mono> searchForPage(Query query, Class entityType, Class resultType) { + return searchForPage(query, entityType, resultType, getIndexCoordinatesFor(entityType)); + } + + @Override + public Mono> searchForPage(Query query, Class entityType, Class resultType, + IndexCoordinates index) { + + SearchDocumentCallback callback = new ReadSearchDocumentCallback<>(resultType, index); + + return doFindForResponse(query, entityType, index) // + .flatMap(searchDocumentResponse -> Flux.fromIterable(searchDocumentResponse.getSearchDocuments()) // + .flatMap(callback::toEntity) // + .collectList() // + .map(entities -> SearchHitMapping.mappingFor(resultType, converter) // + .mapHits(searchDocumentResponse, entities))) // + .map(searchHits -> SearchHitSupport.searchPageFor(searchHits, query.getPageable())); + } + private Flux doFind(Query query, Class clazz, IndexCoordinates index) { return Flux.defer(() -> { @@ -678,6 +699,15 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera }); } + private Mono doFindForResponse(Query query, Class clazz, IndexCoordinates index) { + + return Mono.defer(() -> { + SearchRequest request = requestFactory.searchRequest(query, clazz, index); + request = prepareSearchRequest(request); + return doFindForResponse(request); + }); + } + @Override public Flux aggregate(Query query, Class entityType) { return aggregate(query, entityType, getIndexCoordinatesFor(entityType)); @@ -748,6 +778,21 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera .onErrorResume(NoSuchIndexException.class, it -> Mono.empty()); } + /** + * Customization hook on the actual execution result {@link Mono}.
+ * + * @param request the already prepared {@link SearchRequest} ready to be executed. + * @return a {@link Mono} emitting the result of the operation converted to s {@link SearchDocumentResponse}. + */ + protected Mono doFindForResponse(SearchRequest request) { + + if (QUERY_LOGGER.isDebugEnabled()) { + QUERY_LOGGER.debug("Executing doFindForResponse: {}", request); + } + + return Mono.from(execute(client1 -> client1.searchForResponse(request))).map(SearchDocumentResponse::from); + } + /** * Customization hook on the actual execution result {@link Publisher}.
* @@ -935,7 +980,7 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera protected interface DocumentCallback { @NonNull - Mono doWith(@Nullable Document document); + Mono toEntity(@Nullable Document document); } protected class ReadDocumentCallback implements DocumentCallback { @@ -953,7 +998,7 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera } @NonNull - public Mono doWith(@Nullable Document document) { + public Mono toEntity(@Nullable Document document) { if (document == null) { return Mono.empty(); } @@ -966,7 +1011,10 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera protected interface SearchDocumentCallback { @NonNull - Mono> doWith(@NonNull SearchDocument response); + Mono toEntity(@NonNull SearchDocument response); + + @NonNull + Mono> toSearchHit(@NonNull SearchDocument response); } protected class ReadSearchDocumentCallback implements SearchDocumentCallback { @@ -981,9 +1029,13 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera } @Override - public Mono> doWith(SearchDocument response) { - return delegate.doWith(response) - .map(entity -> SearchHitMapping.mappingFor(type, converter).mapHit(response, entity)); + public Mono toEntity(SearchDocument response) { + return delegate.toEntity(response); + } + + @Override + public Mono> toSearchHit(SearchDocument response) { + return toEntity(response).map(entity -> SearchHitMapping.mappingFor(type, converter).mapHit(response, entity)); } } diff --git a/src/main/java/org/springframework/data/elasticsearch/core/ReactiveSearchOperations.java b/src/main/java/org/springframework/data/elasticsearch/core/ReactiveSearchOperations.java index 18127c3e9..c93bb4edf 100644 --- a/src/main/java/org/springframework/data/elasticsearch/core/ReactiveSearchOperations.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/ReactiveSearchOperations.java @@ -134,20 +134,6 @@ public interface ReactiveSearchOperations { */ Mono count(Query query, Class entityType, IndexCoordinates index); - /** - * Search the index for entities matching the given {@link Query query}.
- * {@link Pageable#isUnpaged() Unpaged} queries may overrule elasticsearch server defaults for page size by either * - * delegating to the scroll API or using a max {@link org.elasticsearch.search.builder.SearchSourceBuilder#size(int) * - * size}. - * - * @param query must not be {@literal null}. - * @param entityType The entity type for mapping the query. Must not be {@literal null}. - * @param returnType The mapping target type. Must not be {@literal null}. Th - * @param - * @return a {@link Flux} emitting matching entities one by one wrapped in a {@link SearchHit}. - */ - Flux> search(Query query, Class entityType, Class returnType); - /** * Search the index for entities matching the given {@link Query query}.
* {@link Pageable#isUnpaged() Unpaged} queries may overrule elasticsearch server defaults for page size by either @@ -164,17 +150,18 @@ public interface ReactiveSearchOperations { } /** - * Search the index for entities matching the given {@link Query query}. + * Search the index for entities matching the given {@link Query query}.
+ * {@link Pageable#isUnpaged() Unpaged} queries may overrule elasticsearch server defaults for page size by either * + * delegating to the scroll API or using a max {@link org.elasticsearch.search.builder.SearchSourceBuilder#size(int) * + * size}. * - * @param * @param query must not be {@literal null}. - * @param entityType must not be {@literal null}. - * @param resultType the projection result type. - * @param index the target index, must not be {@literal null} + * @param entityType The entity type for mapping the query. Must not be {@literal null}. + * @param returnType The mapping target type. Must not be {@literal null}. Th * @param * @return a {@link Flux} emitting matching entities one by one wrapped in a {@link SearchHit}. */ - Flux> search(Query query, Class entityType, Class resultType, IndexCoordinates index); + Flux> search(Query query, Class entityType, Class returnType); /** * Search the index for entities matching the given {@link Query query}. @@ -189,6 +176,74 @@ public interface ReactiveSearchOperations { return search(query, entityType, entityType, index); } + /** + * Search the index for entities matching the given {@link Query query}. + * + * @param query must not be {@literal null}. + * @param entityType must not be {@literal null}. + * @param resultType the projection result type. + * @param index the target index, must not be {@literal null} + * @param + * @return a {@link Flux} emitting matching entities one by one wrapped in a {@link SearchHit}. + */ + Flux> search(Query query, Class entityType, Class resultType, IndexCoordinates index); + + /** + * Search the index for entities matching the given {@link Query query}. + * + * @param + * @param query must not be {@literal null}. + * @param entityType must not be {@literal null}. + * @param + * @return a {@link Mono} emitting matching entities in a {@link SearchHits}. + * @since 4.1 + */ + default Mono> searchForPage(Query query, Class entityType) { + return searchForPage(query, entityType, entityType); + } + + /** + * Search the index for entities matching the given {@link Query query}. + * + * @param + * @param query must not be {@literal null}. + * @param entityType must not be {@literal null}. + * @param resultType the projection result type. + * @param + * @return a {@link Mono} emitting matching entities in a {@link SearchHits}. + * @since 4.1 + */ + Mono> searchForPage(Query query, Class entityType, Class resultType); + + /** + * Search the index for entities matching the given {@link Query query}. + * + * @param + * @param query must not be {@literal null}. + * @param entityType must not be {@literal null}. + * @param index the target index, must not be {@literal null} + * @param + * @return a {@link Mono} emitting matching entities in a {@link SearchHits}. + * @since 4.1 + */ + default Mono> searchForPage(Query query, Class entityType, IndexCoordinates index) { + return searchForPage(query, entityType, entityType, index); + } + + /** + * Search the index for entities matching the given {@link Query query}. + * + * @param + * @param query must not be {@literal null}. + * @param entityType must not be {@literal null}. + * @param resultType the projection result type. + * @param index the target index, must not be {@literal null} + * @param + * @return a {@link Mono} emitting matching entities in a {@link SearchHits}. + * @since 4.1 + */ + Mono> searchForPage(Query query, Class entityType, Class resultType, IndexCoordinates index); + /** * Perform an aggregation specified by the given {@link Query query}.
* diff --git a/src/main/java/org/springframework/data/elasticsearch/core/document/SearchDocumentResponse.java b/src/main/java/org/springframework/data/elasticsearch/core/document/SearchDocumentResponse.java index 5b5854a42..039b8419b 100644 --- a/src/main/java/org/springframework/data/elasticsearch/core/document/SearchDocumentResponse.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/document/SearchDocumentResponse.java @@ -15,13 +15,12 @@ */ package org.springframework.data.elasticsearch.core.document; +import java.util.ArrayList; import java.util.List; -import java.util.Objects; -import java.util.stream.Collectors; -import java.util.stream.StreamSupport; import org.apache.lucene.search.TotalHits; import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHits; import org.elasticsearch.search.aggregations.Aggregations; import org.springframework.lang.Nullable; @@ -122,10 +121,12 @@ public class SearchDocumentResponse { float maxScore = searchHits.getMaxScore(); - List searchDocuments = StreamSupport.stream(searchHits.spliterator(), false) // - .filter(Objects::nonNull) // - .map(DocumentAdapters::from) // - .collect(Collectors.toList()); + List searchDocuments = new ArrayList<>(); + for (SearchHit searchHit : searchHits) { + if (searchHit != null) { + searchDocuments.add(DocumentAdapters.from(searchHit)); + } + } return new SearchDocumentResponse(totalHits, totalHitsRelation, maxScore, scrollId, searchDocuments, aggregations); } diff --git a/src/test/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClientTests.java b/src/test/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClientIntegrationTests.java similarity index 96% rename from src/test/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClientTests.java rename to src/test/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClientIntegrationTests.java index e5d14c96c..908f2a51a 100644 --- a/src/test/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClientTests.java +++ b/src/test/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClientIntegrationTests.java @@ -52,6 +52,7 @@ import org.elasticsearch.search.suggest.SuggestBuilder; import org.elasticsearch.search.suggest.completion.CompletionSuggestionBuilder; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; @@ -75,8 +76,8 @@ import org.springframework.test.context.ContextConfiguration; * @author Thomas Geese */ @SpringIntegrationTest -@ContextConfiguration(classes = { ReactiveElasticsearchClientTests.Config.class }) -public class ReactiveElasticsearchClientTests { +@ContextConfiguration(classes = { ReactiveElasticsearchClientIntegrationTests.Config.class }) +public class ReactiveElasticsearchClientIntegrationTests { @Configuration static class Config extends ReactiveElasticsearchRestTemplateConfiguration { @@ -716,6 +717,21 @@ public class ReactiveElasticsearchClientTests { .verifyComplete(); } + @Test // DATAES-796 + @DisplayName("should return the whole SearchResponse") + void shouldReturnTheWholeSearchResponse() { + addSourceDocument().to(INDEX_I); + addSourceDocument().to(INDEX_I); + + SearchRequest request = new SearchRequest(INDEX_I) // + .source(new SearchSourceBuilder().query(QueryBuilders.matchAllQuery())); + + client.searchForResponse(request) // + .as(StepVerifier::create) // + .consumeNextWith(searchResponse -> assertThat(searchResponse.getHits().getTotalHits().value).isEqualTo(2)) + .verifyComplete(); + } + private AddToIndex addSourceDocument() { return add(DOC_SOURCE); } @@ -726,9 +742,9 @@ public class ReactiveElasticsearchClientTests { private IndexRequest indexRequest() { - return new IndexRequest(ReactiveElasticsearchClientTests.INDEX_I) // + return new IndexRequest(ReactiveElasticsearchClientIntegrationTests.INDEX_I) // .id(UUID.randomUUID().toString()) // - .source(ReactiveElasticsearchClientTests.DOC_SOURCE) // + .source(ReactiveElasticsearchClientIntegrationTests.DOC_SOURCE) // .setRefreshPolicy(RefreshPolicy.IMMEDIATE) // .create(true); } diff --git a/src/test/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplateCallbackTests.java b/src/test/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplateCallbackTests.java index ce24f82e9..deb12dae4 100644 --- a/src/test/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplateCallbackTests.java +++ b/src/test/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplateCallbackTests.java @@ -31,6 +31,7 @@ import java.util.HashMap; import java.util.List; import java.util.stream.Collectors; +import org.apache.lucene.search.TotalHits; import org.elasticsearch.Version; import org.elasticsearch.action.DocWriteResponse; import org.elasticsearch.action.bulk.BulkItemResponse; @@ -85,6 +86,7 @@ public class ReactiveElasticsearchTemplateCallbackTests { @Mock private DocWriteResponse docWriteResponse; @Mock private GetResult getResult; @Mock private org.elasticsearch.search.SearchHit searchHit; + @Mock private org.elasticsearch.action.search.SearchResponse searchResponse; private final IndexCoordinates index = IndexCoordinates.of("index"); @@ -121,6 +123,12 @@ public class ReactiveElasticsearchTemplateCallbackTests { doReturn(Mono.just(getResult)).when(client).get(any(GetRequest.class)); when(client.search(any(SearchRequest.class))).thenReturn(Flux.just(searchHit, searchHit)); + when(client.searchForResponse(any(SearchRequest.class))).thenReturn(Mono.just(searchResponse)); + + when(searchResponse.getHits()).thenReturn( + new org.elasticsearch.search.SearchHits(new org.elasticsearch.search.SearchHit[] { searchHit, searchHit }, + new TotalHits(2, TotalHits.Relation.EQUAL_TO), 1.0f)); + doReturn(new BytesArray(new byte[8])).when(searchHit).getSourceRef(); doReturn(new HashMap() { { @@ -374,6 +382,20 @@ public class ReactiveElasticsearchTemplateCallbackTests { assertThat(results.get(1).getContent().firstname).isEqualTo("after-convert"); } + @Test // DATAES-796 + void searchForPageShouldInvokeAfterConvertCallbacks() { + + template.setEntityCallbacks(ReactiveEntityCallbacks.create(afterConvertCallback)); + + SearchPage searchPage = template.searchForPage(pagedQueryForTwo(), Person.class) + .timeout(Duration.ofSeconds(1)).block(); + + verify(afterConvertCallback, times(2)).onAfterConvert(eq(new Person("init", "luke")), eq(lukeDocument()), any()); + SearchHits searchHits = searchPage.getSearchHits(); + assertThat(searchHits.getSearchHit(0).getContent().firstname).isEqualTo("after-convert"); + assertThat(searchHits.getSearchHit(1).getContent().firstname).isEqualTo("after-convert"); + } + @Test // DATAES-772 void searchWithIndexCoordinatesShouldInvokeAfterConvertCallbacks() { diff --git a/src/test/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplateTests.java b/src/test/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplateIntegrationTests.java similarity index 97% rename from src/test/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplateTests.java rename to src/test/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplateIntegrationTests.java index e51bd43bc..9e276d752 100644 --- a/src/test/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplateTests.java +++ b/src/test/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplateIntegrationTests.java @@ -32,6 +32,7 @@ import java.lang.Boolean; import java.lang.Long; import java.lang.Object; import java.net.ConnectException; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.LinkedHashMap; @@ -49,6 +50,7 @@ import org.elasticsearch.search.sort.FieldSortBuilder; import org.elasticsearch.search.sort.SortOrder; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Configuration; @@ -84,7 +86,7 @@ import org.springframework.util.StringUtils; * @author Roman Puchkovskiy */ @SpringIntegrationTest -public class ReactiveElasticsearchTemplateTests { +public class ReactiveElasticsearchTemplateIntegrationTests { @Configuration @Import({ ReactiveElasticsearchRestTemplateConfiguration.class }) @@ -96,6 +98,7 @@ public class ReactiveElasticsearchTemplateTests { @Autowired private ReactiveElasticsearchTemplate template; private ReactiveIndexOperations indexOperations; + // region Setup @BeforeEach public void setUp() { indexOperations = template.indexOps(SampleEntity.class); @@ -122,7 +125,9 @@ public class ReactiveElasticsearchTemplateTests { template.indexOps(IndexCoordinates.of("test-index-reactive-optimistic-and-versioned-entity-template")).delete() .block(); } + // endregion + // region Tests @Test // DATAES-504 public void executeShouldProvideResource() { @@ -1010,25 +1015,31 @@ public class ReactiveElasticsearchTemplateTests { assertThatSeqNoPrimaryTermIsFilled(saved); } - @Data - @Document(indexName = "marvel") - static class Person { - - private @Id String id; - private String name; - private int age; - - public Person() {} - - public Person(String name, int age) { - - this.name = name; - this.age = age; + @Test // DATAES-796 + @DisplayName("should return Mono of SearchPage") + void shouldReturnMonoOfSearchPage() { + List entities = new ArrayList<>(); + for (int i = 1; i <= 10; i++) { + entities.add(randomEntity("message " + i)); } + + Query query = Query.findAll().setPageable(PageRequest.of(0, 5)); + + template.saveAll(Mono.just(entities), SampleEntity.class).then(indexOperations.refresh()).block(); + + Mono> searchPageMono = template.searchForPage(query, SampleEntity.class); + + searchPageMono.as(StepVerifier::create) // + .consumeNextWith(searchPage -> { + assertThat(searchPage.hasNext()).isTrue(); + SearchHits searchHits = searchPage.getSearchHits(); + assertThat(searchHits.getTotalHits()).isEqualTo(10); + assertThat(searchHits.getSearchHits().size()).isEqualTo(5); + }).verifyComplete(); } + // endregion - // --> JUST some helpers - + // region Helper functions private SampleEntity randomEntity(String message) { return SampleEntity.builder() // @@ -1057,11 +1068,31 @@ public class ReactiveElasticsearchTemplateTests { template.saveAll(Mono.just(Arrays.asList(entities)), indexCoordinates).then(indexOperations.refresh()).block(); } } + // endregion + + // region Entities + @Data + @Document(indexName = "marvel") + static class Person { + + private @Id String id; + private String name; + private int age; + + public Person() {} + + public Person(String name, int age) { + + this.name = name; + this.age = age; + } + } @Data @AllArgsConstructor @NoArgsConstructor static class Message { + String message; } @@ -1103,4 +1134,5 @@ public class ReactiveElasticsearchTemplateTests { @Id private String id; @Version private Long version; } + // endregion }