Moved BackgroundIndexer to a top level class. Moved waitNumDocs to ElasticsearchIntegrationTestBase.
Updated RelocationTests to use the above and unified testPrimaryRelocationWhileIndexing & testReplicaRelocationWhileIndexingRandom into a single randomized test.
commit 83a013320c (parent a6a12f97a2)
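For orientation, here is a minimal usage sketch (not part of the commit) of the pattern the diff below converts the tests to: the indexer is now a top-level helper constructed with an explicit index, type and client, used as a try-with-resources block, and paired with the doc-count wait helper that this commit moves into the shared integration-test base class. The test class name and the "test"/"type1" index and type are illustrative, not taken from the commit.

// Illustrative sketch only. Assumes a test class extending ElasticsearchIntegrationTest,
// which provides client() and (after this change) the waitForDocs(numDocs, indexer) helper.
import org.elasticsearch.test.BackgroundIndexer;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.junit.Test;

public class BackgroundIndexerUsageSketchTests extends ElasticsearchIntegrationTest {

    @Test
    public void indexWhileClusterIsBusy() throws Exception {
        client().admin().indices().prepareCreate("test").execute().actionGet();
        // The helper now takes the index, type and client explicitly instead of being an
        // inner class of one test.
        try (BackgroundIndexer indexer = new BackgroundIndexer("test", "type1", client())) {
            // Block until at least 100 docs are visible to search, then check the writers.
            waitForDocs(100, indexer);
            indexer.assertNoFailures();
            // ... relocate shards, add or remove nodes, etc. while indexing continues ...
        } // close() stops the writer threads and asserts that none of them failed
    }
}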
RecoveryWhileUnderLoadTests.java

@@ -26,125 +26,29 @@ import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
 import org.elasticsearch.action.admin.indices.stats.ShardStats;
 import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.action.search.SearchType;
-import org.elasticsearch.client.Client;
 import org.elasticsearch.common.Priority;
-import org.elasticsearch.common.collect.MapBuilder;
 import org.elasticsearch.common.logging.ESLogger;
 import org.elasticsearch.common.logging.Loggers;
 import org.elasticsearch.index.shard.DocsStats;
+import org.elasticsearch.test.BackgroundIndexer;
 import org.elasticsearch.test.ElasticsearchIntegrationTest;
 import org.elasticsearch.test.junit.annotations.TestLogging;
 import org.junit.Test;
 
 import java.util.Arrays;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
 
 import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS;
 import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS;
 import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
 import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.*;
-import static org.hamcrest.Matchers.emptyIterable;
 import static org.hamcrest.Matchers.equalTo;
 
 public class RecoveryWhileUnderLoadTests extends ElasticsearchIntegrationTest {
 
     private final ESLogger logger = Loggers.getLogger(RecoveryWhileUnderLoadTests.class);
 
-
-    public class BackgroundIndexer implements AutoCloseable {
-
-        final Thread[] writers;
-        final CountDownLatch stopLatch;
-        final CopyOnWriteArrayList<Throwable> failures;
-        final AtomicBoolean stop = new AtomicBoolean(false);
-        final AtomicLong idGenerator = new AtomicLong();
-        final AtomicLong indexCounter = new AtomicLong();
-        final CountDownLatch startLatch = new CountDownLatch(1);
-
-        public BackgroundIndexer() {
-            this(scaledRandomIntBetween(3, 10));
-        }
-
-        public BackgroundIndexer(int writerCount) {
-            this(writerCount, true);
-        }
-
-        public BackgroundIndexer(int writerCount, boolean autoStart) {
-
-            failures = new CopyOnWriteArrayList<>();
-            writers = new Thread[writerCount];
-            stopLatch = new CountDownLatch(writers.length);
-            logger.info("--> starting {} indexing threads", writerCount);
-            for (int i = 0; i < writers.length; i++) {
-                final int indexerId = i;
-                final Client client = client();
-                writers[i] = new Thread() {
-                    @Override
-                    public void run() {
-                        long id = -1;
-                        try {
-                            startLatch.await();
-                            logger.info("**** starting indexing thread {}", indexerId);
-                            while (!stop.get()) {
-                                id = idGenerator.incrementAndGet();
-                                client.prepareIndex("test", "type1", Long.toString(id) + "-" + indexerId)
-                                        .setSource(MapBuilder.<String, Object>newMapBuilder().put("test", "value" + id).map()).execute().actionGet();
-                                indexCounter.incrementAndGet();
-                            }
-                            logger.info("**** done indexing thread {}", indexerId);
-                        } catch (Throwable e) {
-                            failures.add(e);
-                            logger.warn("**** failed indexing thread {} on doc id {}", e, indexerId, id);
-                        } finally {
-                            stopLatch.countDown();
-                        }
-                    }
-                };
-                writers[i].start();
-            }
-
-            if (autoStart) {
-                startLatch.countDown();
-            }
-        }
-
-        public void start() {
-            startLatch.countDown();
-        }
-
-        public void stop() throws InterruptedException {
-            if (stop.get()) {
-                return;
-            }
-            stop.set(true);
-
-            assertThat("timeout while waiting for indexing threads to stop", stopLatch.await(6, TimeUnit.MINUTES), equalTo(true));
-            assertNoFailures();
-        }
-
-        public long totalIndexedDocs() {
-            return indexCounter.get();
-        }
-
-        public Throwable[] getFailures() {
-            return failures.toArray(new Throwable[failures.size()]);
-        }
-
-        public void assertNoFailures() {
-            assertThat(failures, emptyIterable());
-        }
-
-        @Override
-        public void close() throws Exception {
-            stop();
-        }
-    }
-
     @Test
     @TestLogging("action.search.type:TRACE,action.admin.indices.refresh:TRACE")
     @Slow
@@ -154,10 +58,10 @@ public class RecoveryWhileUnderLoadTests extends ElasticsearchIntegrationTest {
         assertAcked(prepareCreate("test", 1, settingsBuilder().put(SETTING_NUMBER_OF_SHARDS, numberOfShards).put(SETTING_NUMBER_OF_REPLICAS, 1)));
 
         final int totalNumDocs = scaledRandomIntBetween(200, 20000);
-        try (BackgroundIndexer indexer = new BackgroundIndexer()) {
+        try (BackgroundIndexer indexer = new BackgroundIndexer("test", "type", client())) {
             int waitFor = totalNumDocs / 10;
             logger.info("--> waiting for {} docs to be indexed ...", waitFor);
-            waitForDocs(waitFor);
+            waitForDocs(waitFor, indexer);
             indexer.assertNoFailures();
             logger.info("--> {} docs indexed", waitFor);
 
@@ -167,7 +71,7 @@ public class RecoveryWhileUnderLoadTests extends ElasticsearchIntegrationTest {
 
             waitFor += totalNumDocs / 10;
             logger.info("--> waiting for {} docs to be indexed ...", waitFor);
-            waitForDocs(waitFor);
+            waitForDocs(waitFor, indexer);
             indexer.assertNoFailures();
             logger.info("--> {} docs indexed", waitFor);
 
@@ -180,7 +84,7 @@ public class RecoveryWhileUnderLoadTests extends ElasticsearchIntegrationTest {
             assertThat(client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setTimeout("1m").setWaitForGreenStatus().setWaitForNodes(">=2").execute().actionGet().isTimedOut(), equalTo(false));
 
             logger.info("--> waiting for {} docs to be indexed ...", totalNumDocs);
-            waitForDocs(totalNumDocs);
+            waitForDocs(totalNumDocs, indexer);
             indexer.assertNoFailures();
             logger.info("--> {} docs indexed", totalNumDocs);
 
@@ -204,10 +108,10 @@ public class RecoveryWhileUnderLoadTests extends ElasticsearchIntegrationTest {
         assertAcked(prepareCreate("test", 1, settingsBuilder().put(SETTING_NUMBER_OF_SHARDS, numberOfShards).put(SETTING_NUMBER_OF_REPLICAS, 1)));
 
         final int totalNumDocs = scaledRandomIntBetween(200, 20000);
-        try (BackgroundIndexer indexer = new BackgroundIndexer()) {
+        try (BackgroundIndexer indexer = new BackgroundIndexer("test", "type", client())) {
             int waitFor = totalNumDocs / 10;
             logger.info("--> waiting for {} docs to be indexed ...", waitFor);
-            waitForDocs(waitFor);
+            waitForDocs(waitFor, indexer);
             indexer.assertNoFailures();
             logger.info("--> {} docs indexed", waitFor);
 
@@ -217,7 +121,7 @@ public class RecoveryWhileUnderLoadTests extends ElasticsearchIntegrationTest {
 
             waitFor += totalNumDocs / 10;
             logger.info("--> waiting for {} docs to be indexed ...", waitFor);
-            waitForDocs(waitFor);
+            waitForDocs(waitFor, indexer);
             indexer.assertNoFailures();
             logger.info("--> {} docs indexed", waitFor);
             logger.info("--> allow 4 nodes for index [test] ...");
@@ -228,7 +132,7 @@ public class RecoveryWhileUnderLoadTests extends ElasticsearchIntegrationTest {
 
 
             logger.info("--> waiting for {} docs to be indexed ...", totalNumDocs);
-            waitForDocs(totalNumDocs);
+            waitForDocs(totalNumDocs, indexer);
             indexer.assertNoFailures();
             logger.info("--> {} docs indexed", totalNumDocs);
 
@@ -252,10 +156,10 @@ public class RecoveryWhileUnderLoadTests extends ElasticsearchIntegrationTest {
         assertAcked(prepareCreate("test", 2, settingsBuilder().put(SETTING_NUMBER_OF_SHARDS, numberOfShards).put(SETTING_NUMBER_OF_REPLICAS, 1)));
 
         final int totalNumDocs = scaledRandomIntBetween(200, 20000);
-        try (BackgroundIndexer indexer = new BackgroundIndexer()) {
+        try (BackgroundIndexer indexer = new BackgroundIndexer("test", "type", client())) {
             int waitFor = totalNumDocs / 10;
             logger.info("--> waiting for {} docs to be indexed ...", waitFor);
-            waitForDocs(waitFor);
+            waitForDocs(waitFor, indexer);
             indexer.assertNoFailures();
             logger.info("--> {} docs indexed", waitFor);
 
@@ -265,7 +169,7 @@ public class RecoveryWhileUnderLoadTests extends ElasticsearchIntegrationTest {
 
             waitFor += totalNumDocs / 10;
             logger.info("--> waiting for {} docs to be indexed ...", waitFor);
-            waitForDocs(waitFor);
+            waitForDocs(waitFor, indexer);
             indexer.assertNoFailures();
             logger.info("--> {} docs indexed", waitFor);
 
@@ -278,7 +182,7 @@ public class RecoveryWhileUnderLoadTests extends ElasticsearchIntegrationTest {
 
 
             logger.info("--> waiting for {} docs to be indexed ...", totalNumDocs);
-            waitForDocs(totalNumDocs);
+            waitForDocs(totalNumDocs, indexer);
             indexer.assertNoFailures();
 
             logger.info("--> {} docs indexed", totalNumDocs);
@@ -323,12 +227,12 @@ public class RecoveryWhileUnderLoadTests extends ElasticsearchIntegrationTest {
 
         final int numDocs = scaledRandomIntBetween(200, 50000);
 
-        try (BackgroundIndexer indexer = new BackgroundIndexer()) {
+        try (BackgroundIndexer indexer = new BackgroundIndexer("test", "type", client())) {
 
             for (int i = 0; i < numDocs; i += scaledRandomIntBetween(100, Math.min(1000, numDocs))) {
                 indexer.assertNoFailures();
                 logger.info("--> waiting for {} docs to be indexed ...", i);
-                waitForDocs(i);
+                waitForDocs(i, indexer);
                 logger.info("--> {} docs indexed", i);
                 allowNodes = 2 / allowNodes;
                 allowNodes("test", allowNodes);
@@ -417,25 +321,4 @@ public class RecoveryWhileUnderLoadTests extends ElasticsearchIntegrationTest {
             }
         }, 5, TimeUnit.MINUTES), equalTo(true));
     }
-
-    private void waitForDocs(final long numDocs) throws InterruptedException {
-        final long[] lastKnownCount = {-1};
-        long lastStartCount = -1;
-        Predicate<Object> testDocs = new Predicate<Object>() {
-            public boolean apply(Object o) {
-                lastKnownCount[0] = client().prepareCount().setQuery(matchAllQuery()).execute().actionGet().getCount();
-                logger.debug("[{}] docs visible for search. waiting for [{}]", lastKnownCount[0], numDocs);
-                return lastKnownCount[0] > numDocs;
-            }
-        };
-        // 5 minutes seems like a long time but while relocating, indexing threads can wait for up to ~1m before retrying when
-        // they first try to index into a shard which is not STARTED.
-        while (!awaitBusy(testDocs, 5, TimeUnit.MINUTES)) {
-            if (lastStartCount == lastKnownCount[0]) {
-                // we didn't make any progress
-                fail("failed to reach " + numDocs + "docs");
-            }
-            lastStartCount = lastKnownCount[0];
-        }
-    }
 }
RelocationTests.java

@@ -21,14 +21,9 @@ package org.elasticsearch.recovery;
 
 import com.carrotsearch.hppc.IntOpenHashSet;
 import com.carrotsearch.hppc.procedures.IntProcedure;
-import com.google.common.base.Predicate;
 import org.apache.lucene.util.LuceneTestCase.Slow;
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
-import org.elasticsearch.action.bulk.BulkItemResponse;
-import org.elasticsearch.action.bulk.BulkRequestBuilder;
-import org.elasticsearch.action.bulk.BulkResponse;
 import org.elasticsearch.action.search.SearchPhaseExecutionException;
-import org.elasticsearch.client.Client;
 import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand;
 import org.elasticsearch.common.Priority;
 import org.elasticsearch.common.settings.ImmutableSettings;
@@ -36,14 +31,12 @@ import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.search.SearchHit;
 import org.elasticsearch.search.SearchHits;
+import org.elasticsearch.test.BackgroundIndexer;
 import org.elasticsearch.test.ElasticsearchIntegrationTest;
 import org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope;
 import org.junit.Test;
 
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
 
 import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
 import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
@@ -106,19 +99,14 @@ public class RelocationTests extends ElasticsearchIntegrationTest {
 
     @Test
     @Slow
-    public void testPrimaryRelocationWhileIndexingRandom() throws Exception {
-        int numRelocations = scaledRandomIntBetween(1, rarely() ? 10 : 4);
-        int numWriters = scaledRandomIntBetween(1, rarely() ? 10 : 4);
-        boolean batch = getRandom().nextBoolean();
-        logger.info("testPrimaryRelocationWhileIndexingRandom(numRelocations={}, numWriters={}, batch={}",
-                numRelocations, numWriters, batch);
-        testPrimaryRelocationWhileIndexing(numRelocations, numWriters, batch);
-    }
+    public void testRelocationWhileIndexingRandom() throws Exception {
+        int numberOfRelocations = scaledRandomIntBetween(1, rarely() ? 10 : 4);
+        int numberOfReplicas = randomBoolean() ? 0 : 1;
+        int numberOfNodes = numberOfReplicas == 0 ? 2 : 3;
 
-    private void testPrimaryRelocationWhileIndexing(final int numberOfRelocations, final int numberOfWriters, final boolean batch) throws Exception {
-        String[] nodes = new String[2];
+        logger.info("testRelocationWhileIndexingRandom(numRelocations={}, numberOfReplicas={}, numberOfNodes={})", numberOfRelocations, numberOfReplicas, numberOfNodes);
+
+        String[] nodes = new String[numberOfNodes];
         logger.info("--> starting [node1] ...");
         nodes[0] = cluster().startNode();
 
@@ -126,87 +114,41 @@ public class RelocationTests extends ElasticsearchIntegrationTest {
         client().admin().indices().prepareCreate("test")
                 .setSettings(settingsBuilder()
                         .put("index.number_of_shards", 1)
-                        .put("index.number_of_replicas", 0)
+                        .put("index.number_of_replicas", numberOfReplicas)
                 ).execute().actionGet();
 
-        logger.info("--> starting [node2] ...");
-        nodes[1] = cluster().startNode();
-
-        final AtomicLong idGenerator = new AtomicLong();
-        final AtomicLong indexCounter = new AtomicLong();
-        final AtomicBoolean stop = new AtomicBoolean(false);
-        Thread[] writers = new Thread[numberOfWriters];
-        final CountDownLatch stopLatch = new CountDownLatch(writers.length);
-        final CountDownLatch startLatch = new CountDownLatch(1);
-        logger.info("--> starting {} indexing threads", writers.length);
-        for (int i = 0; i < writers.length; i++) {
-            final Client perThreadClient = client();
-            final int indexerId = i;
-            writers[i] = new Thread() {
-                @Override
-                public void run() {
-                    try {
-                        startLatch.await();
-                        logger.info("**** starting indexing thread {}", indexerId);
-                        while (!stop.get()) {
-                            if (batch) {
-                                BulkRequestBuilder bulkRequest = perThreadClient.prepareBulk();
-                                for (int i = 0; i < 100; i++) {
-                                    long id = idGenerator.incrementAndGet();
-                                    if (id % 1000 == 0) {
-                                        perThreadClient.admin().indices().prepareFlush().execute().actionGet();
-                                    }
-                                    bulkRequest.add(perThreadClient.prepareIndex("test", "type1", Long.toString(id))
-                                            .setSource("test", "value" + id));
-                                }
-                                BulkResponse bulkResponse = bulkRequest.execute().actionGet();
-                                for (BulkItemResponse bulkItemResponse : bulkResponse) {
-                                    if (!bulkItemResponse.isFailed()) {
-                                        indexCounter.incrementAndGet();
-                                    } else {
-                                        logger.warn("**** failed bulk indexing thread {}, {}/{}", indexerId, bulkItemResponse.getFailure().getId(), bulkItemResponse.getFailure().getMessage());
-                                    }
-                                }
-                            } else {
-                                long id = idGenerator.incrementAndGet();
-                                if (id % 1000 == 0) {
-                                    perThreadClient.admin().indices().prepareFlush().execute().actionGet();
-                                }
-                                perThreadClient.prepareIndex("test", "type1", Long.toString(id))
-                                        .setSource("test", "value" + id).execute().actionGet();
-                                indexCounter.incrementAndGet();
-                            }
-                        }
-                        logger.info("**** done indexing thread {}", indexerId);
-                    } catch (Exception e) {
-                        logger.warn("**** failed indexing thread {}", e, indexerId);
-                    } finally {
-                        stopLatch.countDown();
-                    }
-                }
-            };
-            writers[i].start();
-        }
-        startLatch.countDown();
+        for (int i = 1; i < numberOfNodes; i++) {
+            logger.info("--> starting [node{}] ...", i + 1);
+            nodes[i] = cluster().startNode();
+            if (i != numberOfNodes - 1) {
+                ClusterHealthResponse healthResponse = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID)
+                        .setWaitForNodes(Integer.toString(i + 1)).setWaitForGreenStatus().execute().actionGet();
+                assertThat(healthResponse.isTimedOut(), equalTo(false));
+            }
+        }
+
+        try (BackgroundIndexer indexer = new BackgroundIndexer("test", "type1", client())) {
         final int numDocs = scaledRandomIntBetween(200, 2500);
         logger.info("--> waiting for {} docs to be indexed ...", numDocs);
-        awaitBusy(new Predicate<Object>() {
-            @Override
-            public boolean apply(Object input) {
-                client().admin().indices().prepareRefresh().execute().actionGet();
-                return client().prepareCount().setQuery(matchAllQuery()).execute().actionGet().getCount() >= numDocs;
-            }
-        });
+        waitForDocs(numDocs, indexer);
         logger.info("--> {} docs indexed", numDocs);
 
         logger.info("--> starting relocations...");
+        int nodeShiftBased = numberOfReplicas; // if we have replicas shift those
        for (int i = 0; i < numberOfRelocations; i++) {
             int fromNode = (i % 2);
             int toNode = fromNode == 0 ? 1 : 0;
+            fromNode += nodeShiftBased;
+            toNode += nodeShiftBased;
             logger.info("--> START relocate the shard from {} to {}", nodes[fromNode], nodes[toNode]);
             client().admin().cluster().prepareReroute()
                     .add(new MoveAllocationCommand(new ShardId("test", 0), nodes[fromNode], nodes[toNode]))
-                    .execute().actionGet();
+                    .get();
+            if (rarely()) {
+                logger.debug("--> flushing");
+                client().admin().indices().prepareFlush().get();
+            }
             ClusterHealthResponse clusterHealthResponse = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForRelocatingShards(0).setTimeout(ACCEPTABLE_RELOCATION_TIME).execute().actionGet();
             assertThat(clusterHealthResponse.isTimedOut(), equalTo(false));
             clusterHealthResponse = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForRelocatingShards(0).setTimeout(ACCEPTABLE_RELOCATION_TIME).execute().actionGet();
@@ -214,10 +156,8 @@ public class RelocationTests extends ElasticsearchIntegrationTest {
             logger.info("--> DONE relocate the shard from {} to {}", fromNode, toNode);
         }
         logger.info("--> done relocations");
-        logger.info("--> marking and waiting for indexing threads to stop ...");
-        stop.set(true);
-        stopLatch.await();
+        logger.info("--> waiting for indexing threads to stop ...");
+        indexer.stop();
         logger.info("--> indexing threads stopped");
 
         logger.info("--> refreshing the index");
@@ -227,11 +167,11 @@ public class RelocationTests extends ElasticsearchIntegrationTest {
         for (int i = 0; i < 10; i++) {
             try {
                 logger.info("--> START search test round {}", i + 1);
-                SearchHits hits = client().prepareSearch("test").setQuery(matchAllQuery()).setSize((int) indexCounter.get()).setNoFields().execute().actionGet().getHits();
+                SearchHits hits = client().prepareSearch("test").setQuery(matchAllQuery()).setSize((int) indexer.totalIndexedDocs()).setNoFields().execute().actionGet().getHits();
                 ranOnce = true;
-                if (hits.totalHits() != indexCounter.get()) {
-                    int[] hitIds = new int[(int) indexCounter.get()];
-                    for (int hit = 0; hit < indexCounter.get(); hit++) {
+                if (hits.totalHits() != indexer.totalIndexedDocs()) {
+                    int[] hitIds = new int[(int) indexer.totalIndexedDocs()];
+                    for (int hit = 0; hit < indexer.totalIndexedDocs(); hit++) {
                         hitIds[hit] = hit + 1;
                     }
                     IntOpenHashSet set = IntOpenHashSet.from(hitIds);
@@ -250,172 +190,7 @@ public class RelocationTests extends ElasticsearchIntegrationTest {
 
                     });
                 }
-                assertThat(hits.totalHits(), equalTo(indexCounter.get()));
+                assertThat(hits.totalHits(), equalTo(indexer.totalIndexedDocs()));
-                logger.info("--> DONE search test round {}", i + 1);
-            } catch (SearchPhaseExecutionException ex) {
-                // TODO: the first run fails with this failure, waiting for relocating nodes set to 0 is not enough?
-                logger.warn("Got exception while searching.", ex);
-            }
-        }
-        if (!ranOnce) {
-            fail();
-        }
-    }
-
-    @Test
-    @Slow
-    public void testReplicaRelocationWhileIndexingRandom() throws Exception {
-        int numRelocations = scaledRandomIntBetween(1, rarely() ? 10 : 4);
-        int numWriters = scaledRandomIntBetween(1, rarely() ? 10 : 4);
-        boolean batch = getRandom().nextBoolean();
-        logger.info("testReplicaRelocationWhileIndexing(numRelocations={}, numWriters={}, batch={}", numRelocations, numWriters, batch);
-        testReplicaRelocationWhileIndexing(numRelocations, numWriters, batch);
-    }
-
-    private void testReplicaRelocationWhileIndexing(final int numberOfRelocations, final int numberOfWriters, final boolean batch) throws Exception {
-        logger.info("--> starting [node1] ...");
-        String[] nodes = new String[3];
-        nodes[0] = cluster().startNode();
-
-        logger.info("--> creating test index ...");
-        client().admin().indices().prepareCreate("test")
-                .setSettings(settingsBuilder()
-                        .put("index.number_of_shards", 1)
-                        .put("index.number_of_replicas", 1)
-                ).execute().actionGet();
-
-        logger.info("--> starting [node2] ...");
-        nodes[1] = cluster().startNode();
-
-        ClusterHealthResponse healthResponse = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForNodes("2").setWaitForGreenStatus().execute().actionGet();
-        assertThat(healthResponse.isTimedOut(), equalTo(false));
-
-        logger.info("--> starting [node3] ...");
-        nodes[2] = cluster().startNode();
-
-        healthResponse = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForNodes("3").setWaitForGreenStatus().execute().actionGet();
-        assertThat(healthResponse.isTimedOut(), equalTo(false));
-
-        final AtomicLong idGenerator = new AtomicLong();
-        final AtomicLong indexCounter = new AtomicLong();
-        final AtomicBoolean stop = new AtomicBoolean(false);
-        Thread[] writers = new Thread[numberOfWriters];
-        final CountDownLatch stopLatch = new CountDownLatch(writers.length);
-        final CountDownLatch startLatch = new CountDownLatch(1);
-        logger.info("--> starting {} indexing threads", writers.length);
-        for (int i = 0; i < writers.length; i++) {
-            final Client perThreadClient = client();
-            final int indexerId = i;
-            writers[i] = new Thread() {
-                @Override
-                public void run() {
-
-                    try {
-                        startLatch.await();
-                        logger.info("**** starting indexing thread {}", indexerId);
-                        while (!stop.get()) {
-                            if (batch) {
-                                BulkRequestBuilder bulkRequest = perThreadClient.prepareBulk();
-                                for (int i = 0; i < 100; i++) {
-                                    long id = idGenerator.incrementAndGet();
-                                    if (id % 1000 == 0) {
-                                        perThreadClient.admin().indices().prepareFlush().execute().actionGet();
-                                    }
-                                    bulkRequest.add(perThreadClient.prepareIndex("test", "type1", Long.toString(id))
-                                            .setSource("test", "value" + id));
-                                }
-                                BulkResponse bulkResponse = bulkRequest.execute().actionGet();
-                                for (BulkItemResponse bulkItemResponse : bulkResponse) {
-                                    if (!bulkItemResponse.isFailed()) {
-                                        indexCounter.incrementAndGet();
-                                    } else {
-                                        logger.warn("**** failed bulk indexing thread {}, {}/{}", indexerId, bulkItemResponse.getFailure().getId(), bulkItemResponse.getFailure().getMessage());
-                                    }
-                                }
-                            } else {
-                                long id = idGenerator.incrementAndGet();
-                                if (id % 1000 == 0) {
-                                    perThreadClient.admin().indices().prepareFlush().execute().actionGet();
-                                }
-                                perThreadClient.prepareIndex("test", "type1", Long.toString(id))
-                                        .setSource("test", "value" + id).execute().actionGet();
-                                indexCounter.incrementAndGet();
-                            }
-                        }
-                        logger.info("**** done indexing thread {}", indexerId);
-                    } catch (Exception e) {
-                        logger.warn("**** failed indexing thread {}", e, indexerId);
-                    } finally {
-                        stopLatch.countDown();
-                    }
-                }
-            };
-            writers[i].start();
-        }
-
-        startLatch.countDown();
-        final int numDocs = scaledRandomIntBetween(200, 2500);
-        logger.info("--> waiting for {} docs to be indexed ...", numDocs);
-        awaitBusy(new Predicate<Object>() {
-            @Override
-            public boolean apply(Object input) {
-                client().admin().indices().prepareRefresh().execute().actionGet();
-                return client().prepareCount().setQuery(matchAllQuery()).execute().actionGet().getCount() >= numDocs;
-            }
-        });
-        logger.info("--> {} docs indexed", numDocs);
-
-        logger.info("--> starting relocations...");
-        for (int i = 0; i < numberOfRelocations; i++) {
-            int fromNode = (1 + (i % 2));
-            int toNode = fromNode == 1 ? 2 : 1;
-            logger.info("--> START relocate the shard from {} to {}", nodes[fromNode], nodes[toNode]);
-            client().admin().cluster().prepareReroute()
-                    .add(new MoveAllocationCommand(new ShardId("test", 0), nodes[fromNode], nodes[toNode]))
-                    .execute().actionGet();
-            ClusterHealthResponse clusterHealthResponse = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForRelocatingShards(0).setTimeout(ACCEPTABLE_RELOCATION_TIME).execute().actionGet();
-            assertThat(clusterHealthResponse.isTimedOut(), equalTo(false));
-            clusterHealthResponse = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForRelocatingShards(0).setTimeout(ACCEPTABLE_RELOCATION_TIME).execute().actionGet();
-            assertThat(clusterHealthResponse.isTimedOut(), equalTo(false));
-            logger.info("--> DONE relocate the shard from {} to {}", fromNode, toNode);
-        }
-        logger.info("--> done relocations");
-
-        logger.info("--> marking and waiting for indexing threads to stop ...");
-        stop.set(true);
-        stopLatch.await();
-        logger.info("--> indexing threads stopped");
-
-        logger.info("--> refreshing the index");
-        client().admin().indices().prepareRefresh("test").execute().actionGet();
-        logger.info("--> searching the index");
-        boolean ranOnce = false;
-        for (int i = 0; i < 10; i++) {
-            try {
-                logger.info("--> START search test round {}", i + 1);
-                SearchHits hits = client().prepareSearch("test").setQuery(matchAllQuery()).setSize((int) indexCounter.get()).setNoFields().execute().actionGet().getHits();
-                ranOnce = true;
-                if (hits.totalHits() != indexCounter.get()) {
-                    int[] hitIds = new int[(int) indexCounter.get()];
-                    for (int hit = 0; hit < indexCounter.get(); hit++) {
-                        hitIds[hit] = hit + 1;
-                    }
-                    IntOpenHashSet set = IntOpenHashSet.from(hitIds);
-                    for (SearchHit hit : hits.hits()) {
-                        int id = Integer.parseInt(hit.id());
-                        if (!set.remove(id)) {
-                            logger.error("Extra id [{}]", id);
-                        }
-                    }
-                    set.forEach(new IntProcedure() {
-
-                        @Override
-                        public void apply(int value) {
-                            logger.error("Missing id [{}]", value);
-                        }
-                    });
-                }
-                assertThat(hits.totalHits(), equalTo(indexCounter.get()));
                 logger.info("--> DONE search test round {}", i + 1);
             } catch (SearchPhaseExecutionException ex) {
                 // TODO: the first run fails with this failure, waiting for relocating nodes set to 0 is not enough?
@@ -427,3 +202,5 @@ public class RelocationTests extends ElasticsearchIntegrationTest {
         }
     }
 }
+
+}
BackgroundIndexer.java (new file)

@@ -0,0 +1,147 @@
+package org.elasticsearch.test;/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import com.carrotsearch.randomizedtesting.RandomizedTest;
+import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkRequestBuilder;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.common.logging.ESLogger;
+import org.elasticsearch.common.logging.Loggers;
+import org.elasticsearch.recovery.RecoveryWhileUnderLoadTests;
+import org.junit.Assert;
+
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.hamcrest.Matchers.emptyIterable;
+import static org.hamcrest.Matchers.equalTo;
+
+public class BackgroundIndexer implements AutoCloseable {
+
+    private final ESLogger logger = Loggers.getLogger(RecoveryWhileUnderLoadTests.class);
+
+    final Thread[] writers;
+    final CountDownLatch stopLatch;
+    final CopyOnWriteArrayList<Throwable> failures;
+    final AtomicBoolean stop = new AtomicBoolean(false);
+    final AtomicLong idGenerator = new AtomicLong();
+    final AtomicLong indexCounter = new AtomicLong();
+    final CountDownLatch startLatch = new CountDownLatch(1);
+
+    public BackgroundIndexer(String index, String type, Client client) {
+        this(index, type, client, RandomizedTest.scaledRandomIntBetween(3, 10));
+    }
+
+    public BackgroundIndexer(String index, String type, Client client, int writerCount) {
+        this(index, type, client, writerCount, true);
+    }
+
+    public BackgroundIndexer(final String index, final String type, final Client client, final int writerCount, boolean autoStart) {
+
+        failures = new CopyOnWriteArrayList<>();
+        writers = new Thread[writerCount];
+        stopLatch = new CountDownLatch(writers.length);
+        logger.info("--> starting {} indexing threads", writerCount);
+        for (int i = 0; i < writers.length; i++) {
+            final int indexerId = i;
+            final boolean batch = RandomizedTest.getRandom().nextBoolean();
+            writers[i] = new Thread() {
+                @Override
+                public void run() {
+                    long id = -1;
+                    try {
+                        startLatch.await();
+                        logger.info("**** starting indexing thread {}", indexerId);
+                        while (!stop.get()) {
+                            if (batch) {
+                                int batchSize = RandomizedTest.getRandom().nextInt(20) + 1;
+                                BulkRequestBuilder bulkRequest = client.prepareBulk();
+                                for (int i = 0; i < batchSize; i++) {
+                                    id = idGenerator.incrementAndGet();
+                                    bulkRequest.add(client.prepareIndex(index, type, Long.toString(id)).setSource("test", "value" + id));
+                                }
+                                BulkResponse bulkResponse = bulkRequest.get();
+                                for (BulkItemResponse bulkItemResponse : bulkResponse) {
+                                    if (!bulkItemResponse.isFailed()) {
+                                        indexCounter.incrementAndGet();
+                                    } else {
+                                        throw new ElasticsearchException("bulk request failure, id: ["
+                                                + bulkItemResponse.getFailure().getId() + "] message: " + bulkItemResponse.getFailure().getMessage());
+                                    }
+                                }
+
+                            } else {
+                                id = idGenerator.incrementAndGet();
+                                client.prepareIndex(index, type, Long.toString(id) + "-" + indexerId).setSource("test", "value" + id).get();
+                                indexCounter.incrementAndGet();
+                            }
+                        }
+                        logger.info("**** done indexing thread {}", indexerId);
+                    } catch (Throwable e) {
+                        failures.add(e);
+                        logger.warn("**** failed indexing thread {} on doc id {}", e, indexerId, id);
+                    } finally {
+                        stopLatch.countDown();
+                    }
+                }
+            };
+            writers[i].start();
+        }
+
+        if (autoStart) {
+            startLatch.countDown();
+        }
+    }
+
+    public void start() {
+        startLatch.countDown();
+    }
+
+    public void stop() throws InterruptedException {
+        if (stop.get()) {
+            return;
+        }
+        stop.set(true);
+
+        Assert.assertThat("timeout while waiting for indexing threads to stop", stopLatch.await(6, TimeUnit.MINUTES), equalTo(true));
+        assertNoFailures();
+    }
+
+    public long totalIndexedDocs() {
+        return indexCounter.get();
+    }
+
+    public Throwable[] getFailures() {
+        return failures.toArray(new Throwable[failures.size()]);
+    }
+
+    public void assertNoFailures() {
+        Assert.assertThat(failures, emptyIterable());
+    }
+
+    @Override
+    public void close() throws Exception {
+        stop();
+    }
+}