From 216fc89fd2f25778054e8eed65dc56a60ba48feb Mon Sep 17 00:00:00 2001 From: rasmusfaber Date: Wed, 10 Jul 2019 22:42:14 +0200 Subject: [PATCH] DATAES-605 - Make batch size for streamQuery configurable. Original PR: #292 --- .../query/ElasticsearchPartQuery.java | 22 ++++----- .../CustomMethodRepositoryBaseTests.java | 45 ++++++++++++++++--- 2 files changed, 52 insertions(+), 15 deletions(-) diff --git a/src/main/java/org/springframework/data/elasticsearch/repository/query/ElasticsearchPartQuery.java b/src/main/java/org/springframework/data/elasticsearch/repository/query/ElasticsearchPartQuery.java index ee897fdb4..a0edff13c 100644 --- a/src/main/java/org/springframework/data/elasticsearch/repository/query/ElasticsearchPartQuery.java +++ b/src/main/java/org/springframework/data/elasticsearch/repository/query/ElasticsearchPartQuery.java @@ -16,7 +16,6 @@ package org.springframework.data.elasticsearch.repository.query; import org.springframework.data.domain.PageRequest; -import org.springframework.data.domain.Pageable; import org.springframework.data.elasticsearch.core.ElasticsearchOperations; import org.springframework.data.elasticsearch.core.mapping.ElasticsearchPersistentProperty; import org.springframework.data.elasticsearch.core.query.CriteriaQuery; @@ -24,9 +23,9 @@ import org.springframework.data.elasticsearch.repository.query.parser.Elasticsea import org.springframework.data.mapping.context.MappingContext; import org.springframework.data.repository.query.ParametersParameterAccessor; import org.springframework.data.repository.query.parser.PartTree; -import org.springframework.util.ClassUtils; import org.springframework.data.util.CloseableIterator; import org.springframework.data.util.StreamUtils; +import org.springframework.util.ClassUtils; /** * ElasticsearchPartQuery @@ -35,9 +34,10 @@ import org.springframework.data.util.StreamUtils; * @author Mohsin Husen * @author Kevin Leturc * @author Mark Paluch + * @author Rasmus Faber-Espensen */ public class ElasticsearchPartQuery extends AbstractElasticsearchRepositoryQuery { - + private static final int DEFAULT_STREAM_BATCH_SIZE = 500; private final PartTree tree; private final MappingContext mappingContext; @@ -51,7 +51,7 @@ public class ElasticsearchPartQuery extends AbstractElasticsearchRepositoryQuery public Object execute(Object[] parameters) { ParametersParameterAccessor accessor = new ParametersParameterAccessor(queryMethod.getParameters(), parameters); CriteriaQuery query = createQuery(accessor); - if(tree.isDelete()) { + if (tree.isDelete()) { Object result = countOrGetDocumentsForDelete(query, accessor); elasticsearchOperations.delete(query, queryMethod.getEntityInformation().getJavaType()); return result; @@ -60,20 +60,22 @@ public class ElasticsearchPartQuery extends AbstractElasticsearchRepositoryQuery return elasticsearchOperations.queryForPage(query, queryMethod.getEntityInformation().getJavaType()); } else if (queryMethod.isStreamQuery()) { Class entityType = queryMethod.getEntityInformation().getJavaType(); - if (query.getPageable().isUnpaged()) { - int itemCount = (int) elasticsearchOperations.count(query, queryMethod.getEntityInformation().getJavaType()); - query.setPageable(PageRequest.of(0, Math.max(1, itemCount))); + if (accessor.getPageable().isUnpaged()) { + query.setPageable(PageRequest.of(0, DEFAULT_STREAM_BATCH_SIZE)); + } else { + query.setPageable(accessor.getPageable()); } - return StreamUtils.createStreamFromIterator((CloseableIterator) elasticsearchOperations.stream(query, entityType)); + return StreamUtils + .createStreamFromIterator((CloseableIterator) elasticsearchOperations.stream(query, entityType)); } else if (queryMethod.isCollectionQuery()) { if (accessor.getPageable().isUnpaged()) { int itemCount = (int) elasticsearchOperations.count(query, queryMethod.getEntityInformation().getJavaType()); query.setPageable(PageRequest.of(0, Math.max(1, itemCount))); } else { - query.setPageable(accessor.getPageable()); - } + query.setPageable(accessor.getPageable()); + } return elasticsearchOperations.queryForList(query, queryMethod.getEntityInformation().getJavaType()); } else if (tree.isCountProjection()) { return elasticsearchOperations.count(query, queryMethod.getEntityInformation().getJavaType()); diff --git a/src/test/java/org/springframework/data/elasticsearch/repositories/custommethod/CustomMethodRepositoryBaseTests.java b/src/test/java/org/springframework/data/elasticsearch/repositories/custommethod/CustomMethodRepositoryBaseTests.java index 2ab23534a..5f1f686c8 100644 --- a/src/test/java/org/springframework/data/elasticsearch/repositories/custommethod/CustomMethodRepositoryBaseTests.java +++ b/src/test/java/org/springframework/data/elasticsearch/repositories/custommethod/CustomMethodRepositoryBaseTests.java @@ -32,7 +32,6 @@ import java.util.List; import java.util.stream.Stream; import org.junit.Test; - import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.annotation.Id; import org.springframework.data.annotation.Version; @@ -65,6 +64,8 @@ public abstract class CustomMethodRepositoryBaseTests { @Autowired private SampleCustomMethodRepository repository; + @Autowired private SampleStreamingCustomMethodRepository streamingRepository; + @Test public void shouldExecuteCustomMethod() { @@ -711,7 +712,7 @@ public abstract class CustomMethodRepositoryBaseTests { repository.saveAll(entities); // when - Stream stream = repository.findByType("abc"); + Stream stream = streamingRepository.findByType("abc"); // then assertThat(stream).isNotNull(); @@ -1236,13 +1237,41 @@ public abstract class CustomMethodRepositoryBaseTests { assertThat(count).isEqualTo(1L); } + @Test // DATAES-605 + public void streamMethodsShouldWorkWithLargeResultSets() { + // given + List entities = createSampleEntities("abc", 10001); + repository.saveAll(entities); + + // when + Stream stream = streamingRepository.findByType("abc"); + + // then + assertThat(stream).isNotNull(); + assertThat(stream.count()).isEqualTo(10001L); + } + + @Test // DATAES-605 + public void streamMethodsCanHandlePageable() { + // given + List entities = createSampleEntities("abc", 10); + repository.saveAll(entities); + + // when + Stream stream = streamingRepository.findByType("abc", PageRequest.of(0, 2)); + + // then + assertThat(stream).isNotNull(); + assertThat(stream.count()).isEqualTo(10L); + } + private List createSampleEntities(String type, int numberOfEntities) { List entities = new ArrayList<>(); for (int i = 0; i < numberOfEntities; i++) { SampleEntity entity = new SampleEntity(); - entity.setId(randomNumeric(numberOfEntities)); + entity.setId(randomNumeric(32)); entity.setAvailable(true); entity.setMessage("Message"); entity.setType(type); @@ -1328,8 +1357,6 @@ public abstract class CustomMethodRepositoryBaseTests { Page findByLocationNear(GeoPoint point, String distance, Pageable pageable); - Stream findByType(String type); - long countByType(String type); long countByTypeNot(String type); @@ -1371,4 +1398,12 @@ public abstract class CustomMethodRepositoryBaseTests { long countByLocationNear(GeoPoint point, String distance); } + /** + * @author Rasmus Faber-Espensen + */ + public interface SampleStreamingCustomMethodRepository extends ElasticsearchRepository { + Stream findByType(String type); + + Stream findByType(String type, Pageable pageable); + } }