Fix ClusterDisruptionIT#testAckedIndexing (#37853)

* Stop threads before logging the list of exceptions
* For the broken case of concurrent iteration in the finally block and the threads not having shut down,
use `CopyOnWriteArrayList` to have concurrency safe iteration
* Closes #37810
This commit is contained in:
Armin Braun 2019-01-25 09:38:29 +01:00 committed by GitHub
parent 5a9dadb3ff
commit 7692b607b9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 8 additions and 8 deletions

View File

@ -50,6 +50,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
@ -84,7 +85,6 @@ public class ClusterDisruptionIT extends AbstractDisruptionTestCase {
"org.elasticsearch.discovery:TRACE,org.elasticsearch.action.support.replication:TRACE," +
"org.elasticsearch.cluster.service:TRACE,org.elasticsearch.indices.recovery:TRACE," +
"org.elasticsearch.indices.cluster:TRACE,org.elasticsearch.index.shard:TRACE")
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/37810")
public void testAckedIndexing() throws Exception {
final int seconds = !(TEST_NIGHTLY && rarely()) ? 1 : 5;
@ -109,7 +109,7 @@ public class ClusterDisruptionIT extends AbstractDisruptionTestCase {
List<Semaphore> semaphores = new ArrayList<>(nodes.size());
final AtomicInteger idGenerator = new AtomicInteger(0);
final AtomicReference<CountDownLatch> countDownLatchRef = new AtomicReference<>();
final List<Exception> exceptedExceptions = Collections.synchronizedList(new ArrayList<Exception>());
final List<Exception> exceptedExceptions = new CopyOnWriteArrayList<>();
logger.info("starting indexers");
try {
@ -215,6 +215,12 @@ public class ClusterDisruptionIT extends AbstractDisruptionTestCase {
logger.info("done validating (iteration [{}])", iter);
}
} finally {
logger.info("shutting down indexers");
stop.set(true);
for (Thread indexer : indexers) {
indexer.interrupt();
indexer.join(60000);
}
if (exceptedExceptions.size() > 0) {
StringBuilder sb = new StringBuilder();
for (Exception e : exceptedExceptions) {
@ -222,12 +228,6 @@ public class ClusterDisruptionIT extends AbstractDisruptionTestCase {
}
logger.debug("Indexing exceptions during disruption: {}", sb);
}
logger.info("shutting down indexers");
stop.set(true);
for (Thread indexer : indexers) {
indexer.interrupt();
indexer.join(60000);
}
}
}