diff --git a/src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchOperations.java b/src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchOperations.java index ef70d0e4c..8a86da9db 100755 --- a/src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchOperations.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchOperations.java @@ -19,6 +19,7 @@ import org.elasticsearch.action.update.UpdateResponse; import org.springframework.data.domain.Page; import org.springframework.data.elasticsearch.core.convert.ElasticsearchConverter; import org.springframework.data.elasticsearch.core.query.*; +import org.springframework.data.util.CloseableIterator; import java.util.LinkedList; import java.util.List; @@ -209,6 +210,46 @@ public interface ElasticsearchOperations { */ FacetedPage queryForPage(StringQuery query, Class clazz, SearchResultMapper mapper); + /** + * Executes the given {@link CriteriaQuery} against elasticsearch and return result as {@link CloseableIterator}. + *

+ * Returns a {@link CloseableIterator} that wraps an Elasticsearch scroll context that needs to be closed in case of error. + * + * @param element return type + * @param query + * @param clazz + * @return + * @since 1.3 + */ + CloseableIterator stream(CriteriaQuery query, Class clazz); + + /** + * Executes the given {@link SearchQuery} against elasticsearch and return result as {@link CloseableIterator}. + *

+ * Returns a {@link CloseableIterator} that wraps an Elasticsearch scroll context that needs to be closed in case of error. + * + * @param element return type + * @param query + * @param clazz + * @return + * @since 1.3 + */ + CloseableIterator stream(SearchQuery query, Class clazz); + + /** + * Executes the given {@link SearchQuery} against elasticsearch and return result as {@link CloseableIterator} using custom mapper. + *

+ * Returns a {@link CloseableIterator} that wraps an Elasticsearch scroll context that needs to be closed in case of error. + * + * @param element return type + * @param query + * @param clazz + * @param mapper + * @return + * @since 1.3 + */ + CloseableIterator stream(SearchQuery query, Class clazz, SearchResultMapper mapper); + /** * Execute the criteria query against elasticsearch and return result as {@link List} * diff --git a/src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchTemplate.java b/src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchTemplate.java index 3fe89dc70..b3db7f126 100755 --- a/src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchTemplate.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchTemplate.java @@ -78,6 +78,7 @@ import org.springframework.data.elasticsearch.core.mapping.ElasticsearchPersiste import org.springframework.data.elasticsearch.core.mapping.SimpleElasticsearchMappingContext; import org.springframework.data.elasticsearch.core.query.*; import org.springframework.data.mapping.PersistentProperty; +import org.springframework.data.util.CloseableIterator; import org.springframework.util.Assert; import java.io.BufferedReader; @@ -330,6 +331,80 @@ public class ElasticsearchTemplate implements ElasticsearchOperations, Applicati return mapper.mapResults(response, clazz, query.getPageable()); } + @Override + public CloseableIterator stream(CriteriaQuery query, Class clazz) { + final long scrollTimeInMillis = TimeValue.timeValueMinutes(1).millis(); + final String initScrollId = scan(query, scrollTimeInMillis, false); + return doStream(initScrollId, scrollTimeInMillis, clazz, resultsMapper); + } + + @Override + public CloseableIterator stream(SearchQuery query, Class clazz) { + return stream(query, clazz, resultsMapper); + } + + @Override + public CloseableIterator stream(SearchQuery query, final Class clazz, final SearchResultMapper mapper) { + final long scrollTimeInMillis = TimeValue.timeValueMinutes(1).millis(); + final String initScrollId = scan(query, scrollTimeInMillis, false); + return doStream(initScrollId, scrollTimeInMillis, clazz, mapper); + } + + private CloseableIterator doStream(final String initScrollId, final long scrollTimeInMillis, final Class clazz, final SearchResultMapper mapper) { + return new CloseableIterator() { + + /** As we couldn't retrieve single result with scroll, store current hits. */ + private volatile Iterator currentHits; + + /** The scroll id. */ + private volatile String scrollId = initScrollId; + + /** If stream is finished (ie: cluster returns no results. */ + private volatile boolean finished; + + @Override + public void close() { + try { + // Clear scroll on cluster only in case of error (cause elasticsearch auto clear scroll when it's done) + if (!finished && scrollId != null && currentHits != null && currentHits.hasNext()) { + client.prepareClearScroll().addScrollId(scrollId).execute().actionGet(); + } + } finally { + currentHits = null; + scrollId = null; + } + } + + @Override + public boolean hasNext() { + // Test if stream is finished + if (finished) { + return false; + } + // Test if it remains hits + if (currentHits == null || !currentHits.hasNext()) { + // Do a new request + SearchResponse response = getSearchResponse(client.prepareSearchScroll(scrollId) + .setScroll(TimeValue.timeValueMillis(scrollTimeInMillis)).execute()); + // Save hits and scroll id + currentHits = mapper.mapResults(response, clazz, null).iterator(); + finished = !currentHits.hasNext(); + scrollId = response.getScrollId(); + } + return currentHits.hasNext(); + } + + @Override + public T next() { + if (hasNext()) { + return currentHits.next(); + } + throw new NoSuchElementException(); + } + + }; + } + @Override public long count(CriteriaQuery criteriaQuery, Class clazz) { QueryBuilder elasticsearchQuery = new CriteriaQueryProcessor().createQueryFromCriteria(criteriaQuery.getCriteria()); diff --git a/src/main/java/org/springframework/data/elasticsearch/repository/query/ElasticsearchPartQuery.java b/src/main/java/org/springframework/data/elasticsearch/repository/query/ElasticsearchPartQuery.java index 60465751c..cbc98eccb 100644 --- a/src/main/java/org/springframework/data/elasticsearch/repository/query/ElasticsearchPartQuery.java +++ b/src/main/java/org/springframework/data/elasticsearch/repository/query/ElasticsearchPartQuery.java @@ -24,6 +24,8 @@ import org.springframework.data.mapping.context.MappingContext; import org.springframework.data.repository.query.ParametersParameterAccessor; import org.springframework.data.repository.query.parser.PartTree; import org.springframework.util.ClassUtils; +import org.springframework.data.util.CloseableIterator; +import org.springframework.data.util.StreamUtils; /** * ElasticsearchPartQuery @@ -54,6 +56,14 @@ public class ElasticsearchPartQuery extends AbstractElasticsearchRepositoryQuery } else if (queryMethod.isPageQuery()) { query.setPageable(accessor.getPageable()); return elasticsearchOperations.queryForPage(query, queryMethod.getEntityInformation().getJavaType()); + } else if (queryMethod.isStreamQuery()) { + Class entityType = queryMethod.getEntityInformation().getJavaType(); + if (query.getPageable() == null) { + query.setPageable(new PageRequest(0, 20)); + } + + return StreamUtils.createStreamFromIterator((CloseableIterator) elasticsearchOperations.stream(query, entityType)); + } else if (queryMethod.isCollectionQuery()) { if (accessor.getPageable() == null) { int itemCount = (int) elasticsearchOperations.count(query, queryMethod.getEntityInformation().getJavaType()); diff --git a/src/test/java/org/springframework/data/elasticsearch/core/ElasticsearchTemplateTests.java b/src/test/java/org/springframework/data/elasticsearch/core/ElasticsearchTemplateTests.java index 535e1ddfc..3c5f7439c 100755 --- a/src/test/java/org/springframework/data/elasticsearch/core/ElasticsearchTemplateTests.java +++ b/src/test/java/org/springframework/data/elasticsearch/core/ElasticsearchTemplateTests.java @@ -42,6 +42,7 @@ import org.springframework.data.elasticsearch.entities.HetroEntity1; import org.springframework.data.elasticsearch.entities.HetroEntity2; import org.springframework.data.elasticsearch.entities.SampleEntity; import org.springframework.data.elasticsearch.entities.SampleMappingEntity; +import org.springframework.data.util.CloseableIterator; import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; @@ -896,6 +897,31 @@ public class ElasticsearchTemplateTests { assertThat(sampleEntities.size(), is(equalTo(30))); } + /* + DATAES-167 + */ + @Test + public void shouldReturnResultsWithStreamForGivenCriteriaQuery() { + //given + List entities = createSampleEntitiesWithMessage("Test message", 30); + // when + elasticsearchTemplate.bulkIndex(entities); + elasticsearchTemplate.refresh(SampleEntity.class, true); + // then + + CriteriaQuery criteriaQuery = new CriteriaQuery(new Criteria()); + criteriaQuery.addIndices(INDEX_NAME); + criteriaQuery.addTypes(TYPE_NAME); + criteriaQuery.setPageable(new PageRequest(0, 10)); + + CloseableIterator stream = elasticsearchTemplate.stream(criteriaQuery, SampleEntity.class); + List sampleEntities = new ArrayList(); + while (stream.hasNext()) { + sampleEntities.add(stream.next()); + } + assertThat(sampleEntities.size(), is(equalTo(30))); + } + private static List createSampleEntitiesWithMessage(String message, int numberOfEntities) { List indexQueries = new ArrayList(); for (int i = 0; i < numberOfEntities; i++) { diff --git a/src/test/java/org/springframework/data/elasticsearch/repositories/CustomMethodRepositoryTests.java b/src/test/java/org/springframework/data/elasticsearch/repositories/CustomMethodRepositoryTests.java index 432dc176a..0ab5f9fc6 100644 --- a/src/test/java/org/springframework/data/elasticsearch/repositories/CustomMethodRepositoryTests.java +++ b/src/test/java/org/springframework/data/elasticsearch/repositories/CustomMethodRepositoryTests.java @@ -19,8 +19,10 @@ import static org.apache.commons.lang.RandomStringUtils.*; import static org.hamcrest.Matchers.*; import static org.junit.Assert.*; +import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.stream.Stream; import org.junit.Before; import org.junit.Test; @@ -641,6 +643,22 @@ public class CustomMethodRepositoryTests { assertThat(page.getTotalElements(), is(equalTo(1L))); } + /* + DATAES-165 + */ + @Test + public void shouldAllowReturningJava8StreamInCustomQuery() { + // given + List entities = createSampleEntities("abc", 30); + repository.save(entities); + + // when + Stream stream = repository.findByType("abc"); + // then + assertThat(stream, is(notNullValue())); + assertThat(stream.count(), is(equalTo(30L))); + } + /* DATAES-106 */ @@ -1174,5 +1192,18 @@ public class CustomMethodRepositoryTests { // then assertThat(count, is(equalTo(1L))); } + + private List createSampleEntities(String type, int numberOfEntities) { + List entities = new ArrayList(); + for (int i = 0; i < numberOfEntities; i++) { + SampleEntity entity = new SampleEntity(); + entity.setId(randomNumeric(numberOfEntities)); + entity.setAvailable(true); + entity.setMessage("Message"); + entity.setType(type); + entities.add(entity); + } + return entities; + } } diff --git a/src/test/java/org/springframework/data/elasticsearch/repositories/custom/SampleCustomMethodRepository.java b/src/test/java/org/springframework/data/elasticsearch/repositories/custom/SampleCustomMethodRepository.java index 2b2a0e8d4..84a5d2a63 100644 --- a/src/test/java/org/springframework/data/elasticsearch/repositories/custom/SampleCustomMethodRepository.java +++ b/src/test/java/org/springframework/data/elasticsearch/repositories/custom/SampleCustomMethodRepository.java @@ -16,6 +16,7 @@ package org.springframework.data.elasticsearch.repositories.custom; import java.util.List; +import java.util.stream.Stream; import org.springframework.data.domain.Page; import org.springframework.data.domain.Pageable; @@ -87,6 +88,8 @@ public interface SampleCustomMethodRepository extends ElasticsearchRepository findByLocationNear(GeoPoint point, String distance, Pageable pageable); + Stream findByType(String type); + long countByType(String type); long countByTypeNot(String type);