DATAES-39 : SearchResultMapper support for scroll and queryForPage

This commit is contained in:
mohsin.husen 2013-11-23 13:45:42 +00:00
parent f824f4f1e9
commit a833c89a69
3 changed files with 62 additions and 0 deletions

View File

@ -310,6 +310,17 @@ public interface ElasticsearchOperations {
*/
<T> Page<T> scroll(String scrollId, long scrollTimeInMillis, Class<T> clazz);
/**
* Scrolls the results for give scroll id using custom result mapper
*
* @param scrollId
* @param scrollTimeInMillis
* @param mapper
* @param <T>
* @return
*/
<T> Page<T> scroll(String scrollId, long scrollTimeInMillis, SearchResultMapper mapper);
/**
* more like this query to search for documents that are "like" a specific document.
*

View File

@ -367,6 +367,13 @@ public class ElasticsearchTemplate implements ElasticsearchOperations {
return resultsMapper.mapResults(response, clazz, null);
}
@Override
public <T> Page<T> scroll(String scrollId, long scrollTimeInMillis, SearchResultMapper mapper) {
SearchResponse response = client.prepareSearchScroll(scrollId)
.setScroll(TimeValue.timeValueMillis(scrollTimeInMillis)).execute().actionGet();
return mapper.mapResults(response, null, null);
}
@Override
public <T> Page<T> moreLikeThis(MoreLikeThisQuery query, Class<T> clazz) {
int startRecord = 0;

View File

@ -556,6 +556,50 @@ public class ElasticsearchTemplateTests {
assertThat(sampleEntities.size(), is(equalTo(30)));
}
@Test
public void shouldReturnResultsForScanAndScrollWithCustomResultMapper() {
//given
List<IndexQuery> entities = createSampleEntitiesWithMessage("Test message", 30);
// when
elasticsearchTemplate.bulkIndex(entities);
elasticsearchTemplate.refresh(SampleEntity.class, true);
// then
SearchQuery searchQuery = new NativeSearchQueryBuilder().withQuery(matchAllQuery()).withIndices("test-index")
.withTypes("test-type").withPageable(new PageRequest(0, 10)).build();
String scrollId = elasticsearchTemplate.scan(searchQuery, 1000, false);
List<SampleEntity> sampleEntities = new ArrayList<SampleEntity>();
boolean hasRecords = true;
while (hasRecords) {
Page<SampleEntity> page = elasticsearchTemplate.scroll(scrollId, 5000L, new SearchResultMapper() {
@Override
public <T> FacetedPage<T> mapResults(SearchResponse response, Class<T> clazz, Pageable pageable) {
List<SampleEntity> chunk = new ArrayList<SampleEntity>();
for (SearchHit searchHit : response.getHits()) {
if (response.getHits().getHits().length <= 0) {
return null;
}
SampleEntity user = new SampleEntity();
user.setId(searchHit.getId());
user.setMessage((String) searchHit.getSource().get("message"));
chunk.add(user);
}
if (chunk.size() > 0) {
return new FacetedPageImpl<T>((List<T>) chunk);
}
return null;
}
});
if (page != null) {
sampleEntities.addAll(page.getContent());
} else {
hasRecords = false;
}
}
assertThat(sampleEntities.size(), is(equalTo(30)));
}
private static List<IndexQuery> createSampleEntitiesWithMessage(String message, int numberOfEntities) {
List<IndexQuery> indexQueries = new ArrayList<IndexQuery>();
for (int i = 0; i < numberOfEntities; i++) {