From 576495932e27c1dac8bef0bc629211d544beed6d Mon Sep 17 00:00:00 2001 From: Kevin Leturc Date: Sun, 31 May 2015 17:58:44 +0200 Subject: [PATCH] DATAES-167 - Add scan method for CriteriaQuery. --- .../core/ElasticsearchOperations.java | 10 ++ .../core/ElasticsearchTemplate.java | 43 +++++- .../core/ElasticsearchTemplateTests.java | 135 +++++++++++++++++- 3 files changed, 178 insertions(+), 10 deletions(-) diff --git a/src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchOperations.java b/src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchOperations.java index 15e9731e5..ef70d0e4c 100755 --- a/src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchOperations.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchOperations.java @@ -439,6 +439,16 @@ public interface ElasticsearchOperations { */ void refresh(Class clazz, boolean waitForOperation); + /** + * Returns scroll id for criteria query + * + * @param query + * @param scrollTimeInMillis + * @param noFields + * @return + */ + String scan(CriteriaQuery query, long scrollTimeInMillis, boolean noFields); + /** * Returns scroll id for scan query * diff --git a/src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchTemplate.java b/src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchTemplate.java index 22c93f1c4..3fe89dc70 100755 --- a/src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchTemplate.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchTemplate.java @@ -594,29 +594,58 @@ public class ElasticsearchTemplate implements ElasticsearchOperations, Applicati delete(deleteQuery, clazz); } + @Override + public String scan(CriteriaQuery criteriaQuery, long scrollTimeInMillis, boolean noFields) { + Assert.notNull(criteriaQuery.getIndices(), "No index defined for Query"); + Assert.notNull(criteriaQuery.getTypes(), "No type define for Query"); + Assert.notNull(criteriaQuery.getPageable(), "Query.pageable is required for scan & scroll"); + + QueryBuilder elasticsearchQuery = new CriteriaQueryProcessor().createQueryFromCriteria(criteriaQuery.getCriteria()); + FilterBuilder elasticsearchFilter = new CriteriaFilterProcessor().createFilterFromCriteria(criteriaQuery.getCriteria()); + SearchRequestBuilder requestBuilder = prepareScan(criteriaQuery, scrollTimeInMillis, noFields); + + if (elasticsearchQuery != null) { + requestBuilder.setQuery(elasticsearchQuery); + } else { + requestBuilder.setQuery(QueryBuilders.matchAllQuery()); + } + + if (elasticsearchFilter != null) { + requestBuilder.setPostFilter(elasticsearchFilter); + } + + return getSearchResponse(requestBuilder.execute()).getScrollId(); + } + @Override public String scan(SearchQuery searchQuery, long scrollTimeInMillis, boolean noFields) { Assert.notNull(searchQuery.getIndices(), "No index defined for Query"); Assert.notNull(searchQuery.getTypes(), "No type define for Query"); Assert.notNull(searchQuery.getPageable(), "Query.pageable is required for scan & scroll"); - SearchRequestBuilder requestBuilder = client.prepareSearch(toArray(searchQuery.getIndices())).setSearchType(SCAN) - .setQuery(searchQuery.getQuery()).setTypes(toArray(searchQuery.getTypes())) - .setScroll(TimeValue.timeValueMillis(scrollTimeInMillis)).setFrom(0) - .setSize(searchQuery.getPageable().getPageSize()); + SearchRequestBuilder requestBuilder = prepareScan(searchQuery, scrollTimeInMillis, noFields); if (searchQuery.getFilter() != null) { requestBuilder.setPostFilter(searchQuery.getFilter()); } - if (isNotEmpty(searchQuery.getFields())) { - requestBuilder.addFields(toArray(searchQuery.getFields())); + return getSearchResponse(requestBuilder.setQuery(searchQuery.getQuery()).execute()).getScrollId(); + } + + private SearchRequestBuilder prepareScan(Query query, long scrollTimeInMillis, boolean noFields) { + SearchRequestBuilder requestBuilder = client.prepareSearch(toArray(query.getIndices())).setSearchType(SCAN) + .setTypes(toArray(query.getTypes())) + .setScroll(TimeValue.timeValueMillis(scrollTimeInMillis)).setFrom(0) + .setSize(query.getPageable().getPageSize()); + + if (isNotEmpty(query.getFields())) { + requestBuilder.addFields(toArray(query.getFields())); } if (noFields) { requestBuilder.setNoFields(); } - return getSearchResponse(requestBuilder.execute()).getScrollId(); + return requestBuilder; } @Override diff --git a/src/test/java/org/springframework/data/elasticsearch/core/ElasticsearchTemplateTests.java b/src/test/java/org/springframework/data/elasticsearch/core/ElasticsearchTemplateTests.java index 5b6278baf..535e1ddfc 100755 --- a/src/test/java/org/springframework/data/elasticsearch/core/ElasticsearchTemplateTests.java +++ b/src/test/java/org/springframework/data/elasticsearch/core/ElasticsearchTemplateTests.java @@ -647,8 +647,39 @@ public class ElasticsearchTemplateTests { assertThat(sampleEntities.getContent(), hasItem(sampleEntity)); } + /* + DATAES-167 + */ @Test - public void shouldReturnResultsWithScanAndScroll() { + public void shouldReturnResultsWithScanAndScrollForGivenCriteriaQuery() { + //given + List entities = createSampleEntitiesWithMessage("Test message", 30); + // when + elasticsearchTemplate.bulkIndex(entities); + elasticsearchTemplate.refresh(SampleEntity.class, true); + // then + + CriteriaQuery criteriaQuery = new CriteriaQuery(new Criteria()); + criteriaQuery.addIndices(INDEX_NAME); + criteriaQuery.addTypes(TYPE_NAME); + criteriaQuery.setPageable(new PageRequest(0, 10)); + + String scrollId = elasticsearchTemplate.scan(criteriaQuery, 1000, false); + List sampleEntities = new ArrayList(); + boolean hasRecords = true; + while (hasRecords) { + Page page = elasticsearchTemplate.scroll(scrollId, 5000L, SampleEntity.class); + if (page.hasContent()) { + sampleEntities.addAll(page.getContent()); + } else { + hasRecords = false; + } + } + assertThat(sampleEntities.size(), is(equalTo(30))); + } + + @Test + public void shouldReturnResultsWithScanAndScrollForGivenSearchQuery() { //given List entities = createSampleEntitiesWithMessage("Test message", 30); // when @@ -673,11 +704,60 @@ public class ElasticsearchTemplateTests { assertThat(sampleEntities.size(), is(equalTo(30))); } + /* + DATAES-167 + */ + @Test + public void shouldReturnResultsWithScanAndScrollForSpecifiedFieldsForCriteriaCriteria() { + //given + List entities = createSampleEntitiesWithMessage("Test message", 30); + // when + elasticsearchTemplate.bulkIndex(entities); + elasticsearchTemplate.refresh(SampleEntity.class, true); + // then + + CriteriaQuery criteriaQuery = new CriteriaQuery(new Criteria()); + criteriaQuery.addIndices(INDEX_NAME); + criteriaQuery.addTypes(TYPE_NAME); + criteriaQuery.addFields("message"); + criteriaQuery.setPageable(new PageRequest(0, 10)); + + String scrollId = elasticsearchTemplate.scan(criteriaQuery, 5000, false); + List sampleEntities = new ArrayList(); + boolean hasRecords = true; + while (hasRecords) { + Page page = elasticsearchTemplate.scroll(scrollId, 5000L, new SearchResultMapper() { + @Override + public FacetedPage mapResults(SearchResponse response, Class clazz, Pageable pageable) { + List result = new ArrayList(); + for (SearchHit searchHit : response.getHits()) { + String message = searchHit.getFields().get("message").getValue(); + SampleEntity sampleEntity = new SampleEntity(); + sampleEntity.setId(searchHit.getId()); + sampleEntity.setMessage(message); + result.add(sampleEntity); + } + + if (result.size() > 0) { + return new FacetedPageImpl((List) result); + } + return null; + } + }); + if (page != null) { + sampleEntities.addAll(page.getContent()); + } else { + hasRecords = false; + } + } + assertThat(sampleEntities.size(), is(equalTo(30))); + } + /* DATAES-84 */ @Test - public void shouldReturnResultsWithScanAndScrollForSpecifiedFields() { + public void shouldReturnResultsWithScanAndScrollForSpecifiedFieldsForSearchCriteria() { //given List entities = createSampleEntitiesWithMessage("Test message", 30); // when @@ -723,8 +803,57 @@ public class ElasticsearchTemplateTests { assertThat(sampleEntities.size(), is(equalTo(30))); } + /* + DATAES-167 + */ @Test - public void shouldReturnResultsForScanAndScrollWithCustomResultMapper() { + public void shouldReturnResultsForScanAndScrollWithCustomResultMapperForGivenCriteriaQuery() { + //given + List entities = createSampleEntitiesWithMessage("Test message", 30); + // when + elasticsearchTemplate.bulkIndex(entities); + elasticsearchTemplate.refresh(SampleEntity.class, true); + // then + + CriteriaQuery criteriaQuery = new CriteriaQuery(new Criteria()); + criteriaQuery.addIndices(INDEX_NAME); + criteriaQuery.addTypes(TYPE_NAME); + criteriaQuery.setPageable(new PageRequest(0, 10)); + + String scrollId = elasticsearchTemplate.scan(criteriaQuery, 1000, false); + List sampleEntities = new ArrayList(); + boolean hasRecords = true; + while (hasRecords) { + Page page = elasticsearchTemplate.scroll(scrollId, 5000L, new SearchResultMapper() { + @Override + public FacetedPage mapResults(SearchResponse response, Class clazz, Pageable pageable) { + List chunk = new ArrayList(); + for (SearchHit searchHit : response.getHits()) { + if (response.getHits().getHits().length <= 0) { + return null; + } + SampleEntity user = new SampleEntity(); + user.setId(searchHit.getId()); + user.setMessage((String) searchHit.getSource().get("message")); + chunk.add(user); + } + if (chunk.size() > 0) { + return new FacetedPageImpl((List) chunk); + } + return null; + } + }); + if (page != null) { + sampleEntities.addAll(page.getContent()); + } else { + hasRecords = false; + } + } + assertThat(sampleEntities.size(), is(equalTo(30))); + } + + @Test + public void shouldReturnResultsForScanAndScrollWithCustomResultMapperForGivenSearchQuery() { //given List entities = createSampleEntitiesWithMessage("Test message", 30); // when