diff --git a/src/main/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchOperations.java b/src/main/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchOperations.java index 60a5d8311..538247420 100644 --- a/src/main/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchOperations.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchOperations.java @@ -20,6 +20,7 @@ import reactor.core.publisher.Mono; import org.elasticsearch.index.query.QueryBuilders; import org.reactivestreams.Publisher; +import org.springframework.data.domain.Pageable; import org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient; import org.springframework.data.elasticsearch.core.query.Query; import org.springframework.data.elasticsearch.core.query.StringQuery; @@ -205,7 +206,10 @@ public interface ReactiveElasticsearchOperations { Mono exists(String id, Class entityType, @Nullable String index, @Nullable String type); /** - * Search the index for entities matching the given {@link Query query}. + * Search the index for entities matching the given {@link Query query}.
+ * {@link Pageable#isUnpaged() Unpaged} queries may overrule elasticsearch server defaults for page size by either + * delegating to the scroll API or using a max {@link org.elasticsearch.search.builder.SearchSourceBuilder#size(int) + * size}. * * @param query must not be {@literal null}. * @param entityType must not be {@literal null}. @@ -217,7 +221,10 @@ public interface ReactiveElasticsearchOperations { } /** - * Search the index for entities matching the given {@link Query query}. + * Search the index for entities matching the given {@link Query query}.
+ * {@link Pageable#isUnpaged() Unpaged} queries may overrule elasticsearch server defaults for page size by either * + * delegating to the scroll API or using a max {@link org.elasticsearch.search.builder.SearchSourceBuilder#size(int) * + * size}. * * @param query must not be {@literal null}. * @param entityType The entity type for mapping the query. Must not be {@literal null}. diff --git a/src/main/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplate.java b/src/main/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplate.java index c266f1e8f..0e0d0e0cc 100644 --- a/src/main/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplate.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplate.java @@ -253,6 +253,16 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera searchSourceBuilder.fetchSource(query.getSourceFilter().getIncludes(), query.getSourceFilter().getExcludes()); } + sort(query, entity).forEach(searchSourceBuilder::sort); + + if (query.getMinScore() > 0) { + searchSourceBuilder.minScore(query.getMinScore()); + } + + if (query.getIndicesOptions() != null) { + request.indicesOptions(query.getIndicesOptions()); + } + if (query.getPageable().isPaged()) { long offset = query.getPageable().getOffset(); @@ -262,25 +272,15 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera searchSourceBuilder.from((int) offset); searchSourceBuilder.size(query.getPageable().getPageSize()); + + request.source(searchSourceBuilder); + return doFind(prepareSearchRequest(request)); + } else { - searchSourceBuilder.from(0); - searchSourceBuilder.size(10000); // this is the index.max_result_window default value + request.source(searchSourceBuilder); + return doScan(prepareSearchRequest(request)); } - - if (query.getIndicesOptions() != null) { - request.indicesOptions(query.getIndicesOptions()); - } - - sort(query, entity).forEach(searchSourceBuilder::sort); - - if (query.getMinScore() > 0) { - searchSourceBuilder.minScore(query.getMinScore()); - } - - request.source(searchSourceBuilder); - - return doFind(prepareSearchRequest(request)); }); } @@ -516,6 +516,21 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera return Flux.from(execute(client -> client.search(request))); } + /** + * Customization hook on the actual execution result {@link Publisher}.
+ * + * @param request the already prepared {@link SearchRequest} ready to be executed. + * @return a {@link Flux} emitting the result of the operation. + */ + protected Flux doScan(SearchRequest request) { + + if (QUERY_LOGGER.isDebugEnabled()) { + QUERY_LOGGER.debug("Executing doScan: {}", request); + } + + return Flux.from(execute(client -> client.scroll(request))); + } + /** * Customization hook on the actual execution result {@link Publisher}.
* diff --git a/src/test/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplateTests.java b/src/test/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplateTests.java index 592381e02..580bf8a7d 100644 --- a/src/test/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplateTests.java +++ b/src/test/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplateTests.java @@ -32,6 +32,7 @@ import java.util.List; import java.util.Map; import java.util.UUID; import java.util.stream.Collectors; +import java.util.stream.IntStream; import org.junit.Before; import org.junit.Rule; @@ -39,6 +40,9 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.dao.DataAccessResourceFailureException; import org.springframework.data.annotation.Id; +import org.springframework.data.domain.PageRequest; +import org.springframework.data.domain.Pageable; +import org.springframework.data.domain.Sort; import org.springframework.data.elasticsearch.ElasticsearchVersion; import org.springframework.data.elasticsearch.ElasticsearchVersionRule; import org.springframework.data.elasticsearch.TestUtils; @@ -354,6 +358,40 @@ public class ReactiveElasticsearchTemplateTests { .verifyComplete(); } + @Test // DATAES-518 + public void findShouldApplyPagingCorrectly() { + + List source = IntStream.range(0, 100).mapToObj(it -> randomEntity("entity - " + it)) + .collect(Collectors.toList()); + + index(source.toArray(new SampleEntity[0])); + + CriteriaQuery query = new CriteriaQuery(new Criteria("message").contains("entity")) // + .addSort(Sort.by("message"))// + .setPageable(PageRequest.of(0, 20)); + + template.find(query, SampleEntity.class).as(StepVerifier::create) // + .expectNextCount(20) // + .verifyComplete(); + } + + @Test // DATAES-518 + public void findWithoutPagingShouldReadAll() { + + List source = IntStream.range(0, 100).mapToObj(it -> randomEntity("entity - " + it)) + .collect(Collectors.toList()); + + index(source.toArray(new SampleEntity[0])); + + CriteriaQuery query = new CriteriaQuery(new Criteria("message").contains("entity")) // + .addSort(Sort.by("message"))// + .setPageable(Pageable.unpaged()); + + template.find(query, SampleEntity.class).as(StepVerifier::create) // + .expectNextCount(100) // + .verifyComplete(); + } + @Test // DATAES-504 public void countShouldReturnCountAllWhenGivenNoQuery() { diff --git a/src/test/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplateUnitTests.java b/src/test/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplateUnitTests.java index 95e21f879..86ba07a47 100644 --- a/src/test/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplateUnitTests.java +++ b/src/test/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplateUnitTests.java @@ -92,20 +92,20 @@ public class ReactiveElasticsearchTemplateUnitTests { assertThat(captor.getValue().getRefreshPolicy()).isEqualTo(RefreshPolicy.WAIT_UNTIL); } - @Test // DATAES-504 + @Test // DATAES-504, DATAES-518 public void findShouldFallBackToDefaultIndexOptionsIfNotSet() { ArgumentCaptor captor = ArgumentCaptor.forClass(SearchRequest.class); when(client.search(captor.capture())).thenReturn(Flux.empty()); - template.find(new CriteriaQuery(new Criteria("*")), SampleEntity.class) // + template.find(new CriteriaQuery(new Criteria("*")).setPageable(PageRequest.of(0, 10)), SampleEntity.class) // .as(StepVerifier::create) // .verifyComplete(); assertThat(captor.getValue().indicesOptions()).isEqualTo(DEFAULT_INDICES_OPTIONS); } - @Test // DATAES-504 + @Test // DATAES-504, DATAES-518 public void findShouldApplyIndexOptionsIfSet() { ArgumentCaptor captor = ArgumentCaptor.forClass(SearchRequest.class); @@ -113,7 +113,7 @@ public class ReactiveElasticsearchTemplateUnitTests { template.setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN); - template.find(new CriteriaQuery(new Criteria("*")), SampleEntity.class) // + template.find(new CriteriaQuery(new Criteria("*")).setPageable(PageRequest.of(0, 10)), SampleEntity.class) // .as(StepVerifier::create) // .verifyComplete(); @@ -135,19 +135,18 @@ public class ReactiveElasticsearchTemplateUnitTests { assertThat(captor.getValue().source().size()).isEqualTo(50); } - @Test // DATAES-504 - public void findShouldApplyDefaultMaxIfPaginationNotSet() { + @Test // DATAES-504, DATAES-518 + public void findShouldUseScrollIfPaginationNotSet() { ArgumentCaptor captor = ArgumentCaptor.forClass(SearchRequest.class); - when(client.search(captor.capture())).thenReturn(Flux.empty()); + when(client.scroll(captor.capture())).thenReturn(Flux.empty()); template.find(new CriteriaQuery(new Criteria("*")).setPageable(Pageable.unpaged()), SampleEntity.class) // .as(StepVerifier::create) // .verifyComplete(); - assertThat(captor.getValue().source().from()).isEqualTo(0); - assertThat(captor.getValue().source().size()).isEqualTo(10000); + verify(client).scroll(any()); } @Test // DATAES-504