diff --git a/src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchOperations.java b/src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchOperations.java index 382120ad9..fff57c289 100644 --- a/src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchOperations.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchOperations.java @@ -1,7 +1,6 @@ package org.springframework.data.elasticsearch.core; -import org.elasticsearch.action.bulk.BulkResponse; import org.springframework.data.domain.Page; import org.springframework.data.elasticsearch.core.convert.ElasticsearchConverter; import org.springframework.data.elasticsearch.core.query.*; @@ -91,6 +90,13 @@ public interface ElasticsearchOperations { */ Page queryForPage(StringQuery query, Class clazz); + /** + * Execute the query against elasticsearch and return ids + * + * @param query + * @return + */ + List queryForIds(SearchQuery query); /** * return number of elements found by for given query @@ -156,4 +162,24 @@ public interface ElasticsearchOperations { */ void refresh(Class clazz,boolean waitForOperation); + /** + * Returns scroll id for scan query + * @param query + * @param scrollTimeInMillis + * @param noFields + * @return + */ + String scan(SearchQuery query, long scrollTimeInMillis, boolean noFields); + + /** + * Scrolls the results for give scroll id + * @param scrollId + * @param scrollTimeInMillis + * @param resultsMapper + * @param + * @return + */ + Page scroll(String scrollId, long scrollTimeInMillis, ResultsMapper resultsMapper); + + } diff --git a/src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchTemplate.java b/src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchTemplate.java index 58860167f..91a0a48d5 100644 --- a/src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchTemplate.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchTemplate.java @@ -13,6 +13,7 @@ import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.client.Client; import org.elasticsearch.client.Requests; import org.elasticsearch.common.collect.MapBuilder; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.query.FilterBuilder; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.search.SearchHit; @@ -38,6 +39,7 @@ import java.util.Map; import static org.apache.commons.lang.StringUtils.isBlank; import static org.elasticsearch.action.search.SearchType.DFS_QUERY_THEN_FETCH; +import static org.elasticsearch.action.search.SearchType.SCAN; import static org.elasticsearch.client.Requests.indicesExistsRequest; import static org.elasticsearch.client.Requests.refreshRequest; import static org.elasticsearch.index.VersionType.EXTERNAL; @@ -108,6 +110,17 @@ public class ElasticsearchTemplate implements ElasticsearchOperations { return resultsMapper.mapResults(response); } + @Override + public List queryForIds(SearchQuery query) { + SearchRequestBuilder request = prepareSearch(query).setQuery(query.getElasticsearchQuery()) + .setNoFields(); + if(query.getElasticsearchFilter() != null){ + request.setFilter(query.getElasticsearchFilter()); + } + SearchResponse response = request.execute().actionGet(); + return extractIds(response); + } + private SearchResponse doSearch(SearchRequestBuilder searchRequest, QueryBuilder query, FilterBuilder filter ){ if(filter != null){ searchRequest.setFilter(filter); @@ -214,6 +227,9 @@ public class ElasticsearchTemplate implements ElasticsearchOperations { } private SearchRequestBuilder prepareSearch(Query query){ + Assert.notNull(query.getIndices(), "No index defined for Query"); + Assert.notNull(query.getTypes(), "No type define for Query"); + int startRecord = 0; SearchRequestBuilder searchRequestBuilder = client.prepareSearch(toArray(query.getIndices())) .setSearchType(DFS_QUERY_THEN_FETCH) @@ -268,6 +284,38 @@ public class ElasticsearchTemplate implements ElasticsearchOperations { .refresh(refreshRequest(persistentEntity.getIndexName()).waitForOperations(waitForOperation)).actionGet(); } + @Override + public String scan(SearchQuery query, long scrollTimeInMillis, boolean noFields) { + Assert.notNull(query.getIndices(), "No index defined for Query"); + Assert.notNull(query.getTypes(), "No type define for Query"); + Assert.notNull(query.getPageable(), "Query.pageable is required for scan & scroll"); + + SearchRequestBuilder requestBuilder = client.prepareSearch(toArray(query.getIndices())) + .setSearchType(SCAN) + .setQuery(query.getElasticsearchQuery()) + .setTypes(toArray(query.getTypes())) + .setScroll(TimeValue.timeValueMillis(scrollTimeInMillis)) + .setFrom(0) + .setSize(query.getPageable().getPageSize()); + + if(query.getElasticsearchFilter() != null){ + requestBuilder.setFilter(query.getElasticsearchFilter()); + } + + if(noFields){ + requestBuilder.setNoFields(); + } + return requestBuilder.execute().actionGet().getScrollId(); + } + + @Override + public Page scroll(String scrollId, long scrollTimeInMillis, ResultsMapper resultsMapper) { + SearchResponse response = client.prepareSearchScroll(scrollId) + .setScroll(TimeValue.timeValueMillis(scrollTimeInMillis)) + .execute().actionGet(); + return resultsMapper.mapResults(response); + } + private ElasticsearchPersistentEntity getPersistentEntityFor(Class clazz){ Assert.isTrue(clazz.isAnnotationPresent(Document.class), "Unable to identify index name. " + clazz.getSimpleName() + " is not a Document. Make sure the document class is annotated with @Document(indexName=\"foo\")"); @@ -299,6 +347,16 @@ public class ElasticsearchTemplate implements ElasticsearchOperations { return resultsMapper.mapResults(response); } + private List extractIds(SearchResponse response){ + List ids = new ArrayList(); + for (SearchHit hit : response.getHits()) { + if (hit != null) { + ids.add(hit.getId()); + } + } + return ids; + } + private T mapResult(String source, Class clazz){ if(isBlank(source)){ return null;