Improve the efficiency of the background indexer by allowing an assertion to be registered that runs on failures as they are produced, preventing them from queuing up. Also, add a non-blocking stop to the background indexer so that when stopping multiple indexers we don't needlessly continue indexing on some indexers while waiting for another one to stop. Closes #57766
This commit is contained in:
parent
6c93fed204
commit
619e4f8c02
|
@ -137,7 +137,7 @@ public class DiskDisruptionIT extends AbstractDisruptionTestCase {
|
|||
false, random())) {
|
||||
indexer.setRequestTimeout(TimeValue.ZERO);
|
||||
indexer.setIgnoreIndexingFailures(true);
|
||||
indexer.setAssertNoFailuresOnStop(false);
|
||||
indexer.setFailureAssertion(e -> {});
|
||||
indexer.start(-1);
|
||||
|
||||
waitForDocs(randomIntBetween(1, 100), indexer);
|
||||
|
|
|
@ -201,19 +201,12 @@ public class CloseIndexIT extends ESIntegTestCase {
|
|||
|
||||
int nbDocs = 0;
|
||||
try (BackgroundIndexer indexer = new BackgroundIndexer(indexName, "_doc", client(), MAX_DOCS)) {
|
||||
indexer.setAssertNoFailuresOnStop(false);
|
||||
indexer.setFailureAssertion(t -> assertException(t, indexName));
|
||||
|
||||
waitForDocs(randomIntBetween(10, 50), indexer);
|
||||
assertBusy(() -> closeIndices(indexName));
|
||||
indexer.stop();
|
||||
indexer.stopAndAwaitStopped();
|
||||
nbDocs += indexer.totalIndexedDocs();
|
||||
|
||||
final Throwable[] failures = indexer.getFailures();
|
||||
if (failures != null) {
|
||||
for (Throwable failure : failures) {
|
||||
assertException(failure, indexName);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
assertIndexIsClosed(indexName);
|
||||
|
@ -280,6 +273,7 @@ public class CloseIndexIT extends ESIntegTestCase {
|
|||
createIndex(indexName);
|
||||
|
||||
final BackgroundIndexer indexer = new BackgroundIndexer(indexName, "_doc", client(), MAX_DOCS);
|
||||
indexer.setFailureAssertion(e -> {});
|
||||
waitForDocs(1, indexer);
|
||||
|
||||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
|
@ -321,8 +315,7 @@ public class CloseIndexIT extends ESIntegTestCase {
|
|||
thread.join();
|
||||
}
|
||||
|
||||
indexer.setAssertNoFailuresOnStop(false);
|
||||
indexer.stop();
|
||||
indexer.stopAndAwaitStopped();
|
||||
|
||||
final ClusterState clusterState = client().admin().cluster().prepareState().get().getState();
|
||||
if (clusterState.metadata().indices().get(indexName).getState() == IndexMetadata.State.CLOSE) {
|
||||
|
|
|
@ -112,6 +112,7 @@ public class CloseWhileRelocatingShardsIT extends ESIntegTestCase {
|
|||
logger.debug("creating index {} with background indexing", indexName);
|
||||
final BackgroundIndexer indexer = new BackgroundIndexer(indexName, "_doc", client(), -1, 1);
|
||||
indexers.put(indexName, indexer);
|
||||
indexer.setFailureAssertion(t -> assertException(t, indexName));
|
||||
waitForDocs(1, indexer);
|
||||
}
|
||||
docsPerIndex.put(indexName, (long) nbDocs);
|
||||
|
@ -225,20 +226,15 @@ public class CloseWhileRelocatingShardsIT extends ESIntegTestCase {
|
|||
thread.join();
|
||||
}
|
||||
|
||||
// stop indexers first without waiting for stop to not redundantly index on some while waiting for another one to stop
|
||||
for (BackgroundIndexer indexer : indexers.values()) {
|
||||
indexer.stop();
|
||||
}
|
||||
for (Map.Entry<String, BackgroundIndexer> entry : indexers.entrySet()) {
|
||||
final BackgroundIndexer indexer = entry.getValue();
|
||||
indexer.setAssertNoFailuresOnStop(false);
|
||||
indexer.stop();
|
||||
|
||||
indexer.awaitStopped();
|
||||
final String indexName = entry.getKey();
|
||||
docsPerIndex.computeIfPresent(indexName, (key, value) -> value + indexer.totalIndexedDocs());
|
||||
|
||||
final Throwable[] failures = indexer.getFailures();
|
||||
if (failures != null) {
|
||||
for (Throwable failure : failures) {
|
||||
assertException(failure, indexName);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for (String index : indices) {
|
||||
|
|
|
@ -128,7 +128,7 @@ public class RecoveryWhileUnderLoadIT extends ESIntegTestCase {
|
|||
logger.info("--> {} docs indexed", totalNumDocs);
|
||||
|
||||
logger.info("--> marking and waiting for indexing threads to stop ...");
|
||||
indexer.stop();
|
||||
indexer.stopAndAwaitStopped();
|
||||
logger.info("--> indexing threads stopped");
|
||||
|
||||
logger.info("--> refreshing the index");
|
||||
|
@ -183,7 +183,7 @@ public class RecoveryWhileUnderLoadIT extends ESIntegTestCase {
|
|||
logger.info("--> {} docs indexed", totalNumDocs);
|
||||
|
||||
logger.info("--> marking and waiting for indexing threads to stop ...");
|
||||
indexer.stop();
|
||||
indexer.stopAndAwaitStopped();
|
||||
logger.info("--> indexing threads stopped");
|
||||
|
||||
logger.info("--> refreshing the index");
|
||||
|
@ -261,7 +261,7 @@ public class RecoveryWhileUnderLoadIT extends ESIntegTestCase {
|
|||
.setWaitForNoRelocatingShards(true));
|
||||
|
||||
logger.info("--> marking and waiting for indexing threads to stop ...");
|
||||
indexer.stop();
|
||||
indexer.stopAndAwaitStopped();
|
||||
logger.info("--> indexing threads stopped");
|
||||
|
||||
assertNoTimeout(client().admin().cluster().prepareHealth()
|
||||
|
@ -302,7 +302,7 @@ public class RecoveryWhileUnderLoadIT extends ESIntegTestCase {
|
|||
}
|
||||
|
||||
logger.info("--> marking and waiting for indexing threads to stop ...");
|
||||
indexer.stop();
|
||||
indexer.stopAndAwaitStopped();
|
||||
|
||||
logger.info("--> indexing threads stopped");
|
||||
logger.info("--> bump up number of replicas to 1 and allow all nodes to hold the index");
|
||||
|
|
|
@ -236,7 +236,7 @@ public class RelocationIT extends ESIntegTestCase {
|
|||
}
|
||||
logger.info("--> done relocations");
|
||||
logger.info("--> waiting for indexing threads to stop ...");
|
||||
indexer.stop();
|
||||
indexer.stopAndAwaitStopped();
|
||||
logger.info("--> indexing threads stopped");
|
||||
|
||||
logger.info("--> refreshing the index");
|
||||
|
|
|
@ -39,14 +39,16 @@ import org.elasticsearch.common.xcontent.XContentFactory;
|
|||
import org.junit.Assert;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Random;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.Semaphore;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
import static org.hamcrest.Matchers.emptyIterable;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
|
@ -58,7 +60,7 @@ public class BackgroundIndexer implements AutoCloseable {
|
|||
final Thread[] writers;
|
||||
final Client client;
|
||||
final CountDownLatch stopLatch;
|
||||
final CopyOnWriteArrayList<Exception> failures;
|
||||
final Collection<Exception> failures = new ArrayList<>();
|
||||
final AtomicBoolean stop = new AtomicBoolean(false);
|
||||
final AtomicLong idGenerator = new AtomicLong();
|
||||
final CountDownLatch startLatch = new CountDownLatch(1);
|
||||
|
@ -66,7 +68,7 @@ public class BackgroundIndexer implements AutoCloseable {
|
|||
final Semaphore availableBudget = new Semaphore(0);
|
||||
final boolean useAutoGeneratedIDs;
|
||||
private final Set<String> ids = ConcurrentCollections.newConcurrentSet();
|
||||
private boolean assertNoFailuresOnStop = true;
|
||||
private volatile Consumer<Exception> failureAssertion = null;
|
||||
|
||||
volatile int minFieldSize = 10;
|
||||
volatile int maxFieldSize = 140;
|
||||
|
@ -118,7 +120,6 @@ public class BackgroundIndexer implements AutoCloseable {
|
|||
}
|
||||
this.client = client;
|
||||
useAutoGeneratedIDs = random.nextBoolean();
|
||||
failures = new CopyOnWriteArrayList<>();
|
||||
writers = new Thread[writerCount];
|
||||
stopLatch = new CountDownLatch(writers.length);
|
||||
logger.info("--> creating {} indexing threads (auto start: [{}], numOfDocs: [{}])", writerCount, autoStart, numOfDocs);
|
||||
|
@ -162,7 +163,7 @@ public class BackgroundIndexer implements AutoCloseable {
|
|||
boolean add = ids.add(bulkItemResponse.getId());
|
||||
assert add : "ID: " + bulkItemResponse.getId() + " already used";
|
||||
} else {
|
||||
failures.add(bulkItemResponse.getFailure().getCause());
|
||||
trackFailure(bulkItemResponse.getFailure().getCause());
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
|
@ -204,7 +205,7 @@ public class BackgroundIndexer implements AutoCloseable {
|
|||
}
|
||||
logger.info("**** done indexing thread {} stop: {} numDocsIndexed: {}", indexerId, stop.get(), ids.size());
|
||||
} catch (Exception e) {
|
||||
failures.add(e);
|
||||
trackFailure(e);
|
||||
final long docId = id;
|
||||
logger.warn(
|
||||
(Supplier<?>)
|
||||
|
@ -222,6 +223,16 @@ public class BackgroundIndexer implements AutoCloseable {
|
|||
}
|
||||
}
|
||||
|
||||
private void trackFailure(Exception e) {
|
||||
synchronized (failures) {
|
||||
if (failureAssertion != null) {
|
||||
failureAssertion.accept(e);
|
||||
} else {
|
||||
failures.add(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private XContentBuilder generateSource(long id, Random random) throws IOException {
|
||||
int contentLength = RandomNumbers.randomIntBetween(random, minFieldSize, maxFieldSize);
|
||||
StringBuilder text = new StringBuilder(contentLength);
|
||||
|
@ -287,32 +298,55 @@ public class BackgroundIndexer implements AutoCloseable {
|
|||
setBudget(numOfDocs);
|
||||
}
|
||||
|
||||
/** Stop all background threads * */
|
||||
public void stop() throws InterruptedException {
|
||||
if (stop.get()) {
|
||||
return;
|
||||
}
|
||||
/** Stop all background threads but don't wait for ongoing indexing operations to finish * */
|
||||
public void stop() {
|
||||
stop.set(true);
|
||||
}
|
||||
|
||||
public void awaitStopped() throws InterruptedException {
|
||||
assert stop.get();
|
||||
Assert.assertThat("timeout while waiting for indexing threads to stop", stopLatch.await(6, TimeUnit.MINUTES), equalTo(true));
|
||||
if (assertNoFailuresOnStop) {
|
||||
if (failureAssertion == null) {
|
||||
assertNoFailures();
|
||||
}
|
||||
}
|
||||
|
||||
/** Stop all background threads and wait for ongoing indexing operations to finish * */
|
||||
public void stopAndAwaitStopped() throws InterruptedException {
|
||||
stop();
|
||||
awaitStopped();
|
||||
}
|
||||
|
||||
public long totalIndexedDocs() {
|
||||
return ids.size();
|
||||
}
|
||||
|
||||
public Throwable[] getFailures() {
|
||||
return failures.toArray(new Throwable[failures.size()]);
|
||||
}
|
||||
|
||||
public void assertNoFailures() {
|
||||
synchronized (failures) {
|
||||
Assert.assertThat(failures, emptyIterable());
|
||||
}
|
||||
}
|
||||
|
||||
public void setAssertNoFailuresOnStop(final boolean assertNoFailuresOnStop) {
|
||||
this.assertNoFailuresOnStop = assertNoFailuresOnStop;
|
||||
/**
|
||||
* Set a consumer that can be used to run assertions on failures during indexing. If such a consumer is set then it disables adding
|
||||
* failures to {@link #failures}. Should be used if the number of expected failures during indexing could become very large.
|
||||
*/
|
||||
public void setFailureAssertion(Consumer<Exception> failureAssertion) {
|
||||
synchronized (failures) {
|
||||
this.failureAssertion = failureAssertion;
|
||||
boolean success = false;
|
||||
try {
|
||||
for (Exception failure : failures) {
|
||||
failureAssertion.accept(failure);
|
||||
}
|
||||
failures.clear();
|
||||
success = true;
|
||||
} finally {
|
||||
if (success == false) {
|
||||
stop();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
Loading…
Reference in New Issue