DATAES-217 Propose scan method with a class in order to retrieve index and type from Document

This commit is contained in:
Kevin Leturc 2015-12-06 20:34:36 +01:00 committed by Artur Konczak
parent 751302d6f0
commit 23e1e9f46a
3 changed files with 147 additions and 39 deletions

View File

@ -484,23 +484,55 @@ public interface ElasticsearchOperations {
/** /**
* Returns scroll id for criteria query * Returns scroll id for criteria query
* *
* @param query * @param query The criteria query.
* @param scrollTimeInMillis * @param scrollTimeInMillis The time in millisecond for scroll feature
* @param noFields * {@link org.elasticsearch.action.search.SearchRequestBuilder#setScroll(org.elasticsearch.common.unit.TimeValue)}.
* @return * @param noFields The no fields support
* {@link org.elasticsearch.action.search.SearchRequestBuilder#setNoFields()}.
* @return The scan id for input query.
*/ */
String scan(CriteriaQuery query, long scrollTimeInMillis, boolean noFields); String scan(CriteriaQuery query, long scrollTimeInMillis, boolean noFields);
/**
* Returns scroll id for criteria query
*
* @param query The criteria query.
* @param scrollTimeInMillis The time in millisecond for scroll feature
* {@link org.elasticsearch.action.search.SearchRequestBuilder#setScroll(org.elasticsearch.common.unit.TimeValue)}.
* @param noFields The no fields support
* {@link org.elasticsearch.action.search.SearchRequestBuilder#setNoFields()}.
* @param clazz The class of entity to retrieve.
* @param <T> The type of entity to retrieve.
* @return The scan id for input query.
*/
<T> String scan(CriteriaQuery query, long scrollTimeInMillis, boolean noFields, Class<T> clazz);
/** /**
* Returns scroll id for scan query * Returns scroll id for scan query
* *
* @param query * @param query The search query.
* @param scrollTimeInMillis * @param scrollTimeInMillis The time in millisecond for scroll feature
* @param noFields * {@link org.elasticsearch.action.search.SearchRequestBuilder#setScroll(org.elasticsearch.common.unit.TimeValue)}.
* @return * @param noFields The no fields support
* {@link org.elasticsearch.action.search.SearchRequestBuilder#setNoFields()}.
* @return The scan id for input query.
*/ */
String scan(SearchQuery query, long scrollTimeInMillis, boolean noFields); String scan(SearchQuery query, long scrollTimeInMillis, boolean noFields);
/**
* Returns scroll id for scan query
*
* @param query The search query.
* @param scrollTimeInMillis The time in millisecond for scroll feature
* {@link org.elasticsearch.action.search.SearchRequestBuilder#setScroll(org.elasticsearch.common.unit.TimeValue)}.
* @param noFields The no fields support
* {@link org.elasticsearch.action.search.SearchRequestBuilder#setNoFields()}.
* @param clazz The class of entity to retrieve.
* @param <T> The type of entity to retrieve.
* @return The scan id for input query.
*/
<T> String scan(SearchQuery query, long scrollTimeInMillis, boolean noFields, Class<T> clazz);
/** /**
* Scrolls the results for give scroll id * Scrolls the results for give scroll id
* *

View File

@ -337,8 +337,7 @@ public class ElasticsearchTemplate implements ElasticsearchOperations, Applicati
@Override @Override
public <T> CloseableIterator<T> stream(CriteriaQuery query, Class<T> clazz) { public <T> CloseableIterator<T> stream(CriteriaQuery query, Class<T> clazz) {
final long scrollTimeInMillis = TimeValue.timeValueMinutes(1).millis(); final long scrollTimeInMillis = TimeValue.timeValueMinutes(1).millis();
setPersistentEntityIndexAndType(query, clazz); final String initScrollId = scan(query, scrollTimeInMillis, false, clazz);
final String initScrollId = scan(query, scrollTimeInMillis, false);
return doStream(initScrollId, scrollTimeInMillis, clazz, resultsMapper); return doStream(initScrollId, scrollTimeInMillis, clazz, resultsMapper);
} }
@ -350,8 +349,7 @@ public class ElasticsearchTemplate implements ElasticsearchOperations, Applicati
@Override @Override
public <T> CloseableIterator<T> stream(SearchQuery query, final Class<T> clazz, final SearchResultMapper mapper) { public <T> CloseableIterator<T> stream(SearchQuery query, final Class<T> clazz, final SearchResultMapper mapper) {
final long scrollTimeInMillis = TimeValue.timeValueMinutes(1).millis(); final long scrollTimeInMillis = TimeValue.timeValueMinutes(1).millis();
setPersistentEntityIndexAndType(query, clazz); final String initScrollId = scan(query, scrollTimeInMillis, false, clazz);
final String initScrollId = scan(query, scrollTimeInMillis, false);
return doStream(initScrollId, scrollTimeInMillis, clazz, mapper); return doStream(initScrollId, scrollTimeInMillis, clazz, mapper);
} }
@ -675,40 +673,27 @@ public class ElasticsearchTemplate implements ElasticsearchOperations, Applicati
@Override @Override
public String scan(CriteriaQuery criteriaQuery, long scrollTimeInMillis, boolean noFields) { public String scan(CriteriaQuery criteriaQuery, long scrollTimeInMillis, boolean noFields) {
Assert.notNull(criteriaQuery.getIndices(), "No index defined for Query"); return doScan(prepareScan(criteriaQuery, scrollTimeInMillis, noFields), criteriaQuery);
Assert.notNull(criteriaQuery.getTypes(), "No type define for Query");
Assert.notNull(criteriaQuery.getPageable(), "Query.pageable is required for scan & scroll");
QueryBuilder elasticsearchQuery = new CriteriaQueryProcessor().createQueryFromCriteria(criteriaQuery.getCriteria());
FilterBuilder elasticsearchFilter = new CriteriaFilterProcessor().createFilterFromCriteria(criteriaQuery.getCriteria());
SearchRequestBuilder requestBuilder = prepareScan(criteriaQuery, scrollTimeInMillis, noFields);
if (elasticsearchQuery != null) {
requestBuilder.setQuery(elasticsearchQuery);
} else {
requestBuilder.setQuery(QueryBuilders.matchAllQuery());
} }
if (elasticsearchFilter != null) { @Override
requestBuilder.setPostFilter(elasticsearchFilter); public <T> String scan(CriteriaQuery criteriaQuery, long scrollTimeInMillis, boolean noFields, Class<T> clazz) {
} return doScan(prepareScan(criteriaQuery, scrollTimeInMillis, noFields, clazz), criteriaQuery);
return getSearchResponse(requestBuilder.execute()).getScrollId();
} }
@Override @Override
public String scan(SearchQuery searchQuery, long scrollTimeInMillis, boolean noFields) { public String scan(SearchQuery searchQuery, long scrollTimeInMillis, boolean noFields) {
Assert.notNull(searchQuery.getIndices(), "No index defined for Query"); return doScan(prepareScan(searchQuery, scrollTimeInMillis, noFields), searchQuery);
Assert.notNull(searchQuery.getTypes(), "No type define for Query");
Assert.notNull(searchQuery.getPageable(), "Query.pageable is required for scan & scroll");
SearchRequestBuilder requestBuilder = prepareScan(searchQuery, scrollTimeInMillis, noFields);
if (searchQuery.getFilter() != null) {
requestBuilder.setPostFilter(searchQuery.getFilter());
} }
return getSearchResponse(requestBuilder.setQuery(searchQuery.getQuery()).execute()).getScrollId(); @Override
public <T> String scan(SearchQuery searchQuery, long scrollTimeInMillis, boolean noFields, Class<T> clazz) {
return doScan(prepareScan(searchQuery, scrollTimeInMillis, noFields, clazz), searchQuery);
}
private <T> SearchRequestBuilder prepareScan(Query query, long scrollTimeInMillis, boolean noFields, Class<T> clazz) {
setPersistentEntityIndexAndType(query, clazz);
return prepareScan(query, scrollTimeInMillis, noFields);
} }
private SearchRequestBuilder prepareScan(Query query, long scrollTimeInMillis, boolean noFields) { private SearchRequestBuilder prepareScan(Query query, long scrollTimeInMillis, boolean noFields) {
@ -727,6 +712,39 @@ public class ElasticsearchTemplate implements ElasticsearchOperations, Applicati
return requestBuilder; return requestBuilder;
} }
private String doScan(SearchRequestBuilder requestBuilder, CriteriaQuery criteriaQuery) {
Assert.notNull(criteriaQuery.getIndices(), "No index defined for Query");
Assert.notNull(criteriaQuery.getTypes(), "No type define for Query");
Assert.notNull(criteriaQuery.getPageable(), "Query.pageable is required for scan & scroll");
QueryBuilder elasticsearchQuery = new CriteriaQueryProcessor().createQueryFromCriteria(criteriaQuery.getCriteria());
FilterBuilder elasticsearchFilter = new CriteriaFilterProcessor().createFilterFromCriteria(criteriaQuery.getCriteria());
if (elasticsearchQuery != null) {
requestBuilder.setQuery(elasticsearchQuery);
} else {
requestBuilder.setQuery(QueryBuilders.matchAllQuery());
}
if (elasticsearchFilter != null) {
requestBuilder.setPostFilter(elasticsearchFilter);
}
return getSearchResponse(requestBuilder.execute()).getScrollId();
}
private String doScan(SearchRequestBuilder requestBuilder, SearchQuery searchQuery) {
Assert.notNull(searchQuery.getIndices(), "No index defined for Query");
Assert.notNull(searchQuery.getTypes(), "No type define for Query");
Assert.notNull(searchQuery.getPageable(), "Query.pageable is required for scan & scroll");
if (searchQuery.getFilter() != null) {
requestBuilder.setPostFilter(searchQuery.getFilter());
}
return getSearchResponse(requestBuilder.setQuery(searchQuery.getQuery()).execute()).getScrollId();
}
@Override @Override
public <T> Page<T> scroll(String scrollId, long scrollTimeInMillis, Class<T> clazz) { public <T> Page<T> scroll(String scrollId, long scrollTimeInMillis, Class<T> clazz) {
SearchResponse response = getSearchResponse(client.prepareSearchScroll(scrollId) SearchResponse response = getSearchResponse(client.prepareSearchScroll(scrollId)

View File

@ -900,6 +900,64 @@ public class ElasticsearchTemplateTests {
assertThat(sampleEntities.size(), is(equalTo(30))); assertThat(sampleEntities.size(), is(equalTo(30)));
} }
/*
DATAES-217
*/
@Test
public void shouldReturnResultsWithScanAndScrollForGivenCriteriaQueryAndClass() {
//given
List<IndexQuery> entities = createSampleEntitiesWithMessage("Test message", 30);
// when
elasticsearchTemplate.bulkIndex(entities);
elasticsearchTemplate.refresh(SampleEntity.class, true);
// then
CriteriaQuery criteriaQuery = new CriteriaQuery(new Criteria());
criteriaQuery.setPageable(new PageRequest(0, 10));
String scrollId = elasticsearchTemplate.scan(criteriaQuery, 1000, false, SampleEntity.class);
List<SampleEntity> sampleEntities = new ArrayList<SampleEntity>();
boolean hasRecords = true;
while (hasRecords) {
Page<SampleEntity> page = elasticsearchTemplate.scroll(scrollId, 5000L, SampleEntity.class);
if (page.hasContent()) {
sampleEntities.addAll(page.getContent());
} else {
hasRecords = false;
}
}
assertThat(sampleEntities.size(), is(equalTo(30)));
}
/*
DATAES-217
*/
@Test
public void shouldReturnResultsWithScanAndScrollForGivenSearchQueryAndClass() {
//given
List<IndexQuery> entities = createSampleEntitiesWithMessage("Test message", 30);
// when
elasticsearchTemplate.bulkIndex(entities);
elasticsearchTemplate.refresh(SampleEntity.class, true);
// then
SearchQuery searchQuery = new NativeSearchQueryBuilder().withQuery(matchAllQuery())
.withPageable(new PageRequest(0, 10)).build();
String scrollId = elasticsearchTemplate.scan(searchQuery, 1000, false, SampleEntity.class);
List<SampleEntity> sampleEntities = new ArrayList<SampleEntity>();
boolean hasRecords = true;
while (hasRecords) {
Page<SampleEntity> page = elasticsearchTemplate.scroll(scrollId, 5000L, SampleEntity.class);
if (page.hasContent()) {
sampleEntities.addAll(page.getContent());
} else {
hasRecords = false;
}
}
assertThat(sampleEntities.size(), is(equalTo(30)));
}
/* /*
DATAES-167 DATAES-167
*/ */