DATAES-525 - Fixed issue in scrolling not applying the search query.

The root cause of the deletion problem was the doScroll which did not apply the given query and therefor returned all entries from the index.
The doScroll implementation has been fixed to apply the query and multiple unit tests have been added to ensure that the delete methods delete the desired documents only and leave the rest untouched.
Also added unit tests for the scrolling to test it against real queries (not only matchAll).

Original pull request: #240
This commit is contained in:
Peter Nowak 2019-01-25 16:54:11 +01:00 committed by xhaggi
parent c77b543fb8
commit e0831fa234
2 changed files with 174 additions and 0 deletions

View File

@ -979,6 +979,12 @@ public class ElasticsearchRestTemplate
Assert.notNull(searchQuery.getTypes(), "No type define for Query");
Assert.notNull(searchQuery.getPageable(), "Query.pageable is required for scan & scroll");
if (searchQuery.getQuery() != null) {
request.source().query(searchQuery.getQuery());
} else {
request.source().query(QueryBuilders.matchAllQuery());
}
if (searchQuery.getFilter() != null) {
request.source().postFilter(searchQuery.getFilter());
}

View File

@ -23,6 +23,8 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;
import org.apache.commons.lang.StringUtils;
import org.assertj.core.util.Lists;
import org.elasticsearch.action.get.MultiGetItemResponse;
@ -2419,6 +2421,172 @@ public class ElasticsearchTemplateTests {
assertThat(((Map) ((Map) mapping.get("properties")).get("message")).get("type"), Matchers.<Object>is("text"));
}
@Test // DATAES-525
public void shouldDeleteOnlyDocumentsMatchedByDeleteQuery() {
List<IndexQuery> indexQueries = new ArrayList<>();
// given
// document to be deleted
String documentIdToDelete = UUID.randomUUID().toString();
indexQueries.add(getIndexQuery(SampleEntity.builder().id(documentIdToDelete).message("some message")
.version(System.currentTimeMillis()).build()));
// remaining document
String remainingDocumentId = UUID.randomUUID().toString();
indexQueries.add(getIndexQuery(SampleEntity.builder().id(remainingDocumentId).message("some other message")
.version(System.currentTimeMillis()).build()));
elasticsearchTemplate.bulkIndex(indexQueries);
elasticsearchTemplate.refresh(SampleEntity.class);
// when
DeleteQuery deleteQuery = new DeleteQuery();
deleteQuery.setQuery(idsQuery().addIds(documentIdToDelete));
elasticsearchTemplate.delete(deleteQuery, SampleEntity.class);
elasticsearchTemplate.refresh(SampleEntity.class);
// then
// document with id "remainingDocumentId" should still be indexed
SearchQuery searchQuery = new NativeSearchQueryBuilder().withQuery(matchAllQuery()).build();
Page<SampleEntity> sampleEntities = elasticsearchTemplate.queryForPage(searchQuery, SampleEntity.class);
assertThat(sampleEntities.getTotalElements(), equalTo(1L));
assertThat(sampleEntities.getContent().get(0).getId(), is(remainingDocumentId));
}
@Test // DATAES-525
public void shouldDeleteOnlyDocumentsMatchedByCriteriaQuery() {
List<IndexQuery> indexQueries = new ArrayList<>();
// given
// document to be deleted
String documentIdToDelete = UUID.randomUUID().toString();
indexQueries.add(getIndexQuery(SampleEntity.builder().id(documentIdToDelete).message("some message")
.version(System.currentTimeMillis()).build()));
// remaining document
String remainingDocumentId = UUID.randomUUID().toString();
indexQueries.add(getIndexQuery(SampleEntity.builder().id(remainingDocumentId).message("some other message")
.version(System.currentTimeMillis()).build()));
elasticsearchTemplate.bulkIndex(indexQueries);
elasticsearchTemplate.refresh(SampleEntity.class);
// when
CriteriaQuery criteriaQuery = new CriteriaQuery(new Criteria("id").is(documentIdToDelete));
elasticsearchTemplate.delete(criteriaQuery, SampleEntity.class);
elasticsearchTemplate.refresh(SampleEntity.class);
// then
// document with id "remainingDocumentId" should still be indexed
SearchQuery searchQuery = new NativeSearchQueryBuilder().withQuery(matchAllQuery()).build();
Page<SampleEntity> sampleEntities = elasticsearchTemplate.queryForPage(searchQuery, SampleEntity.class);
assertThat(sampleEntities.getTotalElements(), equalTo(1L));
assertThat(sampleEntities.getContent().get(0).getId(), is(remainingDocumentId));
}
@Test // DATAES-525
public void shouldDeleteDocumentForGivenIdOnly() {
List<IndexQuery> indexQueries = new ArrayList<>();
// given
// document to be deleted
String documentIdToDelete = UUID.randomUUID().toString();
indexQueries.add(getIndexQuery(SampleEntity.builder().id(documentIdToDelete).message("some message")
.version(System.currentTimeMillis()).build()));
// remaining document
String remainingDocumentId = UUID.randomUUID().toString();
indexQueries.add(getIndexQuery(SampleEntity.builder().id(remainingDocumentId).message("some other message")
.version(System.currentTimeMillis()).build()));
elasticsearchTemplate.bulkIndex(indexQueries);
elasticsearchTemplate.refresh(SampleEntity.class);
// when
elasticsearchTemplate.delete(SampleEntity.class, documentIdToDelete);
elasticsearchTemplate.refresh(SampleEntity.class);
// then
// document with id "remainingDocumentId" should still be indexed
SearchQuery searchQuery = new NativeSearchQueryBuilder().withQuery(matchAllQuery()).build();
Page<SampleEntity> sampleEntities = elasticsearchTemplate.queryForPage(searchQuery, SampleEntity.class);
assertThat(sampleEntities.getTotalElements(), equalTo(1L));
assertThat(sampleEntities.getContent().get(0).getId(), is(remainingDocumentId));
}
@Test // DATAES-525
public void shouldApplyCriteriaQueryToScanAndScrollForGivenCriteriaQuery() {
// given
List<IndexQuery> indexQueries = new ArrayList<>();
indexQueries.add(getIndexQuery(SampleEntity.builder().id(UUID.randomUUID().toString())
.message("some message that should be found by the scroll query").version(System.currentTimeMillis())
.build()));
indexQueries.add(getIndexQuery(SampleEntity.builder().id(UUID.randomUUID().toString())
.message("some other message that should be found by the scroll query")
.version(System.currentTimeMillis()).build()));
String notFindableMessage = "this entity must not be found by the scroll query";
indexQueries.add(getIndexQuery(SampleEntity.builder().id(UUID.randomUUID().toString())
.message(notFindableMessage).version(System.currentTimeMillis()).build()));
elasticsearchTemplate.bulkIndex(indexQueries);
elasticsearchTemplate.refresh(SampleEntity.class);
// when
CriteriaQuery criteriaQuery = new CriteriaQuery(new Criteria("message").contains("message"));
criteriaQuery.addIndices(INDEX_NAME);
criteriaQuery.addTypes(TYPE_NAME);
criteriaQuery.setPageable(PageRequest.of(0, 10));
ScrolledPage<SampleEntity> scroll = (ScrolledPage<SampleEntity>) elasticsearchTemplate.startScroll(1000,
criteriaQuery, SampleEntity.class);
List<SampleEntity> sampleEntities = new ArrayList<>();
while (scroll.hasContent()) {
sampleEntities.addAll(scroll.getContent());
scroll = (ScrolledPage<SampleEntity>) elasticsearchTemplate.continueScroll(scroll.getScrollId(), 1000,
SampleEntity.class);
}
elasticsearchTemplate.clearScroll(scroll.getScrollId());
// then
assertThat(sampleEntities.size(), is(equalTo(2)));
assertThat(sampleEntities.stream().map(SampleEntity::getMessage).collect(Collectors.toList()),
not(contains(notFindableMessage)));
}
@Test // DATAES-525
public void shouldApplySearchQueryToScanAndScrollForGivenSearchQuery() {
// given
List<IndexQuery> indexQueries = new ArrayList<>();
indexQueries.add(getIndexQuery(SampleEntity.builder().id(UUID.randomUUID().toString())
.message("some message that should be found by the scroll query").version(System.currentTimeMillis())
.build()));
indexQueries.add(getIndexQuery(SampleEntity.builder().id(UUID.randomUUID().toString())
.message("some other message that should be found by the scroll query")
.version(System.currentTimeMillis()).build()));
String notFindableMessage = "this entity must not be found by the scroll query";
indexQueries.add(getIndexQuery(SampleEntity.builder().id(UUID.randomUUID().toString())
.message(notFindableMessage).version(System.currentTimeMillis()).build()));
elasticsearchTemplate.bulkIndex(indexQueries);
elasticsearchTemplate.refresh(SampleEntity.class);
// when
SearchQuery searchQuery = new NativeSearchQueryBuilder().withQuery(matchQuery("message", "message"))
.withIndices(INDEX_NAME).withTypes(TYPE_NAME).withPageable(PageRequest.of(0, 10)).build();
ScrolledPage<SampleEntity> scroll = (ScrolledPage<SampleEntity>) elasticsearchTemplate.startScroll(1000,
searchQuery, SampleEntity.class);
List<SampleEntity> sampleEntities = new ArrayList<>();
while (scroll.hasContent()) {
sampleEntities.addAll(scroll.getContent());
scroll = (ScrolledPage<SampleEntity>) elasticsearchTemplate.continueScroll(scroll.getScrollId(), 1000,
SampleEntity.class);
}
elasticsearchTemplate.clearScroll(scroll.getScrollId());
// then
assertThat(sampleEntities.size(), is(equalTo(2)));
assertThat(sampleEntities.stream().map(SampleEntity::getMessage).collect(Collectors.toList()),
not(contains(notFindableMessage)));
}
private IndexQuery getIndexQuery(SampleEntity sampleEntity) {
return new IndexQueryBuilder()
.withId(sampleEntity.getId())