1) added support for queryForIds

2) added support for scan and scroll
This commit is contained in:
Rizwan Idrees 2013-02-21 14:02:04 +00:00
parent 0bc8ae54fe
commit e54b263be9
2 changed files with 85 additions and 1 deletions

View File

@ -1,7 +1,6 @@
package org.springframework.data.elasticsearch.core;
import org.elasticsearch.action.bulk.BulkResponse;
import org.springframework.data.domain.Page;
import org.springframework.data.elasticsearch.core.convert.ElasticsearchConverter;
import org.springframework.data.elasticsearch.core.query.*;
@ -91,6 +90,13 @@ public interface ElasticsearchOperations {
*/
<T> Page<T> queryForPage(StringQuery query, Class<T> clazz);
/**
* Execute the query against elasticsearch and return ids
*
* @param query
* @return
*/
<T> List<String> queryForIds(SearchQuery query);
/**
* return number of elements found by for given query
@ -156,4 +162,24 @@ public interface ElasticsearchOperations {
*/
<T> void refresh(Class<T> clazz,boolean waitForOperation);
/**
* Returns scroll id for scan query
* @param query
* @param scrollTimeInMillis
* @param noFields
* @return
*/
String scan(SearchQuery query, long scrollTimeInMillis, boolean noFields);
/**
* Scrolls the results for give scroll id
* @param scrollId
* @param scrollTimeInMillis
* @param resultsMapper
* @param <T>
* @return
*/
<T> Page<T> scroll(String scrollId, long scrollTimeInMillis, ResultsMapper<T> resultsMapper);
}

View File

@ -13,6 +13,7 @@ import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.Requests;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.FilterBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.search.SearchHit;
@ -38,6 +39,7 @@ import java.util.Map;
import static org.apache.commons.lang.StringUtils.isBlank;
import static org.elasticsearch.action.search.SearchType.DFS_QUERY_THEN_FETCH;
import static org.elasticsearch.action.search.SearchType.SCAN;
import static org.elasticsearch.client.Requests.indicesExistsRequest;
import static org.elasticsearch.client.Requests.refreshRequest;
import static org.elasticsearch.index.VersionType.EXTERNAL;
@ -108,6 +110,17 @@ public class ElasticsearchTemplate implements ElasticsearchOperations {
return resultsMapper.mapResults(response);
}
@Override
public <T> List<String> queryForIds(SearchQuery query) {
SearchRequestBuilder request = prepareSearch(query).setQuery(query.getElasticsearchQuery())
.setNoFields();
if(query.getElasticsearchFilter() != null){
request.setFilter(query.getElasticsearchFilter());
}
SearchResponse response = request.execute().actionGet();
return extractIds(response);
}
private SearchResponse doSearch(SearchRequestBuilder searchRequest, QueryBuilder query, FilterBuilder filter ){
if(filter != null){
searchRequest.setFilter(filter);
@ -214,6 +227,9 @@ public class ElasticsearchTemplate implements ElasticsearchOperations {
}
private SearchRequestBuilder prepareSearch(Query query){
Assert.notNull(query.getIndices(), "No index defined for Query");
Assert.notNull(query.getTypes(), "No type define for Query");
int startRecord = 0;
SearchRequestBuilder searchRequestBuilder = client.prepareSearch(toArray(query.getIndices()))
.setSearchType(DFS_QUERY_THEN_FETCH)
@ -268,6 +284,38 @@ public class ElasticsearchTemplate implements ElasticsearchOperations {
.refresh(refreshRequest(persistentEntity.getIndexName()).waitForOperations(waitForOperation)).actionGet();
}
@Override
public String scan(SearchQuery query, long scrollTimeInMillis, boolean noFields) {
Assert.notNull(query.getIndices(), "No index defined for Query");
Assert.notNull(query.getTypes(), "No type define for Query");
Assert.notNull(query.getPageable(), "Query.pageable is required for scan & scroll");
SearchRequestBuilder requestBuilder = client.prepareSearch(toArray(query.getIndices()))
.setSearchType(SCAN)
.setQuery(query.getElasticsearchQuery())
.setTypes(toArray(query.getTypes()))
.setScroll(TimeValue.timeValueMillis(scrollTimeInMillis))
.setFrom(0)
.setSize(query.getPageable().getPageSize());
if(query.getElasticsearchFilter() != null){
requestBuilder.setFilter(query.getElasticsearchFilter());
}
if(noFields){
requestBuilder.setNoFields();
}
return requestBuilder.execute().actionGet().getScrollId();
}
@Override
public <T> Page<T> scroll(String scrollId, long scrollTimeInMillis, ResultsMapper<T> resultsMapper) {
SearchResponse response = client.prepareSearchScroll(scrollId)
.setScroll(TimeValue.timeValueMillis(scrollTimeInMillis))
.execute().actionGet();
return resultsMapper.mapResults(response);
}
private ElasticsearchPersistentEntity getPersistentEntityFor(Class clazz){
Assert.isTrue(clazz.isAnnotationPresent(Document.class), "Unable to identify index name. " +
clazz.getSimpleName() + " is not a Document. Make sure the document class is annotated with @Document(indexName=\"foo\")");
@ -299,6 +347,16 @@ public class ElasticsearchTemplate implements ElasticsearchOperations {
return resultsMapper.mapResults(response);
}
private List<String> extractIds(SearchResponse response){
List<String> ids = new ArrayList<String>();
for (SearchHit hit : response.getHits()) {
if (hit != null) {
ids.add(hit.getId());
}
}
return ids;
}
private <T> T mapResult(String source, Class<T> clazz){
if(isBlank(source)){
return null;