DATAES-796 - Add method returning Mono<SearchPage<T>>.

Original PR: #540
This commit is contained in:
Peter-Josef Meisch 2020-10-17 18:25:35 +02:00 committed by GitHub
parent 23ac6d77cf
commit 8a6e1254bb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 280 additions and 102 deletions

View File

@ -24,7 +24,6 @@ import io.netty.handler.timeout.ReadTimeoutHandler;
import io.netty.handler.timeout.WriteTimeoutHandler;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.netty.http.client.HttpClient;
import reactor.netty.transport.ProxyProvider;
@ -104,9 +103,7 @@ import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.suggest.Suggest;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.springframework.data.elasticsearch.UncategorizedElasticsearchException;
import org.springframework.data.elasticsearch.client.ClientConfiguration;
import org.springframework.data.elasticsearch.client.ClientLogger;
import org.springframework.data.elasticsearch.client.ElasticsearchHost;
@ -428,6 +425,11 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
.flatMap(Flux::fromIterable);
}
@Override
public Mono<SearchResponse> searchForResponse(HttpHeaders headers, SearchRequest searchRequest) {
return sendRequest(searchRequest, requestCreator.search(), SearchResponse.class, headers).next();
}
@Override
public Flux<Suggest> suggest(HttpHeaders headers, SearchRequest searchRequest) {
return sendRequest(searchRequest, requestCreator.search(), SearchResponse.class, headers) //
@ -468,21 +470,19 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
return Flux.usingWhen(Mono.fromSupplier(ScrollState::new),
state -> {
state -> sendRequest(searchRequest, requestCreator.search(), SearchResponse.class, headers)
.expand(searchResponse -> {
return sendRequest(searchRequest, requestCreator.search(), SearchResponse.class, headers)
.expand(searchResponse -> {
state.updateScrollId(searchResponse.getScrollId());
if (isEmpty(searchResponse.getHits())) {
return Mono.empty();
}
state.updateScrollId(searchResponse.getScrollId());
if (isEmpty(searchResponse.getHits())) {
return Mono.empty();
}
return sendRequest(new SearchScrollRequest(searchResponse.getScrollId()).scroll(scrollTimeout),
requestCreator.scroll(), SearchResponse.class, headers);
return sendRequest(new SearchScrollRequest(searchResponse.getScrollId()).scroll(scrollTimeout),
requestCreator.scroll(), SearchResponse.class, headers);
});
}, state -> cleanupScroll(headers, state), //
}),
state -> cleanupScroll(headers, state), //
(state, ex) -> cleanupScroll(headers, state), //
state -> cleanupScroll(headers, state)) //
.filter(it -> !isEmpty(it.getHits())) //
@ -776,6 +776,10 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
Method fromXContent = ReflectionUtils.findMethod(responseType, "fromXContent", XContentParser.class);
if (fromXContent == null) {
return Mono.error(new UncategorizedElasticsearchException(
"No method named fromXContent found in " + responseType.getCanonicalName()));
}
return Mono.justOrEmpty(responseType
.cast(ReflectionUtils.invokeMethod(fromXContent, responseType, createParser(mediaType, content))));
@ -925,34 +929,5 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
}
}
private static class SinkSubscriber implements Subscriber<SearchResponse> {
private final Sinks.Many<SearchResponse> inbound;
public SinkSubscriber(Sinks.Many<SearchResponse> inbound) {
this.inbound = inbound;
}
@Override
public void onSubscribe(Subscription s) {
s.request(Long.MAX_VALUE);
}
@Override
public void onNext(SearchResponse searchResponse) {
inbound.emitNext(searchResponse, Sinks.EmitFailureHandler.FAIL_FAST);
}
@Override
public void onError(Throwable t) {
inbound.emitError(t, Sinks.EmitFailureHandler.FAIL_FAST);
}
@Override
public void onComplete() {
inbound.emitComplete(Sinks.EmitFailureHandler.FAIL_FAST);
}
}
// endregion
}

View File

@ -427,6 +427,31 @@ public interface ReactiveElasticsearchClient {
*/
Flux<SearchHit> search(HttpHeaders headers, SearchRequest searchRequest);
/**
* Execute the given {@link SearchRequest} against the {@literal search} API returning the whole response in one Mono.
*
* @param searchRequest must not be {@literal null}.
* @see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/search-search.html">Search API on
* elastic.co</a>
* @return the {@link Mono} emitting the whole {@link SearchResponse}.
* @since 4.1
*/
default Mono<SearchResponse> searchForResponse(SearchRequest searchRequest) {
return searchForResponse(HttpHeaders.EMPTY, searchRequest);
}
/**
* Execute the given {@link SearchRequest} against the {@literal search} API returning the whole response in one Mono.
*
* @param headers Use {@link HttpHeaders} to provide eg. authentication data. Must not be {@literal null}.
* @param searchRequest must not be {@literal null}.
* @see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/search-search.html">Search API on
* elastic.co</a>
* @return the {@link Mono} emitting the whole {@link SearchResponse}.
* @since 4.1
*/
Mono<SearchResponse> searchForResponse(HttpHeaders headers, SearchRequest searchRequest);
/**
* Execute the given {@link SearchRequest} against the {@literal search} API.
*

View File

@ -62,6 +62,7 @@ import org.springframework.data.elasticsearch.core.convert.MappingElasticsearchC
import org.springframework.data.elasticsearch.core.document.Document;
import org.springframework.data.elasticsearch.core.document.DocumentAdapters;
import org.springframework.data.elasticsearch.core.document.SearchDocument;
import org.springframework.data.elasticsearch.core.document.SearchDocumentResponse;
import org.springframework.data.elasticsearch.core.event.ReactiveAfterConvertCallback;
import org.springframework.data.elasticsearch.core.event.ReactiveAfterSaveCallback;
import org.springframework.data.elasticsearch.core.event.ReactiveBeforeConvertCallback;
@ -296,7 +297,7 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera
MultiGetRequest request = requestFactory.multiGetRequest(query, clazz, index);
return Flux.from(execute(client -> client.multiGet(request))) //
.concatMap(result -> callback.doWith(DocumentAdapters.from(result)));
.concatMap(result -> callback.toEntity(DocumentAdapters.from(result)));
}
@Override
@ -444,7 +445,7 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera
DocumentCallback<T> callback = new ReadDocumentCallback<>(converter, entityType, index);
return doGet(id, index).flatMap(it -> callback.doWith(DocumentAdapters.from(it)));
return doGet(id, index).flatMap(it -> callback.toEntity(DocumentAdapters.from(it)));
}
private Mono<GetResult> doGet(String id, IndexCoordinates index) {
@ -656,7 +657,7 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera
@Override
public <T> Flux<SearchHit<T>> search(Query query, Class<?> entityType, Class<T> resultType, IndexCoordinates index) {
SearchDocumentCallback<T> callback = new ReadSearchDocumentCallback<>(resultType, index);
return doFind(query, entityType, index).concatMap(callback::doWith);
return doFind(query, entityType, index).concatMap(callback::toSearchHit);
}
@Override
@ -664,6 +665,26 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera
return search(query, entityType, returnType, getIndexCoordinatesFor(entityType));
}
@Override
public <T> Mono<SearchPage<T>> searchForPage(Query query, Class<?> entityType, Class<T> resultType) {
return searchForPage(query, entityType, resultType, getIndexCoordinatesFor(entityType));
}
@Override
public <T> Mono<SearchPage<T>> searchForPage(Query query, Class<?> entityType, Class<T> resultType,
IndexCoordinates index) {
SearchDocumentCallback<T> callback = new ReadSearchDocumentCallback<>(resultType, index);
return doFindForResponse(query, entityType, index) //
.flatMap(searchDocumentResponse -> Flux.fromIterable(searchDocumentResponse.getSearchDocuments()) //
.flatMap(callback::toEntity) //
.collectList() //
.map(entities -> SearchHitMapping.mappingFor(resultType, converter) //
.mapHits(searchDocumentResponse, entities))) //
.map(searchHits -> SearchHitSupport.searchPageFor(searchHits, query.getPageable()));
}
private Flux<SearchDocument> doFind(Query query, Class<?> clazz, IndexCoordinates index) {
return Flux.defer(() -> {
@ -678,6 +699,15 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera
});
}
private Mono<SearchDocumentResponse> doFindForResponse(Query query, Class<?> clazz, IndexCoordinates index) {
return Mono.defer(() -> {
SearchRequest request = requestFactory.searchRequest(query, clazz, index);
request = prepareSearchRequest(request);
return doFindForResponse(request);
});
}
@Override
public Flux<Aggregation> aggregate(Query query, Class<?> entityType) {
return aggregate(query, entityType, getIndexCoordinatesFor(entityType));
@ -748,6 +778,21 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera
.onErrorResume(NoSuchIndexException.class, it -> Mono.empty());
}
/**
* Customization hook on the actual execution result {@link Mono}. <br />
*
* @param request the already prepared {@link SearchRequest} ready to be executed.
* @return a {@link Mono} emitting the result of the operation converted to s {@link SearchDocumentResponse}.
*/
protected Mono<SearchDocumentResponse> doFindForResponse(SearchRequest request) {
if (QUERY_LOGGER.isDebugEnabled()) {
QUERY_LOGGER.debug("Executing doFindForResponse: {}", request);
}
return Mono.from(execute(client1 -> client1.searchForResponse(request))).map(SearchDocumentResponse::from);
}
/**
* Customization hook on the actual execution result {@link Publisher}. <br />
*
@ -935,7 +980,7 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera
protected interface DocumentCallback<T> {
@NonNull
Mono<T> doWith(@Nullable Document document);
Mono<T> toEntity(@Nullable Document document);
}
protected class ReadDocumentCallback<T> implements DocumentCallback<T> {
@ -953,7 +998,7 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera
}
@NonNull
public Mono<T> doWith(@Nullable Document document) {
public Mono<T> toEntity(@Nullable Document document) {
if (document == null) {
return Mono.empty();
}
@ -966,7 +1011,10 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera
protected interface SearchDocumentCallback<T> {
@NonNull
Mono<SearchHit<T>> doWith(@NonNull SearchDocument response);
Mono<T> toEntity(@NonNull SearchDocument response);
@NonNull
Mono<SearchHit<T>> toSearchHit(@NonNull SearchDocument response);
}
protected class ReadSearchDocumentCallback<T> implements SearchDocumentCallback<T> {
@ -981,9 +1029,13 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera
}
@Override
public Mono<SearchHit<T>> doWith(SearchDocument response) {
return delegate.doWith(response)
.map(entity -> SearchHitMapping.mappingFor(type, converter).mapHit(response, entity));
public Mono<T> toEntity(SearchDocument response) {
return delegate.toEntity(response);
}
@Override
public Mono<SearchHit<T>> toSearchHit(SearchDocument response) {
return toEntity(response).map(entity -> SearchHitMapping.mappingFor(type, converter).mapHit(response, entity));
}
}

View File

@ -134,20 +134,6 @@ public interface ReactiveSearchOperations {
*/
Mono<Long> count(Query query, Class<?> entityType, IndexCoordinates index);
/**
* Search the index for entities matching the given {@link Query query}. <br />
* {@link Pageable#isUnpaged() Unpaged} queries may overrule elasticsearch server defaults for page size by either *
* delegating to the scroll API or using a max {@link org.elasticsearch.search.builder.SearchSourceBuilder#size(int) *
* size}.
*
* @param query must not be {@literal null}.
* @param entityType The entity type for mapping the query. Must not be {@literal null}.
* @param returnType The mapping target type. Must not be {@literal null}. Th
* @param <T>
* @return a {@link Flux} emitting matching entities one by one wrapped in a {@link SearchHit}.
*/
<T> Flux<SearchHit<T>> search(Query query, Class<?> entityType, Class<T> returnType);
/**
* Search the index for entities matching the given {@link Query query}. <br />
* {@link Pageable#isUnpaged() Unpaged} queries may overrule elasticsearch server defaults for page size by either
@ -164,17 +150,18 @@ public interface ReactiveSearchOperations {
}
/**
* Search the index for entities matching the given {@link Query query}.
* Search the index for entities matching the given {@link Query query}. <br />
* {@link Pageable#isUnpaged() Unpaged} queries may overrule elasticsearch server defaults for page size by either *
* delegating to the scroll API or using a max {@link org.elasticsearch.search.builder.SearchSourceBuilder#size(int) *
* size}.
*
* @param <T>
* @param query must not be {@literal null}.
* @param entityType must not be {@literal null}.
* @param resultType the projection result type.
* @param index the target index, must not be {@literal null}
* @param entityType The entity type for mapping the query. Must not be {@literal null}.
* @param returnType The mapping target type. Must not be {@literal null}. Th
* @param <T>
* @return a {@link Flux} emitting matching entities one by one wrapped in a {@link SearchHit}.
*/
<T> Flux<SearchHit<T>> search(Query query, Class<?> entityType, Class<T> resultType, IndexCoordinates index);
<T> Flux<SearchHit<T>> search(Query query, Class<?> entityType, Class<T> returnType);
/**
* Search the index for entities matching the given {@link Query query}.
@ -189,6 +176,74 @@ public interface ReactiveSearchOperations {
return search(query, entityType, entityType, index);
}
/**
* Search the index for entities matching the given {@link Query query}.
*
* @param query must not be {@literal null}.
* @param entityType must not be {@literal null}.
* @param resultType the projection result type.
* @param index the target index, must not be {@literal null}
* @param <T>
* @return a {@link Flux} emitting matching entities one by one wrapped in a {@link SearchHit}.
*/
<T> Flux<SearchHit<T>> search(Query query, Class<?> entityType, Class<T> resultType, IndexCoordinates index);
/**
* Search the index for entities matching the given {@link Query query}.
*
* @param <T>
* @param query must not be {@literal null}.
* @param entityType must not be {@literal null}.
* @param <T>
* @return a {@link Mono} emitting matching entities in a {@link SearchHits}.
* @since 4.1
*/
default <T> Mono<SearchPage<T>> searchForPage(Query query, Class<T> entityType) {
return searchForPage(query, entityType, entityType);
}
/**
* Search the index for entities matching the given {@link Query query}.
*
* @param <T>
* @param query must not be {@literal null}.
* @param entityType must not be {@literal null}.
* @param resultType the projection result type.
* @param <T>
* @return a {@link Mono} emitting matching entities in a {@link SearchHits}.
* @since 4.1
*/
<T> Mono<SearchPage<T>> searchForPage(Query query, Class<?> entityType, Class<T> resultType);
/**
* Search the index for entities matching the given {@link Query query}.
*
* @param <T>
* @param query must not be {@literal null}.
* @param entityType must not be {@literal null}.
* @param index the target index, must not be {@literal null}
* @param <T>
* @return a {@link Mono} emitting matching entities in a {@link SearchHits}.
* @since 4.1
*/
default <T> Mono<SearchPage<T>> searchForPage(Query query, Class<T> entityType, IndexCoordinates index) {
return searchForPage(query, entityType, entityType, index);
}
/**
* Search the index for entities matching the given {@link Query query}.
*
* @param <T>
* @param query must not be {@literal null}.
* @param entityType must not be {@literal null}.
* @param resultType the projection result type.
* @param index the target index, must not be {@literal null}
* @param <T>
* @return a {@link Mono} emitting matching entities in a {@link SearchHits}.
* @since 4.1
*/
<T> Mono<SearchPage<T>> searchForPage(Query query, Class<?> entityType, Class<T> resultType, IndexCoordinates index);
/**
* Perform an aggregation specified by the given {@link Query query}. <br />
*

View File

@ -15,13 +15,12 @@
*/
package org.springframework.data.elasticsearch.core.document;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.lucene.search.TotalHits;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.aggregations.Aggregations;
import org.springframework.lang.Nullable;
@ -122,10 +121,12 @@ public class SearchDocumentResponse {
float maxScore = searchHits.getMaxScore();
List<SearchDocument> searchDocuments = StreamSupport.stream(searchHits.spliterator(), false) //
.filter(Objects::nonNull) //
.map(DocumentAdapters::from) //
.collect(Collectors.toList());
List<SearchDocument> searchDocuments = new ArrayList<>();
for (SearchHit searchHit : searchHits) {
if (searchHit != null) {
searchDocuments.add(DocumentAdapters.from(searchHit));
}
}
return new SearchDocumentResponse(totalHits, totalHitsRelation, maxScore, scrollId, searchDocuments, aggregations);
}

View File

@ -52,6 +52,7 @@ import org.elasticsearch.search.suggest.SuggestBuilder;
import org.elasticsearch.search.suggest.completion.CompletionSuggestionBuilder;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
@ -75,8 +76,8 @@ import org.springframework.test.context.ContextConfiguration;
* @author Thomas Geese
*/
@SpringIntegrationTest
@ContextConfiguration(classes = { ReactiveElasticsearchClientTests.Config.class })
public class ReactiveElasticsearchClientTests {
@ContextConfiguration(classes = { ReactiveElasticsearchClientIntegrationTests.Config.class })
public class ReactiveElasticsearchClientIntegrationTests {
@Configuration
static class Config extends ReactiveElasticsearchRestTemplateConfiguration {
@ -716,6 +717,21 @@ public class ReactiveElasticsearchClientTests {
.verifyComplete();
}
@Test // DATAES-796
@DisplayName("should return the whole SearchResponse")
void shouldReturnTheWholeSearchResponse() {
addSourceDocument().to(INDEX_I);
addSourceDocument().to(INDEX_I);
SearchRequest request = new SearchRequest(INDEX_I) //
.source(new SearchSourceBuilder().query(QueryBuilders.matchAllQuery()));
client.searchForResponse(request) //
.as(StepVerifier::create) //
.consumeNextWith(searchResponse -> assertThat(searchResponse.getHits().getTotalHits().value).isEqualTo(2))
.verifyComplete();
}
private AddToIndex addSourceDocument() {
return add(DOC_SOURCE);
}
@ -726,9 +742,9 @@ public class ReactiveElasticsearchClientTests {
private IndexRequest indexRequest() {
return new IndexRequest(ReactiveElasticsearchClientTests.INDEX_I) //
return new IndexRequest(ReactiveElasticsearchClientIntegrationTests.INDEX_I) //
.id(UUID.randomUUID().toString()) //
.source(ReactiveElasticsearchClientTests.DOC_SOURCE) //
.source(ReactiveElasticsearchClientIntegrationTests.DOC_SOURCE) //
.setRefreshPolicy(RefreshPolicy.IMMEDIATE) //
.create(true);
}

View File

@ -31,6 +31,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.lucene.search.TotalHits;
import org.elasticsearch.Version;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.bulk.BulkItemResponse;
@ -85,6 +86,7 @@ public class ReactiveElasticsearchTemplateCallbackTests {
@Mock private DocWriteResponse docWriteResponse;
@Mock private GetResult getResult;
@Mock private org.elasticsearch.search.SearchHit searchHit;
@Mock private org.elasticsearch.action.search.SearchResponse searchResponse;
private final IndexCoordinates index = IndexCoordinates.of("index");
@ -121,6 +123,12 @@ public class ReactiveElasticsearchTemplateCallbackTests {
doReturn(Mono.just(getResult)).when(client).get(any(GetRequest.class));
when(client.search(any(SearchRequest.class))).thenReturn(Flux.just(searchHit, searchHit));
when(client.searchForResponse(any(SearchRequest.class))).thenReturn(Mono.just(searchResponse));
when(searchResponse.getHits()).thenReturn(
new org.elasticsearch.search.SearchHits(new org.elasticsearch.search.SearchHit[] { searchHit, searchHit },
new TotalHits(2, TotalHits.Relation.EQUAL_TO), 1.0f));
doReturn(new BytesArray(new byte[8])).when(searchHit).getSourceRef();
doReturn(new HashMap<String, Object>() {
{
@ -374,6 +382,20 @@ public class ReactiveElasticsearchTemplateCallbackTests {
assertThat(results.get(1).getContent().firstname).isEqualTo("after-convert");
}
@Test // DATAES-796
void searchForPageShouldInvokeAfterConvertCallbacks() {
template.setEntityCallbacks(ReactiveEntityCallbacks.create(afterConvertCallback));
SearchPage<Person> searchPage = template.searchForPage(pagedQueryForTwo(), Person.class)
.timeout(Duration.ofSeconds(1)).block();
verify(afterConvertCallback, times(2)).onAfterConvert(eq(new Person("init", "luke")), eq(lukeDocument()), any());
SearchHits<Person> searchHits = searchPage.getSearchHits();
assertThat(searchHits.getSearchHit(0).getContent().firstname).isEqualTo("after-convert");
assertThat(searchHits.getSearchHit(1).getContent().firstname).isEqualTo("after-convert");
}
@Test // DATAES-772
void searchWithIndexCoordinatesShouldInvokeAfterConvertCallbacks() {

View File

@ -32,6 +32,7 @@ import java.lang.Boolean;
import java.lang.Long;
import java.lang.Object;
import java.net.ConnectException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
@ -49,6 +50,7 @@ import org.elasticsearch.search.sort.FieldSortBuilder;
import org.elasticsearch.search.sort.SortOrder;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
@ -84,7 +86,7 @@ import org.springframework.util.StringUtils;
* @author Roman Puchkovskiy
*/
@SpringIntegrationTest
public class ReactiveElasticsearchTemplateTests {
public class ReactiveElasticsearchTemplateIntegrationTests {
@Configuration
@Import({ ReactiveElasticsearchRestTemplateConfiguration.class })
@ -96,6 +98,7 @@ public class ReactiveElasticsearchTemplateTests {
@Autowired private ReactiveElasticsearchTemplate template;
private ReactiveIndexOperations indexOperations;
// region Setup
@BeforeEach
public void setUp() {
indexOperations = template.indexOps(SampleEntity.class);
@ -122,7 +125,9 @@ public class ReactiveElasticsearchTemplateTests {
template.indexOps(IndexCoordinates.of("test-index-reactive-optimistic-and-versioned-entity-template")).delete()
.block();
}
// endregion
// region Tests
@Test // DATAES-504
public void executeShouldProvideResource() {
@ -1010,25 +1015,31 @@ public class ReactiveElasticsearchTemplateTests {
assertThatSeqNoPrimaryTermIsFilled(saved);
}
@Data
@Document(indexName = "marvel")
static class Person {
private @Id String id;
private String name;
private int age;
public Person() {}
public Person(String name, int age) {
this.name = name;
this.age = age;
@Test // DATAES-796
@DisplayName("should return Mono of SearchPage")
void shouldReturnMonoOfSearchPage() {
List<SampleEntity> entities = new ArrayList<>();
for (int i = 1; i <= 10; i++) {
entities.add(randomEntity("message " + i));
}
Query query = Query.findAll().setPageable(PageRequest.of(0, 5));
template.saveAll(Mono.just(entities), SampleEntity.class).then(indexOperations.refresh()).block();
Mono<SearchPage<SampleEntity>> searchPageMono = template.searchForPage(query, SampleEntity.class);
searchPageMono.as(StepVerifier::create) //
.consumeNextWith(searchPage -> {
assertThat(searchPage.hasNext()).isTrue();
SearchHits<SampleEntity> searchHits = searchPage.getSearchHits();
assertThat(searchHits.getTotalHits()).isEqualTo(10);
assertThat(searchHits.getSearchHits().size()).isEqualTo(5);
}).verifyComplete();
}
// endregion
// --> JUST some helpers
// region Helper functions
private SampleEntity randomEntity(String message) {
return SampleEntity.builder() //
@ -1057,11 +1068,31 @@ public class ReactiveElasticsearchTemplateTests {
template.saveAll(Mono.just(Arrays.asList(entities)), indexCoordinates).then(indexOperations.refresh()).block();
}
}
// endregion
// region Entities
@Data
@Document(indexName = "marvel")
static class Person {
private @Id String id;
private String name;
private int age;
public Person() {}
public Person(String name, int age) {
this.name = name;
this.age = age;
}
}
@Data
@AllArgsConstructor
@NoArgsConstructor
static class Message {
String message;
}
@ -1103,4 +1134,5 @@ public class ReactiveElasticsearchTemplateTests {
@Id private String id;
@Version private Long version;
}
// endregion
}