Improve error reporting for tests with BackgroundIndexer (#20324)

The BackgroundIndexer now uses auto-generated IDs randomly. This causes some problems
for tests that still rely on the fact that the IDs are increasing integers. This change
exposes all IDs via a Set<String> to iterate over for tests.
This commit is contained in:
Simon Willnauer 2016-09-05 16:28:49 +02:00 committed by GitHub
parent 6f6d17dc9c
commit 5c2d9fa158
3 changed files with 34 additions and 23 deletions

View File

@ -42,6 +42,7 @@ import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.junit.annotations.TestLogging;
import java.util.Arrays;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS;
@ -105,7 +106,7 @@ public class RecoveryWhileUnderLoadIT extends ESIntegTestCase {
logger.info("--> refreshing the index");
refreshAndAssert();
logger.info("--> verifying indexed content");
iterateAssertCount(numberOfShards, indexer.totalIndexedDocs(), 10);
iterateAssertCount(numberOfShards, 10, indexer.getIds());
}
}
@ -156,7 +157,7 @@ public class RecoveryWhileUnderLoadIT extends ESIntegTestCase {
logger.info("--> refreshing the index");
refreshAndAssert();
logger.info("--> verifying indexed content");
iterateAssertCount(numberOfShards, indexer.totalIndexedDocs(), 10);
iterateAssertCount(numberOfShards, 10, indexer.getIds());
}
}
@ -225,7 +226,7 @@ public class RecoveryWhileUnderLoadIT extends ESIntegTestCase {
logger.info("--> refreshing the index");
refreshAndAssert();
logger.info("--> verifying indexed content");
iterateAssertCount(numberOfShards, indexer.totalIndexedDocs(), 10);
iterateAssertCount(numberOfShards, 10, indexer.getIds());
}
}
@ -263,11 +264,12 @@ public class RecoveryWhileUnderLoadIT extends ESIntegTestCase {
logger.info("--> refreshing the index");
refreshAndAssert();
logger.info("--> verifying indexed content");
iterateAssertCount(numShards, indexer.totalIndexedDocs(), 10);
iterateAssertCount(numShards, 10, indexer.getIds());
}
}
private void iterateAssertCount(final int numberOfShards, final long numberOfDocs, final int iterations) throws Exception {
private void iterateAssertCount(final int numberOfShards, final int iterations, final Set<String> ids) throws Exception {
final long numberOfDocs = ids.size();
SearchResponse[] iterationResults = new SearchResponse[iterations];
boolean error = false;
for (int i = 0; i < iterations; i++) {
@ -290,12 +292,11 @@ public class RecoveryWhileUnderLoadIT extends ESIntegTestCase {
ClusterService clusterService = clusterService();
final ClusterState state = clusterService.state();
for (int shard = 0; shard < numberOfShards; shard++) {
// background indexer starts using ids on 1
for (int id = 1; id <= numberOfDocs; id++) {
ShardId docShard = clusterService.operationRouting().shardId(state, "test", Long.toString(id), null);
for (String id : ids) {
ShardId docShard = clusterService.operationRouting().shardId(state, "test", id, null);
if (docShard.id() == shard) {
for (ShardRouting shardRouting : state.routingTable().shardRoutingTable("test", shard)) {
GetResponse response = client().prepareGet("test", "type", Long.toString(id))
GetResponse response = client().prepareGet("test", "type", id)
.setPreference("_only_nodes:" + shardRouting.currentNodeId()).get();
if (response.isExists()) {
logger.info("missing id [{}] on shard {}", id, shardRouting);
@ -321,6 +322,7 @@ public class RecoveryWhileUnderLoadIT extends ESIntegTestCase {
TimeUnit.MINUTES
)
);
assertEquals(numberOfDocs, ids.size());
}
//lets now make the test fail if it was supposed to fail

View File

@ -232,13 +232,8 @@ public class RelocationIT extends ESIntegTestCase {
logger.error("Extra id [{}]", id);
}
}
set.forEach(new IntProcedure() {
@Override
public void apply(int value) {
logger.error("Missing id [{}]", value);
}
set.forEach((IntProcedure) value -> {
logger.error("Missing id [{}]", value);
});
}
assertThat(hits.totalHits(), equalTo(indexer.totalIndexedDocs()));

View File

@ -27,14 +27,17 @@ import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.junit.Assert;
import java.io.IOException;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
@ -54,11 +57,11 @@ public class BackgroundIndexer implements AutoCloseable {
final CopyOnWriteArrayList<Exception> failures;
final AtomicBoolean stop = new AtomicBoolean(false);
final AtomicLong idGenerator = new AtomicLong();
final AtomicLong indexCounter = new AtomicLong();
final CountDownLatch startLatch = new CountDownLatch(1);
final AtomicBoolean hasBudget = new AtomicBoolean(false); // when set to true, writers will acquire writes from a semaphore
final Semaphore availableBudget = new Semaphore(0);
final boolean useAutoGeneratedIDs;
private final Set<String> ids = ConcurrentCollections.newConcurrentSet();
volatile int minFieldSize = 10;
volatile int maxFieldSize = 140;
@ -158,7 +161,8 @@ public class BackgroundIndexer implements AutoCloseable {
BulkResponse bulkResponse = bulkRequest.get();
for (BulkItemResponse bulkItemResponse : bulkResponse) {
if (!bulkItemResponse.isFailed()) {
indexCounter.incrementAndGet();
boolean add = ids.add(bulkItemResponse.getId());
assert add : "ID: " + bulkItemResponse.getId() + " already used";
} else {
throw new ElasticsearchException("bulk request failure, id: ["
+ bulkItemResponse.getFailure().getId() + "] message: " + bulkItemResponse.getFailure().getMessage());
@ -173,14 +177,17 @@ public class BackgroundIndexer implements AutoCloseable {
}
id = idGenerator.incrementAndGet();
if (useAutoGeneratedIDs) {
client.prepareIndex(index, type).setSource(generateSource(id, threadRandom)).get();
IndexResponse indexResponse = client.prepareIndex(index, type).setSource(generateSource(id, threadRandom)).get();
boolean add = ids.add(indexResponse.getId());
assert add : "ID: " + indexResponse.getId() + " already used";
} else {
client.prepareIndex(index, type, Long.toString(id)).setSource(generateSource(id, threadRandom)).get();
IndexResponse indexResponse = client.prepareIndex(index, type, Long.toString(id)).setSource(generateSource(id, threadRandom)).get();
boolean add = ids.add(indexResponse.getId());
assert add : "ID: " + indexResponse.getId() + " already used";
}
indexCounter.incrementAndGet();
}
}
logger.info("**** done indexing thread {} stop: {} numDocsIndexed: {}", indexerId, stop.get(), indexCounter.get());
logger.info("**** done indexing thread {} stop: {} numDocsIndexed: {}", indexerId, stop.get(), ids.size());
} catch (Exception e) {
failures.add(e);
final long docId = id;
@ -274,7 +281,7 @@ public class BackgroundIndexer implements AutoCloseable {
}
public long totalIndexedDocs() {
return indexCounter.get();
return ids.size();
}
public Throwable[] getFailures() {
@ -299,4 +306,11 @@ public class BackgroundIndexer implements AutoCloseable {
public void close() throws Exception {
stop();
}
/**
* Returns the ID set of all documents indexed by this indexer run
*/
public Set<String> getIds() {
return this.ids;
}
}