DATAES-605 - Make batch size for streamQuery configurable.

Original PR: #292
This commit is contained in:
rasmusfaber 2019-07-10 22:42:14 +02:00 committed by Peter-Josef Meisch
parent 7fd9986c6e
commit 216fc89fd2
2 changed files with 52 additions and 15 deletions

View File

@ -16,7 +16,6 @@
package org.springframework.data.elasticsearch.repository.query;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Pageable;
import org.springframework.data.elasticsearch.core.ElasticsearchOperations;
import org.springframework.data.elasticsearch.core.mapping.ElasticsearchPersistentProperty;
import org.springframework.data.elasticsearch.core.query.CriteriaQuery;
@ -24,9 +23,9 @@ import org.springframework.data.elasticsearch.repository.query.parser.Elasticsea
import org.springframework.data.mapping.context.MappingContext;
import org.springframework.data.repository.query.ParametersParameterAccessor;
import org.springframework.data.repository.query.parser.PartTree;
import org.springframework.util.ClassUtils;
import org.springframework.data.util.CloseableIterator;
import org.springframework.data.util.StreamUtils;
import org.springframework.util.ClassUtils;
/**
* ElasticsearchPartQuery
@ -35,9 +34,10 @@ import org.springframework.data.util.StreamUtils;
* @author Mohsin Husen
* @author Kevin Leturc
* @author Mark Paluch
* @author Rasmus Faber-Espensen
*/
public class ElasticsearchPartQuery extends AbstractElasticsearchRepositoryQuery {
private static final int DEFAULT_STREAM_BATCH_SIZE = 500;
private final PartTree tree;
private final MappingContext<?, ElasticsearchPersistentProperty> mappingContext;
@ -60,12 +60,14 @@ public class ElasticsearchPartQuery extends AbstractElasticsearchRepositoryQuery
return elasticsearchOperations.queryForPage(query, queryMethod.getEntityInformation().getJavaType());
} else if (queryMethod.isStreamQuery()) {
Class<?> entityType = queryMethod.getEntityInformation().getJavaType();
if (query.getPageable().isUnpaged()) {
int itemCount = (int) elasticsearchOperations.count(query, queryMethod.getEntityInformation().getJavaType());
query.setPageable(PageRequest.of(0, Math.max(1, itemCount)));
if (accessor.getPageable().isUnpaged()) {
query.setPageable(PageRequest.of(0, DEFAULT_STREAM_BATCH_SIZE));
} else {
query.setPageable(accessor.getPageable());
}
return StreamUtils.createStreamFromIterator((CloseableIterator<Object>) elasticsearchOperations.stream(query, entityType));
return StreamUtils
.createStreamFromIterator((CloseableIterator<Object>) elasticsearchOperations.stream(query, entityType));
} else if (queryMethod.isCollectionQuery()) {
if (accessor.getPageable().isUnpaged()) {

View File

@ -32,7 +32,6 @@ import java.util.List;
import java.util.stream.Stream;
import org.junit.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.annotation.Id;
import org.springframework.data.annotation.Version;
@ -65,6 +64,8 @@ public abstract class CustomMethodRepositoryBaseTests {
@Autowired private SampleCustomMethodRepository repository;
@Autowired private SampleStreamingCustomMethodRepository streamingRepository;
@Test
public void shouldExecuteCustomMethod() {
@ -711,7 +712,7 @@ public abstract class CustomMethodRepositoryBaseTests {
repository.saveAll(entities);
// when
Stream<SampleEntity> stream = repository.findByType("abc");
Stream<SampleEntity> stream = streamingRepository.findByType("abc");
// then
assertThat(stream).isNotNull();
@ -1236,13 +1237,41 @@ public abstract class CustomMethodRepositoryBaseTests {
assertThat(count).isEqualTo(1L);
}
@Test // DATAES-605
public void streamMethodsShouldWorkWithLargeResultSets() {
// given
List<SampleEntity> entities = createSampleEntities("abc", 10001);
repository.saveAll(entities);
// when
Stream<SampleEntity> stream = streamingRepository.findByType("abc");
// then
assertThat(stream).isNotNull();
assertThat(stream.count()).isEqualTo(10001L);
}
@Test // DATAES-605
public void streamMethodsCanHandlePageable() {
// given
List<SampleEntity> entities = createSampleEntities("abc", 10);
repository.saveAll(entities);
// when
Stream<SampleEntity> stream = streamingRepository.findByType("abc", PageRequest.of(0, 2));
// then
assertThat(stream).isNotNull();
assertThat(stream.count()).isEqualTo(10L);
}
private List<SampleEntity> createSampleEntities(String type, int numberOfEntities) {
List<SampleEntity> entities = new ArrayList<>();
for (int i = 0; i < numberOfEntities; i++) {
SampleEntity entity = new SampleEntity();
entity.setId(randomNumeric(numberOfEntities));
entity.setId(randomNumeric(32));
entity.setAvailable(true);
entity.setMessage("Message");
entity.setType(type);
@ -1328,8 +1357,6 @@ public abstract class CustomMethodRepositoryBaseTests {
Page<SampleEntity> findByLocationNear(GeoPoint point, String distance, Pageable pageable);
Stream<SampleEntity> findByType(String type);
long countByType(String type);
long countByTypeNot(String type);
@ -1371,4 +1398,12 @@ public abstract class CustomMethodRepositoryBaseTests {
long countByLocationNear(GeoPoint point, String distance);
}
/**
* @author Rasmus Faber-Espensen
*/
public interface SampleStreamingCustomMethodRepository extends ElasticsearchRepository<SampleEntity, String> {
Stream<SampleEntity> findByType(String type);
Stream<SampleEntity> findByType(String type, Pageable pageable);
}
}