Add a BackgroundIndexer class to RecoveryWhileUnderLoadTests and use it.
Also change the document distribution a bit between the tests
This commit is contained in:
parent
e621458a39
commit
5bf8b79587
|
@ -56,6 +56,95 @@ public class RecoveryWhileUnderLoadTests extends ElasticsearchIntegrationTest {
|
|||
private final ESLogger logger = Loggers.getLogger(RecoveryWhileUnderLoadTests.class);
|
||||
|
||||
|
||||
public class BackgroundIndexer implements AutoCloseable {
|
||||
|
||||
final Thread[] writers;
|
||||
final CountDownLatch stopLatch;
|
||||
final CopyOnWriteArrayList<Throwable> failures;
|
||||
final AtomicBoolean stop = new AtomicBoolean(false);
|
||||
final AtomicLong idGenerator = new AtomicLong();
|
||||
final AtomicLong indexCounter = new AtomicLong();
|
||||
final CountDownLatch startLatch = new CountDownLatch(1);
|
||||
|
||||
public BackgroundIndexer() {
|
||||
this(scaledRandomIntBetween(3, 10));
|
||||
}
|
||||
|
||||
public BackgroundIndexer(int writerCount) {
|
||||
this(writerCount, true);
|
||||
}
|
||||
|
||||
public BackgroundIndexer(int writerCount, boolean autoStart) {
|
||||
|
||||
failures = new CopyOnWriteArrayList<>();
|
||||
writers = new Thread[writerCount];
|
||||
stopLatch = new CountDownLatch(writers.length);
|
||||
logger.info("--> starting {} indexing threads", writerCount);
|
||||
for (int i = 0; i < writers.length; i++) {
|
||||
final int indexerId = i;
|
||||
final Client client = client();
|
||||
writers[i] = new Thread() {
|
||||
@Override
|
||||
public void run() {
|
||||
long id = -1;
|
||||
try {
|
||||
startLatch.await();
|
||||
logger.info("**** starting indexing thread {}", indexerId);
|
||||
while (!stop.get()) {
|
||||
id = idGenerator.incrementAndGet();
|
||||
client.prepareIndex("test", "type1", Long.toString(id) + "-" + indexerId)
|
||||
.setSource(MapBuilder.<String, Object>newMapBuilder().put("test", "value" + id).map()).execute().actionGet();
|
||||
indexCounter.incrementAndGet();
|
||||
}
|
||||
logger.info("**** done indexing thread {}", indexerId);
|
||||
} catch (Throwable e) {
|
||||
failures.add(e);
|
||||
logger.warn("**** failed indexing thread {} on doc id {}", e, indexerId, id);
|
||||
} finally {
|
||||
stopLatch.countDown();
|
||||
}
|
||||
}
|
||||
};
|
||||
writers[i].start();
|
||||
}
|
||||
|
||||
if (autoStart) {
|
||||
startLatch.countDown();
|
||||
}
|
||||
}
|
||||
|
||||
public void start() {
|
||||
startLatch.countDown();
|
||||
}
|
||||
|
||||
public void stop() throws InterruptedException {
|
||||
if (stop.get()) {
|
||||
return;
|
||||
}
|
||||
stop.set(true);
|
||||
|
||||
assertThat("timeout while waiting for indexing threads to stop", stopLatch.await(6, TimeUnit.MINUTES), equalTo(true));
|
||||
assertNoFailures();
|
||||
}
|
||||
|
||||
public long totalIndexedDocs() {
|
||||
return indexCounter.get();
|
||||
}
|
||||
|
||||
public Throwable[] getFailures() {
|
||||
return failures.toArray(new Throwable[failures.size()]);
|
||||
}
|
||||
|
||||
public void assertNoFailures() {
|
||||
assertThat(failures, emptyIterable());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws Exception {
|
||||
stop();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
@TestLogging("action.search.type:TRACE,action.admin.indices.refresh:TRACE")
|
||||
@Slow
|
||||
|
@ -64,54 +153,22 @@ public class RecoveryWhileUnderLoadTests extends ElasticsearchIntegrationTest {
|
|||
int numberOfShards = numberOfShards();
|
||||
assertAcked(prepareCreate("test", 1, settingsBuilder().put(SETTING_NUMBER_OF_SHARDS, numberOfShards).put(SETTING_NUMBER_OF_REPLICAS, 1)));
|
||||
|
||||
final AtomicLong idGenerator = new AtomicLong();
|
||||
final AtomicLong indexCounter = new AtomicLong();
|
||||
final AtomicBoolean stop = new AtomicBoolean(false);
|
||||
Thread[] writers = new Thread[5];
|
||||
final CountDownLatch stopLatch = new CountDownLatch(writers.length);
|
||||
|
||||
logger.info("--> starting {} indexing threads", writers.length);
|
||||
for (int i = 0; i < writers.length; i++) {
|
||||
final int indexerId = i;
|
||||
final Client client = client();
|
||||
writers[i] = new Thread() {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
logger.info("**** starting indexing thread {}", indexerId);
|
||||
while (!stop.get()) {
|
||||
long id = idGenerator.incrementAndGet();
|
||||
if (id % 1000 == 0) {
|
||||
client.admin().indices().prepareFlush().execute().actionGet();
|
||||
}
|
||||
client.prepareIndex("test", "type1", Long.toString(id))
|
||||
.setSource(MapBuilder.<String, Object>newMapBuilder().put("test", "value" + id).map()).execute().actionGet();
|
||||
indexCounter.incrementAndGet();
|
||||
}
|
||||
logger.info("**** done indexing thread {}", indexerId);
|
||||
} catch (Throwable e) {
|
||||
logger.warn("**** failed indexing thread {}", e, indexerId);
|
||||
} finally {
|
||||
stopLatch.countDown();
|
||||
}
|
||||
}
|
||||
};
|
||||
writers[i].start();
|
||||
}
|
||||
try {
|
||||
final int totalNumDocs = scaledRandomIntBetween(200, 20000);
|
||||
int waitFor = totalNumDocs / 3;
|
||||
final int totalNumDocs = scaledRandomIntBetween(200, 20000);
|
||||
try (BackgroundIndexer indexer = new BackgroundIndexer()) {
|
||||
int waitFor = totalNumDocs / 10;
|
||||
logger.info("--> waiting for {} docs to be indexed ...", waitFor);
|
||||
waitForDocs(waitFor);
|
||||
indexer.assertNoFailures();
|
||||
logger.info("--> {} docs indexed", waitFor);
|
||||
|
||||
logger.info("--> flushing the index ....");
|
||||
// now flush, just to make sure we have some data in the index, not just translog
|
||||
client().admin().indices().prepareFlush().execute().actionGet();
|
||||
|
||||
waitFor += totalNumDocs / 3;
|
||||
waitFor += totalNumDocs / 10;
|
||||
logger.info("--> waiting for {} docs to be indexed ...", waitFor);
|
||||
waitForDocs(waitFor);
|
||||
indexer.assertNoFailures();
|
||||
logger.info("--> {} docs indexed", waitFor);
|
||||
|
||||
logger.info("--> allow 2 nodes for index [test] ...");
|
||||
|
@ -124,23 +181,18 @@ public class RecoveryWhileUnderLoadTests extends ElasticsearchIntegrationTest {
|
|||
|
||||
logger.info("--> waiting for {} docs to be indexed ...", totalNumDocs);
|
||||
waitForDocs(totalNumDocs);
|
||||
indexer.assertNoFailures();
|
||||
logger.info("--> {} docs indexed", totalNumDocs);
|
||||
|
||||
logger.info("--> marking and waiting for indexing threads to stop ...");
|
||||
stop.set(true);
|
||||
stopLatch.await();
|
||||
indexer.stop();
|
||||
logger.info("--> indexing threads stopped");
|
||||
|
||||
logger.info("--> refreshing the index");
|
||||
refreshAndAssert();
|
||||
logger.info("--> verifying indexed content");
|
||||
iterateAssertCount(numberOfShards, indexCounter.get(), 10);
|
||||
} finally {
|
||||
// verify the workers are shut down
|
||||
stop.set(true);
|
||||
stopLatch.await();
|
||||
iterateAssertCount(numberOfShards, indexer.totalIndexedDocs(), 10);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -151,50 +203,22 @@ public class RecoveryWhileUnderLoadTests extends ElasticsearchIntegrationTest {
|
|||
int numberOfShards = numberOfShards();
|
||||
assertAcked(prepareCreate("test", 1, settingsBuilder().put(SETTING_NUMBER_OF_SHARDS, numberOfShards).put(SETTING_NUMBER_OF_REPLICAS, 1)));
|
||||
|
||||
final AtomicLong idGenerator = new AtomicLong();
|
||||
final AtomicLong indexCounter = new AtomicLong();
|
||||
final AtomicBoolean stop = new AtomicBoolean(false);
|
||||
Thread[] writers = new Thread[5];
|
||||
logger.info("--> starting {} indexing threads", writers.length);
|
||||
final CountDownLatch stopLatch = new CountDownLatch(writers.length);
|
||||
for (int i = 0; i < writers.length; i++) {
|
||||
final int indexerId = i;
|
||||
final Client client = client();
|
||||
writers[i] = new Thread() {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
logger.info("**** starting indexing thread {}", indexerId);
|
||||
while (!stop.get()) {
|
||||
long id = idGenerator.incrementAndGet();
|
||||
client.prepareIndex("test", "type1", Long.toString(id))
|
||||
.setSource(MapBuilder.<String, Object>newMapBuilder().put("test", "value" + id).map()).execute().actionGet();
|
||||
indexCounter.incrementAndGet();
|
||||
}
|
||||
logger.info("**** done indexing thread {}", indexerId);
|
||||
} catch (Throwable e) {
|
||||
logger.warn("**** failed indexing thread {}", e, indexerId);
|
||||
} finally {
|
||||
stopLatch.countDown();
|
||||
}
|
||||
}
|
||||
};
|
||||
writers[i].start();
|
||||
}
|
||||
final int totalNumDocs = scaledRandomIntBetween(200, 20000);
|
||||
int waitFor = totalNumDocs / 3;
|
||||
try {
|
||||
try (BackgroundIndexer indexer = new BackgroundIndexer()) {
|
||||
int waitFor = totalNumDocs / 10;
|
||||
logger.info("--> waiting for {} docs to be indexed ...", waitFor);
|
||||
waitForDocs(waitFor);
|
||||
indexer.assertNoFailures();
|
||||
logger.info("--> {} docs indexed", waitFor);
|
||||
|
||||
logger.info("--> flushing the index ....");
|
||||
// now flush, just to make sure we have some data in the index, not just translog
|
||||
client().admin().indices().prepareFlush().execute().actionGet();
|
||||
|
||||
waitFor += totalNumDocs / 3;
|
||||
waitFor += totalNumDocs / 10;
|
||||
logger.info("--> waiting for {} docs to be indexed ...", waitFor);
|
||||
waitForDocs(waitFor);
|
||||
indexer.assertNoFailures();
|
||||
logger.info("--> {} docs indexed", waitFor);
|
||||
logger.info("--> allow 4 nodes for index [test] ...");
|
||||
allowNodes("test", 4);
|
||||
|
@ -205,24 +229,17 @@ public class RecoveryWhileUnderLoadTests extends ElasticsearchIntegrationTest {
|
|||
|
||||
logger.info("--> waiting for {} docs to be indexed ...", totalNumDocs);
|
||||
waitForDocs(totalNumDocs);
|
||||
indexer.assertNoFailures();
|
||||
logger.info("--> {} docs indexed", totalNumDocs);
|
||||
|
||||
stop.set(true);
|
||||
stopLatch.await();
|
||||
|
||||
logger.info("--> marking and waiting for indexing threads to stop ...");
|
||||
stop.set(true);
|
||||
stopLatch.await();
|
||||
indexer.stop();
|
||||
logger.info("--> indexing threads stopped");
|
||||
|
||||
logger.info("--> refreshing the index");
|
||||
refreshAndAssert();
|
||||
logger.info("--> verifying indexed content");
|
||||
iterateAssertCount(numberOfShards, indexCounter.get(), 10);
|
||||
} finally {
|
||||
// make sure the workers are stopped in case of an error
|
||||
stop.set(true);
|
||||
stopLatch.await();
|
||||
iterateAssertCount(numberOfShards, indexer.totalIndexedDocs(), 10);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -234,53 +251,22 @@ public class RecoveryWhileUnderLoadTests extends ElasticsearchIntegrationTest {
|
|||
int numberOfShards = numberOfShards();
|
||||
assertAcked(prepareCreate("test", 2, settingsBuilder().put(SETTING_NUMBER_OF_SHARDS, numberOfShards).put(SETTING_NUMBER_OF_REPLICAS, 1)));
|
||||
|
||||
final AtomicLong idGenerator = new AtomicLong();
|
||||
final AtomicLong indexCounter = new AtomicLong();
|
||||
final AtomicBoolean stop = new AtomicBoolean(false);
|
||||
Thread[] writers = new Thread[5];
|
||||
final CountDownLatch stopLatch = new CountDownLatch(writers.length);
|
||||
logger.info("--> starting {} indexing threads", writers.length);
|
||||
for (int i = 0; i < writers.length; i++) {
|
||||
final int indexerId = i;
|
||||
final Client client = client();
|
||||
writers[i] = new Thread() {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
logger.info("**** starting indexing thread {}", indexerId);
|
||||
while (!stop.get()) {
|
||||
long id = idGenerator.incrementAndGet();
|
||||
client.prepareIndex("test", "type1", Long.toString(id))
|
||||
.setSource(MapBuilder.<String, Object>newMapBuilder().put("test", "value" + id).map()).execute().actionGet();
|
||||
long count = indexCounter.incrementAndGet();
|
||||
if (count % 1000 == 0) {
|
||||
logger.debug("{} documents indexed", count);
|
||||
}
|
||||
}
|
||||
logger.info("**** done indexing thread {}", indexerId);
|
||||
} catch (Throwable e) {
|
||||
logger.warn("**** failed indexing thread {}", e, indexerId);
|
||||
} finally {
|
||||
stopLatch.countDown();
|
||||
}
|
||||
}
|
||||
};
|
||||
writers[i].start();
|
||||
}
|
||||
final int totalNumDocs = scaledRandomIntBetween(200, 20000);
|
||||
int waitFor = totalNumDocs / 3;
|
||||
try {
|
||||
try (BackgroundIndexer indexer = new BackgroundIndexer()) {
|
||||
int waitFor = totalNumDocs / 10;
|
||||
logger.info("--> waiting for {} docs to be indexed ...", waitFor);
|
||||
waitForDocs(waitFor);
|
||||
indexer.assertNoFailures();
|
||||
logger.info("--> {} docs indexed", waitFor);
|
||||
|
||||
logger.info("--> flushing the index ....");
|
||||
// now flush, just to make sure we have some data in the index, not just translog
|
||||
client().admin().indices().prepareFlush().execute().actionGet();
|
||||
|
||||
waitFor += totalNumDocs / 3;
|
||||
waitFor += totalNumDocs / 10;
|
||||
logger.info("--> waiting for {} docs to be indexed ...", waitFor);
|
||||
waitForDocs(waitFor);
|
||||
indexer.assertNoFailures();
|
||||
logger.info("--> {} docs indexed", waitFor);
|
||||
|
||||
// now start more nodes, while we index
|
||||
|
@ -293,6 +279,8 @@ public class RecoveryWhileUnderLoadTests extends ElasticsearchIntegrationTest {
|
|||
|
||||
logger.info("--> waiting for {} docs to be indexed ...", totalNumDocs);
|
||||
waitForDocs(totalNumDocs);
|
||||
indexer.assertNoFailures();
|
||||
|
||||
logger.info("--> {} docs indexed", totalNumDocs);
|
||||
// now, shutdown nodes
|
||||
logger.info("--> allow 3 nodes for index [test] ...");
|
||||
|
@ -311,8 +299,7 @@ public class RecoveryWhileUnderLoadTests extends ElasticsearchIntegrationTest {
|
|||
assertThat(client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setTimeout("1m").setWaitForYellowStatus().setWaitForNodes(">=1").execute().actionGet().isTimedOut(), equalTo(false));
|
||||
|
||||
logger.info("--> marking and waiting for indexing threads to stop ...");
|
||||
stop.set(true);
|
||||
stopLatch.await();
|
||||
indexer.stop();
|
||||
logger.info("--> indexing threads stopped");
|
||||
|
||||
assertThat(client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setTimeout("1m").setWaitForYellowStatus().setWaitForNodes(">=1").execute().actionGet().isTimedOut(), equalTo(false));
|
||||
|
@ -320,11 +307,7 @@ public class RecoveryWhileUnderLoadTests extends ElasticsearchIntegrationTest {
|
|||
logger.info("--> refreshing the index");
|
||||
refreshAndAssert();
|
||||
logger.info("--> verifying indexed content");
|
||||
iterateAssertCount(numberOfShards, indexCounter.get(), 10);
|
||||
} finally {
|
||||
// make sure the workers are stopped in case of an error
|
||||
stop.set(true);
|
||||
stopLatch.await();
|
||||
iterateAssertCount(numberOfShards, indexer.totalIndexedDocs(), 10);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -338,48 +321,12 @@ public class RecoveryWhileUnderLoadTests extends ElasticsearchIntegrationTest {
|
|||
int allowNodes = 2;
|
||||
assertAcked(prepareCreate("test", 3, settingsBuilder().put(SETTING_NUMBER_OF_SHARDS, numShards).put(SETTING_NUMBER_OF_REPLICAS, numReplicas)));
|
||||
|
||||
final AtomicLong idGenerator = new AtomicLong();
|
||||
final AtomicLong indexCounter = new AtomicLong();
|
||||
final AtomicBoolean stop = new AtomicBoolean(false);
|
||||
Thread[] writers = new Thread[scaledRandomIntBetween(3, 10)];
|
||||
final CountDownLatch stopLatch = new CountDownLatch(writers.length);
|
||||
logger.info("--> starting {} indexing threads", writers.length);
|
||||
final CopyOnWriteArrayList<Throwable> failures = new CopyOnWriteArrayList<>();
|
||||
final CountDownLatch startLatch = new CountDownLatch(1);
|
||||
for (int i = 0; i < writers.length; i++) {
|
||||
final int indexerId = i;
|
||||
final Client client = client();
|
||||
writers[i] = new Thread() {
|
||||
@Override
|
||||
public void run() {
|
||||
long id = -1;
|
||||
try {
|
||||
startLatch.await();
|
||||
logger.info("**** starting indexing thread {}", indexerId);
|
||||
while (!stop.get()) {
|
||||
id = idGenerator.incrementAndGet();
|
||||
client.prepareIndex("test", "type1", Long.toString(id) + "-" + indexerId)
|
||||
.setSource(MapBuilder.<String, Object>newMapBuilder().put("test", "value" + id).map()).execute().actionGet();
|
||||
indexCounter.incrementAndGet();
|
||||
}
|
||||
logger.info("**** done indexing thread {}", indexerId);
|
||||
} catch (Throwable e) {
|
||||
failures.add(e);
|
||||
logger.warn("**** failed indexing thread {} on doc id {}", e, indexerId, id);
|
||||
} finally {
|
||||
stopLatch.countDown();
|
||||
}
|
||||
}
|
||||
};
|
||||
writers[i].start();
|
||||
}
|
||||
try {
|
||||
final int numDocs = scaledRandomIntBetween(200, 50000);
|
||||
|
||||
try (BackgroundIndexer indexer = new BackgroundIndexer()) {
|
||||
|
||||
final int numDocs = scaledRandomIntBetween(200, 50000);
|
||||
logger.info("--> indexing {} docs in total ...", numDocs);
|
||||
startLatch.countDown();
|
||||
for (int i = 0; i < numDocs; i += scaledRandomIntBetween(100, Math.min(1000, numDocs))) {
|
||||
assertThat(failures, emptyIterable());
|
||||
indexer.assertNoFailures();
|
||||
logger.info("--> waiting for {} docs to be indexed ...", i);
|
||||
waitForDocs(i);
|
||||
logger.info("--> {} docs indexed", i);
|
||||
|
@ -390,9 +337,7 @@ public class RecoveryWhileUnderLoadTests extends ElasticsearchIntegrationTest {
|
|||
}
|
||||
|
||||
logger.info("--> marking and waiting for indexing threads to stop ...");
|
||||
stop.set(true);
|
||||
stopLatch.await();
|
||||
assertThat(failures, emptyIterable());
|
||||
indexer.stop();
|
||||
|
||||
logger.info("--> indexing threads stopped");
|
||||
logger.info("--> bump up number of replicas to 1 and allow all nodes to hold the index");
|
||||
|
@ -403,11 +348,7 @@ public class RecoveryWhileUnderLoadTests extends ElasticsearchIntegrationTest {
|
|||
logger.info("--> refreshing the index");
|
||||
refreshAndAssert();
|
||||
logger.info("--> verifying indexed content");
|
||||
iterateAssertCount(numShards, indexCounter.get(), 10);
|
||||
} finally {
|
||||
// make sure the workers are stopped in case of an error
|
||||
stop.set(true);
|
||||
stopLatch.await();
|
||||
iterateAssertCount(numShards, indexer.totalIndexedDocs(), 10);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue