diff --git a/core/src/test/java/org/elasticsearch/recovery/RecoveryWhileUnderLoadIT.java b/core/src/test/java/org/elasticsearch/recovery/RecoveryWhileUnderLoadIT.java index d85b32145ea..71c96b85fd5 100644 --- a/core/src/test/java/org/elasticsearch/recovery/RecoveryWhileUnderLoadIT.java +++ b/core/src/test/java/org/elasticsearch/recovery/RecoveryWhileUnderLoadIT.java @@ -42,6 +42,7 @@ import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.junit.annotations.TestLogging; import java.util.Arrays; +import java.util.Set; import java.util.concurrent.TimeUnit; import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS; @@ -105,7 +106,7 @@ public class RecoveryWhileUnderLoadIT extends ESIntegTestCase { logger.info("--> refreshing the index"); refreshAndAssert(); logger.info("--> verifying indexed content"); - iterateAssertCount(numberOfShards, indexer.totalIndexedDocs(), 10); + iterateAssertCount(numberOfShards, 10, indexer.getIds()); } } @@ -156,7 +157,7 @@ public class RecoveryWhileUnderLoadIT extends ESIntegTestCase { logger.info("--> refreshing the index"); refreshAndAssert(); logger.info("--> verifying indexed content"); - iterateAssertCount(numberOfShards, indexer.totalIndexedDocs(), 10); + iterateAssertCount(numberOfShards, 10, indexer.getIds()); } } @@ -225,7 +226,7 @@ public class RecoveryWhileUnderLoadIT extends ESIntegTestCase { logger.info("--> refreshing the index"); refreshAndAssert(); logger.info("--> verifying indexed content"); - iterateAssertCount(numberOfShards, indexer.totalIndexedDocs(), 10); + iterateAssertCount(numberOfShards, 10, indexer.getIds()); } } @@ -263,11 +264,12 @@ public class RecoveryWhileUnderLoadIT extends ESIntegTestCase { logger.info("--> refreshing the index"); refreshAndAssert(); logger.info("--> verifying indexed content"); - iterateAssertCount(numShards, indexer.totalIndexedDocs(), 10); + iterateAssertCount(numShards, 10, indexer.getIds()); } } - private void iterateAssertCount(final int numberOfShards, final long numberOfDocs, final int iterations) throws Exception { + private void iterateAssertCount(final int numberOfShards, final int iterations, final Set ids) throws Exception { + final long numberOfDocs = ids.size(); SearchResponse[] iterationResults = new SearchResponse[iterations]; boolean error = false; for (int i = 0; i < iterations; i++) { @@ -290,12 +292,11 @@ public class RecoveryWhileUnderLoadIT extends ESIntegTestCase { ClusterService clusterService = clusterService(); final ClusterState state = clusterService.state(); for (int shard = 0; shard < numberOfShards; shard++) { - // background indexer starts using ids on 1 - for (int id = 1; id <= numberOfDocs; id++) { - ShardId docShard = clusterService.operationRouting().shardId(state, "test", Long.toString(id), null); + for (String id : ids) { + ShardId docShard = clusterService.operationRouting().shardId(state, "test", id, null); if (docShard.id() == shard) { for (ShardRouting shardRouting : state.routingTable().shardRoutingTable("test", shard)) { - GetResponse response = client().prepareGet("test", "type", Long.toString(id)) + GetResponse response = client().prepareGet("test", "type", id) .setPreference("_only_nodes:" + shardRouting.currentNodeId()).get(); if (response.isExists()) { logger.info("missing id [{}] on shard {}", id, shardRouting); @@ -321,6 +322,7 @@ public class RecoveryWhileUnderLoadIT extends ESIntegTestCase { TimeUnit.MINUTES ) ); + assertEquals(numberOfDocs, ids.size()); } //lets now make the test fail if it was supposed to fail diff --git a/core/src/test/java/org/elasticsearch/recovery/RelocationIT.java b/core/src/test/java/org/elasticsearch/recovery/RelocationIT.java index 02c37e2ad55..8493a08d704 100644 --- a/core/src/test/java/org/elasticsearch/recovery/RelocationIT.java +++ b/core/src/test/java/org/elasticsearch/recovery/RelocationIT.java @@ -232,13 +232,8 @@ public class RelocationIT extends ESIntegTestCase { logger.error("Extra id [{}]", id); } } - set.forEach(new IntProcedure() { - - @Override - public void apply(int value) { - logger.error("Missing id [{}]", value); - } - + set.forEach((IntProcedure) value -> { + logger.error("Missing id [{}]", value); }); } assertThat(hits.totalHits(), equalTo(indexer.totalIndexedDocs())); diff --git a/test/framework/src/main/java/org/elasticsearch/test/BackgroundIndexer.java b/test/framework/src/main/java/org/elasticsearch/test/BackgroundIndexer.java index 5b1c1adfb80..3c5f105e4d1 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/BackgroundIndexer.java +++ b/test/framework/src/main/java/org/elasticsearch/test/BackgroundIndexer.java @@ -27,14 +27,17 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.client.Client; import org.elasticsearch.common.logging.Loggers; +import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; import org.junit.Assert; import java.io.IOException; import java.util.Random; +import java.util.Set; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Semaphore; @@ -54,11 +57,11 @@ public class BackgroundIndexer implements AutoCloseable { final CopyOnWriteArrayList failures; final AtomicBoolean stop = new AtomicBoolean(false); final AtomicLong idGenerator = new AtomicLong(); - final AtomicLong indexCounter = new AtomicLong(); final CountDownLatch startLatch = new CountDownLatch(1); final AtomicBoolean hasBudget = new AtomicBoolean(false); // when set to true, writers will acquire writes from a semaphore final Semaphore availableBudget = new Semaphore(0); final boolean useAutoGeneratedIDs; + private final Set ids = ConcurrentCollections.newConcurrentSet(); volatile int minFieldSize = 10; volatile int maxFieldSize = 140; @@ -158,7 +161,8 @@ public class BackgroundIndexer implements AutoCloseable { BulkResponse bulkResponse = bulkRequest.get(); for (BulkItemResponse bulkItemResponse : bulkResponse) { if (!bulkItemResponse.isFailed()) { - indexCounter.incrementAndGet(); + boolean add = ids.add(bulkItemResponse.getId()); + assert add : "ID: " + bulkItemResponse.getId() + " already used"; } else { throw new ElasticsearchException("bulk request failure, id: [" + bulkItemResponse.getFailure().getId() + "] message: " + bulkItemResponse.getFailure().getMessage()); @@ -173,14 +177,17 @@ public class BackgroundIndexer implements AutoCloseable { } id = idGenerator.incrementAndGet(); if (useAutoGeneratedIDs) { - client.prepareIndex(index, type).setSource(generateSource(id, threadRandom)).get(); + IndexResponse indexResponse = client.prepareIndex(index, type).setSource(generateSource(id, threadRandom)).get(); + boolean add = ids.add(indexResponse.getId()); + assert add : "ID: " + indexResponse.getId() + " already used"; } else { - client.prepareIndex(index, type, Long.toString(id)).setSource(generateSource(id, threadRandom)).get(); + IndexResponse indexResponse = client.prepareIndex(index, type, Long.toString(id)).setSource(generateSource(id, threadRandom)).get(); + boolean add = ids.add(indexResponse.getId()); + assert add : "ID: " + indexResponse.getId() + " already used"; } - indexCounter.incrementAndGet(); } } - logger.info("**** done indexing thread {} stop: {} numDocsIndexed: {}", indexerId, stop.get(), indexCounter.get()); + logger.info("**** done indexing thread {} stop: {} numDocsIndexed: {}", indexerId, stop.get(), ids.size()); } catch (Exception e) { failures.add(e); final long docId = id; @@ -274,7 +281,7 @@ public class BackgroundIndexer implements AutoCloseable { } public long totalIndexedDocs() { - return indexCounter.get(); + return ids.size(); } public Throwable[] getFailures() { @@ -299,4 +306,11 @@ public class BackgroundIndexer implements AutoCloseable { public void close() throws Exception { stop(); } + + /** + * Returns the ID set of all documents indexed by this indexer run + */ + public Set getIds() { + return this.ids; + } }