From 619e4f8c020ca84837e0ae729211d4af934bf847 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Mon, 8 Jun 2020 10:18:47 +0200 Subject: [PATCH] Make BackgroundIndexer more Efficient (#57781) (#57789) Improve the efficiency of the background indexer by allowing an assertion to be registered that checks failures as they are produced, so that they are not queued up. Also, add a non-blocking stop to the background indexer so that, when stopping multiple indexers, we don't needlessly continue indexing on some indexers while waiting for another one to stop. Closes #57766 --- .../discovery/DiskDisruptionIT.java | 2 +- .../indices/state/CloseIndexIT.java | 15 ++-- .../state/CloseWhileRelocatingShardsIT.java | 16 ++--- .../recovery/RecoveryWhileUnderLoadIT.java | 8 +-- .../elasticsearch/recovery/RelocationIT.java | 2 +- .../elasticsearch/test/BackgroundIndexer.java | 72 ++++++++++++++----- 6 files changed, 69 insertions(+), 46 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/discovery/DiskDisruptionIT.java b/server/src/internalClusterTest/java/org/elasticsearch/discovery/DiskDisruptionIT.java index 235272201d6..cb3772b3465 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/discovery/DiskDisruptionIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/discovery/DiskDisruptionIT.java @@ -137,7 +137,7 @@ public class DiskDisruptionIT extends AbstractDisruptionTestCase { false, random())) { indexer.setRequestTimeout(TimeValue.ZERO); indexer.setIgnoreIndexingFailures(true); - indexer.setAssertNoFailuresOnStop(false); + indexer.setFailureAssertion(e -> {}); indexer.start(-1); waitForDocs(randomIntBetween(1, 100), indexer); diff --git a/server/src/internalClusterTest/java/org/elasticsearch/indices/state/CloseIndexIT.java b/server/src/internalClusterTest/java/org/elasticsearch/indices/state/CloseIndexIT.java index c5c40f0990c..5ae4145e295 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/indices/state/CloseIndexIT.java +++ 
b/server/src/internalClusterTest/java/org/elasticsearch/indices/state/CloseIndexIT.java @@ -201,19 +201,12 @@ public class CloseIndexIT extends ESIntegTestCase { int nbDocs = 0; try (BackgroundIndexer indexer = new BackgroundIndexer(indexName, "_doc", client(), MAX_DOCS)) { - indexer.setAssertNoFailuresOnStop(false); + indexer.setFailureAssertion(t -> assertException(t, indexName)); waitForDocs(randomIntBetween(10, 50), indexer); assertBusy(() -> closeIndices(indexName)); - indexer.stop(); + indexer.stopAndAwaitStopped(); nbDocs += indexer.totalIndexedDocs(); - - final Throwable[] failures = indexer.getFailures(); - if (failures != null) { - for (Throwable failure : failures) { - assertException(failure, indexName); - } - } } assertIndexIsClosed(indexName); @@ -280,6 +273,7 @@ public class CloseIndexIT extends ESIntegTestCase { createIndex(indexName); final BackgroundIndexer indexer = new BackgroundIndexer(indexName, "_doc", client(), MAX_DOCS); + indexer.setFailureAssertion(e -> {}); waitForDocs(1, indexer); final CountDownLatch latch = new CountDownLatch(1); @@ -321,8 +315,7 @@ public class CloseIndexIT extends ESIntegTestCase { thread.join(); } - indexer.setAssertNoFailuresOnStop(false); - indexer.stop(); + indexer.stopAndAwaitStopped(); final ClusterState clusterState = client().admin().cluster().prepareState().get().getState(); if (clusterState.metadata().indices().get(indexName).getState() == IndexMetadata.State.CLOSE) { diff --git a/server/src/internalClusterTest/java/org/elasticsearch/indices/state/CloseWhileRelocatingShardsIT.java b/server/src/internalClusterTest/java/org/elasticsearch/indices/state/CloseWhileRelocatingShardsIT.java index c7ab0d31564..7ef953988a9 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/indices/state/CloseWhileRelocatingShardsIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/indices/state/CloseWhileRelocatingShardsIT.java @@ -112,6 +112,7 @@ public class CloseWhileRelocatingShardsIT extends 
ESIntegTestCase { logger.debug("creating index {} with background indexing", indexName); final BackgroundIndexer indexer = new BackgroundIndexer(indexName, "_doc", client(), -1, 1); indexers.put(indexName, indexer); + indexer.setFailureAssertion(t -> assertException(t, indexName)); waitForDocs(1, indexer); } docsPerIndex.put(indexName, (long) nbDocs); @@ -225,20 +226,15 @@ public class CloseWhileRelocatingShardsIT extends ESIntegTestCase { thread.join(); } + // stop indexers first without waiting for stop to not redundantly index on some while waiting for another one to stop + for (BackgroundIndexer indexer : indexers.values()) { + indexer.stop(); + } for (Map.Entry entry : indexers.entrySet()) { final BackgroundIndexer indexer = entry.getValue(); - indexer.setAssertNoFailuresOnStop(false); - indexer.stop(); - + indexer.awaitStopped(); final String indexName = entry.getKey(); docsPerIndex.computeIfPresent(indexName, (key, value) -> value + indexer.totalIndexedDocs()); - - final Throwable[] failures = indexer.getFailures(); - if (failures != null) { - for (Throwable failure : failures) { - assertException(failure, indexName); - } - } } for (String index : indices) { diff --git a/server/src/internalClusterTest/java/org/elasticsearch/recovery/RecoveryWhileUnderLoadIT.java b/server/src/internalClusterTest/java/org/elasticsearch/recovery/RecoveryWhileUnderLoadIT.java index d673c26387c..9f66dc63aa8 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/recovery/RecoveryWhileUnderLoadIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/recovery/RecoveryWhileUnderLoadIT.java @@ -128,7 +128,7 @@ public class RecoveryWhileUnderLoadIT extends ESIntegTestCase { logger.info("--> {} docs indexed", totalNumDocs); logger.info("--> marking and waiting for indexing threads to stop ..."); - indexer.stop(); + indexer.stopAndAwaitStopped(); logger.info("--> indexing threads stopped"); logger.info("--> refreshing the index"); @@ -183,7 +183,7 @@ public class 
RecoveryWhileUnderLoadIT extends ESIntegTestCase { logger.info("--> {} docs indexed", totalNumDocs); logger.info("--> marking and waiting for indexing threads to stop ..."); - indexer.stop(); + indexer.stopAndAwaitStopped(); logger.info("--> indexing threads stopped"); logger.info("--> refreshing the index"); @@ -261,7 +261,7 @@ public class RecoveryWhileUnderLoadIT extends ESIntegTestCase { .setWaitForNoRelocatingShards(true)); logger.info("--> marking and waiting for indexing threads to stop ..."); - indexer.stop(); + indexer.stopAndAwaitStopped(); logger.info("--> indexing threads stopped"); assertNoTimeout(client().admin().cluster().prepareHealth() @@ -302,7 +302,7 @@ public class RecoveryWhileUnderLoadIT extends ESIntegTestCase { } logger.info("--> marking and waiting for indexing threads to stop ..."); - indexer.stop(); + indexer.stopAndAwaitStopped(); logger.info("--> indexing threads stopped"); logger.info("--> bump up number of replicas to 1 and allow all nodes to hold the index"); diff --git a/server/src/internalClusterTest/java/org/elasticsearch/recovery/RelocationIT.java b/server/src/internalClusterTest/java/org/elasticsearch/recovery/RelocationIT.java index 8dbcb433b55..b6ef20681ff 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/recovery/RelocationIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/recovery/RelocationIT.java @@ -236,7 +236,7 @@ public class RelocationIT extends ESIntegTestCase { } logger.info("--> done relocations"); logger.info("--> waiting for indexing threads to stop ..."); - indexer.stop(); + indexer.stopAndAwaitStopped(); logger.info("--> indexing threads stopped"); logger.info("--> refreshing the index"); diff --git a/test/framework/src/main/java/org/elasticsearch/test/BackgroundIndexer.java b/test/framework/src/main/java/org/elasticsearch/test/BackgroundIndexer.java index e1fa74c5b31..3bedd105edc 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/BackgroundIndexer.java +++ 
b/test/framework/src/main/java/org/elasticsearch/test/BackgroundIndexer.java @@ -39,14 +39,16 @@ import org.elasticsearch.common.xcontent.XContentFactory; import org.junit.Assert; import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; import java.util.Random; import java.util.Set; -import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Consumer; import static org.hamcrest.Matchers.emptyIterable; import static org.hamcrest.Matchers.equalTo; @@ -58,7 +60,7 @@ public class BackgroundIndexer implements AutoCloseable { final Thread[] writers; final Client client; final CountDownLatch stopLatch; - final CopyOnWriteArrayList failures; + final Collection failures = new ArrayList<>(); final AtomicBoolean stop = new AtomicBoolean(false); final AtomicLong idGenerator = new AtomicLong(); final CountDownLatch startLatch = new CountDownLatch(1); @@ -66,7 +68,7 @@ public class BackgroundIndexer implements AutoCloseable { final Semaphore availableBudget = new Semaphore(0); final boolean useAutoGeneratedIDs; private final Set ids = ConcurrentCollections.newConcurrentSet(); - private boolean assertNoFailuresOnStop = true; + private volatile Consumer failureAssertion = null; volatile int minFieldSize = 10; volatile int maxFieldSize = 140; @@ -118,7 +120,6 @@ public class BackgroundIndexer implements AutoCloseable { } this.client = client; useAutoGeneratedIDs = random.nextBoolean(); - failures = new CopyOnWriteArrayList<>(); writers = new Thread[writerCount]; stopLatch = new CountDownLatch(writers.length); logger.info("--> creating {} indexing threads (auto start: [{}], numOfDocs: [{}])", writerCount, autoStart, numOfDocs); @@ -162,7 +163,7 @@ public class BackgroundIndexer implements AutoCloseable { boolean add = 
ids.add(bulkItemResponse.getId()); assert add : "ID: " + bulkItemResponse.getId() + " already used"; } else { - failures.add(bulkItemResponse.getFailure().getCause()); + trackFailure(bulkItemResponse.getFailure().getCause()); } } } catch (Exception e) { @@ -204,7 +205,7 @@ public class BackgroundIndexer implements AutoCloseable { } logger.info("**** done indexing thread {} stop: {} numDocsIndexed: {}", indexerId, stop.get(), ids.size()); } catch (Exception e) { - failures.add(e); + trackFailure(e); final long docId = id; logger.warn( (Supplier) @@ -222,6 +223,16 @@ public class BackgroundIndexer implements AutoCloseable { } } + private void trackFailure(Exception e) { + synchronized (failures) { + if (failureAssertion != null) { + failureAssertion.accept(e); + } else { + failures.add(e); + } + } + } + private XContentBuilder generateSource(long id, Random random) throws IOException { int contentLength = RandomNumbers.randomIntBetween(random, minFieldSize, maxFieldSize); StringBuilder text = new StringBuilder(contentLength); @@ -287,32 +298,55 @@ public class BackgroundIndexer implements AutoCloseable { setBudget(numOfDocs); } - /** Stop all background threads * */ - public void stop() throws InterruptedException { - if (stop.get()) { - return; - } + /** Stop all background threads but don't wait for ongoing indexing operations to finish * */ + public void stop() { stop.set(true); + } + + public void awaitStopped() throws InterruptedException { + assert stop.get(); Assert.assertThat("timeout while waiting for indexing threads to stop", stopLatch.await(6, TimeUnit.MINUTES), equalTo(true)); - if (assertNoFailuresOnStop) { + if (failureAssertion == null) { assertNoFailures(); } } + /** Stop all background threads and wait for ongoing indexing operations to finish * */ + public void stopAndAwaitStopped() throws InterruptedException { + stop(); + awaitStopped(); + } + public long totalIndexedDocs() { return ids.size(); } - public Throwable[] getFailures() { - return 
failures.toArray(new Throwable[failures.size()]); - } - public void assertNoFailures() { - Assert.assertThat(failures, emptyIterable()); + synchronized (failures) { + Assert.assertThat(failures, emptyIterable()); + } } - public void setAssertNoFailuresOnStop(final boolean assertNoFailuresOnStop) { - this.assertNoFailuresOnStop = assertNoFailuresOnStop; + /** + * Set a consumer that can be used to run assertions on failures during indexing. If such a consumer is set then it disables adding + * failures to {@link #failures}. Should be used if the number of expected failures during indexing could become very large. + */ + public void setFailureAssertion(Consumer failureAssertion) { + synchronized (failures) { + this.failureAssertion = failureAssertion; + boolean success = false; + try { + for (Exception failure : failures) { + failureAssertion.accept(failure); + } + failures.clear(); + success = true; + } finally { + if (success == false) { + stop(); + } + } + } } @Override