Fix number of deleted/missing documents in Delete-By-Query

The deleted counter is incremented even if the document is missing. Also, this commit ensures that the scroll id is cleared even if no documents are found by the scan request.
This commit is contained in:
Tanguy Leroux 2015-06-18 13:09:42 +02:00
parent 85dccdb8ab
commit 8bd3d7e4a4
2 changed files with 76 additions and 4 deletions

View File

@ -123,7 +123,7 @@ public class TransportDeleteByQueryAction extends HandledTransportAction<DeleteB
addShardFailures(searchResponse.getShardFailures());
if (hits == 0) {
listener.onResponse(buildResponse());
finishHim(searchResponse.getScrollId(), false, null);
return;
}
total.set(hits);
@ -221,9 +221,10 @@ public class TransportDeleteByQueryAction extends HandledTransportAction<DeleteB
if (item.isFailed()) {
indexCounter.incrementFailed();
} else {
indexCounter.incrementDeleted();
DeleteResponse delete = item.getResponse();
if (!delete.isFound()) {
if (delete.isFound()) {
indexCounter.incrementDeleted();
} else {
indexCounter.incrementMissing();
}
}

View File

@ -270,7 +270,7 @@ public class TransportDeleteByQueryActionTests extends ElasticsearchSingleNodeTe
DeleteByQueryRequest delete = new DeleteByQueryRequest();
TestActionListener listener = new TestActionListener();
newAsyncAction(delete, listener).onBulkResponse(searchResponse.getScrollId(), new BulkResponse(items, 0L));
newAsyncAction(delete, listener).onBulkResponse(scrollId, new BulkResponse(items, 0L));
waitForCompletion("waiting for bulk response to complete", listener);
assertNoFailures(listener);
@ -282,6 +282,77 @@ public class TransportDeleteByQueryActionTests extends ElasticsearchSingleNodeTe
}
}
@Test
public void testOnBulkResponseMultipleIndices() {
final int nbIndices = randomIntBetween(2, 5);
// Holds counters for the total + all indices
final long[] found = new long[1 + nbIndices];
final long[] deleted = new long[1 + nbIndices];
final long[] missing = new long[1 + nbIndices];
final long[] failed = new long[1 + nbIndices];
final int nbItems = randomIntBetween(0, 100);
found[0] = nbItems;
BulkItemResponse[] items = new BulkItemResponse[nbItems];
for (int i = 0; i < nbItems; i++) {
int index = randomIntBetween(1, nbIndices);
found[index] = found[index] + 1;
if (randomBoolean()) {
boolean delete = true;
if (rarely()) {
delete = false;
missing[0] = missing[0] + 1;
missing[index] = missing[index] + 1;
} else {
deleted[0] = deleted[0] + 1;
deleted[index] = deleted[index] + 1;
}
items[i] = new BulkItemResponse(i, "delete", new DeleteResponse("test-" + index, "type", String.valueOf(i), 1, delete));
} else {
items[i] = new BulkItemResponse(i, "delete", new BulkItemResponse.Failure("test-" + index, "type", String.valueOf(i), new Throwable("item failed")));
failed[0] = failed[0] + 1;
failed[index] = failed[index] + 1;
}
}
// We just need a valid scroll id
createIndex("test");
SearchResponse searchResponse = client().prepareSearch().setSearchType(SearchType.SCAN).setScroll(TimeValue.timeValueSeconds(10)).get();
String scrollId = searchResponse.getScrollId();
assertTrue(Strings.hasText(scrollId));
try {
DeleteByQueryRequest delete = new DeleteByQueryRequest();
TestActionListener listener = new TestActionListener();
newAsyncAction(delete, listener).onBulkResponse(scrollId, new BulkResponse(items, 0L));
waitForCompletion("waiting for bulk response to complete", listener);
assertNoFailures(listener);
assertThat(listener.getResponse().getTotalDeleted(), equalTo(deleted[0]));
assertThat(listener.getResponse().getTotalFailed(), equalTo(failed[0]));
assertThat(listener.getResponse().getTotalMissing(), equalTo(missing[0]));
for (int i = 1; i <= nbIndices; i++) {
IndexDeleteByQueryResponse indexResponse = listener.getResponse().getIndex("test-" + i);
if (found[i] >= 1) {
assertNotNull(indexResponse);
assertThat(indexResponse.getFound(), equalTo(found[i]));
assertThat(indexResponse.getDeleted(), equalTo(deleted[i]));
assertThat(indexResponse.getFailed(), equalTo(failed[i]));
assertThat(indexResponse.getMissing(), equalTo(missing[i]));
} else {
assertNull(indexResponse);
}
}
} finally {
client().prepareClearScroll().addScrollId(scrollId).get();
}
}
@Test
public void testOnBulkFailureNoDocuments() {
DeleteByQueryRequest delete = new DeleteByQueryRequest();