From 7692b607b930f632d6ea5e5e0eda55fcb57cfc45 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Fri, 25 Jan 2019 09:38:29 +0100 Subject: [PATCH] Fix ClusterDisruptionIT#testAckedIndexing (#37853) * Stop threads before logging the list of exceptions * For the broken case of concurrent iteration in the finally block and the threads not having shut down, use `CopyOnWriteArrayList` to have concurrency safe iteration * Closes #37810 --- .../discovery/ClusterDisruptionIT.java | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/discovery/ClusterDisruptionIT.java b/server/src/test/java/org/elasticsearch/discovery/ClusterDisruptionIT.java index f1e78fd3c6a..d94c34c7b33 100644 --- a/server/src/test/java/org/elasticsearch/discovery/ClusterDisruptionIT.java +++ b/server/src/test/java/org/elasticsearch/discovery/ClusterDisruptionIT.java @@ -50,6 +50,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; @@ -84,7 +85,6 @@ public class ClusterDisruptionIT extends AbstractDisruptionTestCase { "org.elasticsearch.discovery:TRACE,org.elasticsearch.action.support.replication:TRACE," + "org.elasticsearch.cluster.service:TRACE,org.elasticsearch.indices.recovery:TRACE," + "org.elasticsearch.indices.cluster:TRACE,org.elasticsearch.index.shard:TRACE") - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/37810") public void testAckedIndexing() throws Exception { final int seconds = !(TEST_NIGHTLY && rarely()) ? 1 : 5; @@ -109,7 +109,7 @@ public class ClusterDisruptionIT extends AbstractDisruptionTestCase { List semaphores = new ArrayList<>(nodes.size()); final AtomicInteger idGenerator = new AtomicInteger(0); final AtomicReference countDownLatchRef = new AtomicReference<>(); - final List exceptedExceptions = Collections.synchronizedList(new ArrayList()); + final List exceptedExceptions = new CopyOnWriteArrayList<>(); logger.info("starting indexers"); try { @@ -215,6 +215,12 @@ public class ClusterDisruptionIT extends AbstractDisruptionTestCase { logger.info("done validating (iteration [{}])", iter); } } finally { + logger.info("shutting down indexers"); + stop.set(true); + for (Thread indexer : indexers) { + indexer.interrupt(); + indexer.join(60000); + } if (exceptedExceptions.size() > 0) { StringBuilder sb = new StringBuilder(); for (Exception e : exceptedExceptions) { @@ -222,12 +228,6 @@ public class ClusterDisruptionIT extends AbstractDisruptionTestCase { } logger.debug("Indexing exceptions during disruption: {}", sb); } - logger.info("shutting down indexers"); - stop.set(true); - for (Thread indexer : indexers) { - indexer.interrupt(); - indexer.join(60000); - } } }