From f45e6ae3f9643161424a4169d272bfa8588c202e Mon Sep 17 00:00:00 2001 From: Britta Weber Date: Fri, 2 Jan 2015 12:33:24 +0100 Subject: [PATCH] [index] Prevent duplication of documents when retry indexing after fail If a bulk index request fails due to a disconnect, unavailable shard etc, the request is retried once before actually failing. However, even in case of failure the documents might already be indexed. For autogenerated ids the request must not add the documents again and therefore canHaveDuplicates must be set to true. closes #8788 --- .../action/bulk/TransportShardBulkAction.java | 2 +- ...nsportShardReplicationOperationAction.java | 1 + .../index/engine/internal/InternalEngine.java | 11 ++ .../index/store/ExceptionRetryTests.java | 135 ++++++++++++++++++ 4 files changed, 148 insertions(+), 1 deletion(-) create mode 100644 src/test/java/org/elasticsearch/index/store/ExceptionRetryTests.java diff --git a/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java b/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java index 7aad2200042..5dcfc9f8bfb 100644 --- a/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java +++ b/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java @@ -73,7 +73,7 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation private final static String OP_TYPE_UPDATE = "update"; private final static String OP_TYPE_DELETE = "delete"; - private static final String ACTION_NAME = BulkAction.NAME + "[s]"; + public static final String ACTION_NAME = BulkAction.NAME + "[s]"; private final MappingUpdatedAction mappingUpdatedAction; private final UpdateHelper updateHelper; diff --git a/src/main/java/org/elasticsearch/action/support/replication/TransportShardReplicationOperationAction.java b/src/main/java/org/elasticsearch/action/support/replication/TransportShardReplicationOperationAction.java index 0c1480c9985..b6569a850cc 100644 --- 
a/src/main/java/org/elasticsearch/action/support/replication/TransportShardReplicationOperationAction.java +++ b/src/main/java/org/elasticsearch/action/support/replication/TransportShardReplicationOperationAction.java @@ -443,6 +443,7 @@ public abstract class TransportShardReplicationOperationAction uniqueIds = new HashSet(); + long dupCounter = 0; + boolean found_duplicate_already = false; + for (int i = 0; i < searchResponse.getHits().getHits().length; i++) { + if (!uniqueIds.add(searchResponse.getHits().getHits()[i].getId())) { + if (!found_duplicate_already) { + SearchResponse dupIdResponse = client().prepareSearch("index").setQuery(termQuery("_id", searchResponse.getHits().getHits()[i].getId())).setExplain(true).get(); + assertThat(dupIdResponse.getHits().totalHits(), greaterThan(1l)); + logger.info("found a duplicate id:"); + for (SearchHit hit : dupIdResponse.getHits()) { + logger.info("Doc {} was found on shard {}", hit.getId(), hit.getShard().getShardId()); + } + logger.info("will not print anymore in case more duplicates are found."); + found_duplicate_already = true; + } + dupCounter++; + } + } + assertSearchResponse(searchResponse); + assertThat(dupCounter, equalTo(0l)); + assertHitCount(searchResponse, numDocs); + } +}