From 2b42a0f94a794c1fc05bb6c478012a7483a720f1 Mon Sep 17 00:00:00 2001
From: Simon Willnauer
Date: Wed, 4 Dec 2013 14:02:40 +0000
Subject: [PATCH] Override DefaultExceptionHandler to filter out certain
exceptions
Some tests fail because they don't handle the
EsRejectedExecutionException that gets thrown when a node shuts
down. It is OK to ignore this exception in that case and not fail.
We also suffer from OOMs that can't create native threads, but we don't
get thread dumps for those failures. This patch prints the thread
stacks once we catch an OOM which can't create native threads.
---
pom.xml | 8 +-
.../common/util/concurrent/EsAbortPolicy.java | 3 +-
.../test/ElasticsearchIntegrationTest.java | 55 +++++++-----
.../test/ElasticsearchTestCase.java | 87 ++++++++++++++++++-
4 files changed, 128 insertions(+), 25 deletions(-)
diff --git a/pom.xml b/pom.xml
index 079d92f29a6..46521465924 100644
--- a/pom.xml
+++ b/pom.xml
@@ -53,6 +53,12 @@
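<version>1.3</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>com.carrotsearch.randomizedtesting</groupId>
+ <artifactId>randomizedtesting-runner</artifactId>
+ <version>2.0.15</version>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>org.apache.lucene</groupId>
<artifactId>lucene-test-framework</artifactId>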
@@ -323,7 +329,7 @@
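<plugin>
<groupId>com.carrotsearch.randomizedtesting</groupId>
<artifactId>junit4-maven-plugin</artifactId>
- <version>2.0.14</version>
+ <version>2.0.15</version>
<executions>
<execution>
<id>tests</id>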
diff --git a/src/main/java/org/elasticsearch/common/util/concurrent/EsAbortPolicy.java b/src/main/java/org/elasticsearch/common/util/concurrent/EsAbortPolicy.java
index da86a0f1d08..41e8d62887a 100644
--- a/src/main/java/org/elasticsearch/common/util/concurrent/EsAbortPolicy.java
+++ b/src/main/java/org/elasticsearch/common/util/concurrent/EsAbortPolicy.java
@@ -31,6 +31,7 @@ import java.util.concurrent.ThreadPoolExecutor;
public class EsAbortPolicy implements XRejectedExecutionHandler {
private final CounterMetric rejected = new CounterMetric();
+ public static final String SHUTTING_DOWN_KEY = "(shutting down)";
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
@@ -51,7 +52,7 @@ public class EsAbortPolicy implements XRejectedExecutionHandler {
rejected.inc();
StringBuilder sb = new StringBuilder("rejected execution ");
if (executor.isShutdown()) {
- sb.append("(shutting down) ");
+ sb.append(SHUTTING_DOWN_KEY + " ");
} else {
if (executor.getQueue() instanceof SizeBlockingQueue) {
sb.append("(queue capacity ").append(((SizeBlockingQueue) executor.getQueue()).capacity()).append(") ");
diff --git a/src/test/java/org/elasticsearch/test/ElasticsearchIntegrationTest.java b/src/test/java/org/elasticsearch/test/ElasticsearchIntegrationTest.java
index 4b57df117e3..983504d9a49 100644
--- a/src/test/java/org/elasticsearch/test/ElasticsearchIntegrationTest.java
+++ b/src/test/java/org/elasticsearch/test/ElasticsearchIntegrationTest.java
@@ -20,7 +20,7 @@ package org.elasticsearch.test;
import com.carrotsearch.randomizedtesting.SeedUtils;
import com.google.common.base.Joiner;
-import org.apache.lucene.util.AbstractRandomizedTest.IntegrationTests;
+import org.apache.lucene.util.AbstractRandomizedTest;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ShardOperationFailedException;
@@ -129,7 +129,7 @@ import static org.hamcrest.Matchers.equalTo;
*
*/
@Ignore
-@IntegrationTests
+@AbstractRandomizedTest.IntegrationTests
public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase {
@@ -170,26 +170,34 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase
@Before
public final void before() throws IOException {
- final Scope currentClusterScope = getCurrentClusterScope();
- switch (currentClusterScope) {
- case GLOBAL:
- clearClusters();
- currentCluster = GLOBAL_CLUSTER;
- break;
- case SUITE:
- currentCluster = buildAndPutCluster(currentClusterScope, false);
- break;
- case TEST:
- currentCluster = buildAndPutCluster(currentClusterScope, true);
- break;
- default:
- assert false : "Unknown Scope: [" + currentClusterScope + "]";
+ assert Thread.getDefaultUncaughtExceptionHandler() instanceof ElasticsearchUncaughtExceptionHandler;
+ try {
+ final Scope currentClusterScope = getCurrentClusterScope();
+ switch (currentClusterScope) {
+ case GLOBAL:
+ clearClusters();
+ currentCluster = GLOBAL_CLUSTER;
+ break;
+ case SUITE:
+ currentCluster = buildAndPutCluster(currentClusterScope, false);
+ break;
+ case TEST:
+ currentCluster = buildAndPutCluster(currentClusterScope, true);
+ break;
+ default:
+ assert false : "Unknown Scope: [" + currentClusterScope + "]";
+ }
+ currentCluster.beforeTest(getRandom(), getPerTestTransportClientRatio());
+ wipeIndices();
+ wipeTemplates();
+ randomIndexTemplate();
+ logger.info("[{}#{}]: before test", getTestClass().getSimpleName(), getTestName());
+ } catch (OutOfMemoryError e) {
+ if (e.getMessage().contains("unable to create new native thread")) {
+ ElasticsearchTestCase.printStackDump(logger);
+ }
+ throw e;
}
- currentCluster.beforeTest(getRandom(), getPerTestTransportClientRatio());
- wipeIndices();
- wipeTemplates();
- randomIndexTemplate();
- logger.info("[{}#{}]: before test", getTestClass().getSimpleName(), getTestName());
}
public TestCluster buildAndPutCluster(Scope currentClusterScope, boolean createIfExists) throws IOException {
@@ -234,6 +242,11 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase
ensureAllSearchersClosed();
ensureAllFilesClosed();
logger.info("[{}#{}]: cleaned up after test", getTestClass().getSimpleName(), getTestName());
+ } catch (OutOfMemoryError e) {
+ if (e.getMessage().contains("unable to create new native thread")) {
+ ElasticsearchTestCase.printStackDump(logger);
+ }
+ throw e;
} finally {
currentCluster.afterTest();
currentCluster = null;
diff --git a/src/test/java/org/elasticsearch/test/ElasticsearchTestCase.java b/src/test/java/org/elasticsearch/test/ElasticsearchTestCase.java
index 71dcac12813..16448a3d417 100644
--- a/src/test/java/org/elasticsearch/test/ElasticsearchTestCase.java
+++ b/src/test/java/org/elasticsearch/test/ElasticsearchTestCase.java
@@ -29,9 +29,12 @@ import org.apache.lucene.util.TimeUnits;
import org.elasticsearch.Version;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
+import org.elasticsearch.common.util.concurrent.EsAbortPolicy;
+import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.test.junit.listeners.LoggingListener;
import org.elasticsearch.test.engine.MockRobinEngine;
import org.elasticsearch.test.store.MockDirectoryHelper;
+import org.junit.AfterClass;
import org.junit.BeforeClass;
import java.io.Closeable;
@@ -52,6 +55,8 @@ import java.util.concurrent.TimeUnit;
@TimeoutSuite(millis = TimeUnits.HOUR) // timeout the suite after 1h and fail the test.
@Listeners(LoggingListener.class)
public abstract class ElasticsearchTestCase extends AbstractRandomizedTest {
+
+ private static Thread.UncaughtExceptionHandler defaultHandler;
protected final ESLogger logger = Loggers.getLogger(getClass());
@@ -168,6 +173,13 @@ public abstract class ElasticsearchTestCase extends AbstractRandomizedTest {
ensureAllSearchersClosed();
}
});
+ defaultHandler = Thread.getDefaultUncaughtExceptionHandler();
+ Thread.setDefaultUncaughtExceptionHandler(new ElasticsearchUncaughtExceptionHandler(defaultHandler));
+ }
+
+ @AfterClass
+ public static void resetUncaughtExceptionHandler() {
+ Thread.setDefaultUncaughtExceptionHandler(defaultHandler);
}
public static boolean maybeDocValues() {
@@ -215,5 +227,76 @@ public abstract class ElasticsearchTestCase extends AbstractRandomizedTest {
public static Version randomVersion(Random random) {
return SORTED_VERSIONS.get(random.nextInt(SORTED_VERSIONS.size()));
}
-
- }
+
+    /**
+     * Uncaught-exception handler that filters or annotates certain failures before
+     * handing them on to the handler it wraps.
+     * <ul>
+     *   <li>An {@link EsRejectedExecutionException} whose message contains
+     *       {@link EsAbortPolicy#SHUTTING_DOWN_KEY} is dropped.</li>
+     *   <li>An {@link OutOfMemoryError} whose message contains
+     *       "unable to create new native thread" triggers a thread stack dump first.</li>
+     *   <li>Everything that is not dropped is forwarded to the wrapped handler.</li>
+     * </ul>
+     */
+    static final class ElasticsearchUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler {
+
+        /** Handler to delegate to; may be null when no default handler was installed before this one. */
+        private final Thread.UncaughtExceptionHandler parent;
+        private final ESLogger logger = Loggers.getLogger(getClass());
+
+        private ElasticsearchUncaughtExceptionHandler(Thread.UncaughtExceptionHandler parent) {
+            this.parent = parent;
+        }
+
+        /**
+         * Filters the exception as described on the class and forwards it otherwise.
+         *
+         * @param t the thread that terminated because of {@code e}
+         * @param e the uncaught throwable
+         */
+        @Override
+        public void uncaughtException(Thread t, Throwable e) {
+            // Throwable#getMessage may return null; guard so the handler itself never throws an NPE
+            final String message = e.getMessage();
+            if (e instanceof EsRejectedExecutionException) {
+                if (message != null && message.contains(EsAbortPolicy.SHUTTING_DOWN_KEY)) {
+                    return; // ignore the EsRejectedExecutionException when a node shuts down
+                }
+            } else if (e instanceof OutOfMemoryError) {
+                if (message != null && message.contains("unable to create new native thread")) {
+                    printStackDump(logger);
+                }
+            }
+            if (parent != null) {
+                parent.uncaughtException(t, e);
+            } else {
+                // Thread.getDefaultUncaughtExceptionHandler() may have been null when we were created;
+                // report on stderr like the JVM does instead of failing inside the handler
+                System.err.print("Exception in thread \"" + t.getName() + "\" ");
+                e.printStackTrace(System.err);
+            }
+        }
+
+    }
+
+    /**
+     * Logs, at error level, the formatted stack traces of all threads returned by
+     * {@link Thread#getAllStackTraces()}.
+     *
+     * @param logger the logger that receives the dump
+     */
+    protected static final void printStackDump(ESLogger logger) {
+        // print stack traces if we can't create any native thread anymore
+        Map<Thread, StackTraceElement[]> allStackTraces = Thread.getAllStackTraces();
+        logger.error(formatThreadStacks(allStackTraces));
+    }
+
+    /**
+     * Dump threads and their current stack trace.
+     * <p>
+     * Each live thread gets a numbered header line followed by one line per stack
+     * frame (or an "(empty stack)" marker). Threads that are no longer alive are
+     * skipped entirely so their frames are never listed under another thread's header.
+     *
+     * @param threads threads mapped to their stack traces
+     * @return the formatted, multi-line dump
+     */
+    private static String formatThreadStacks(Map<Thread, StackTraceElement[]> threads) {
+        StringBuilder message = new StringBuilder();
+        int cnt = 1;
+        final Formatter f = new Formatter(message, Locale.ENGLISH);
+        for (Map.Entry<Thread, StackTraceElement[]> e : threads.entrySet()) {
+            if (!e.getKey().isAlive()) {
+                continue; // no header would be printed for this thread, so do not print its frames either
+            }
+            f.format(Locale.ENGLISH, "\n %2d) %s", cnt++, threadName(e.getKey())).flush();
+            if (e.getValue().length == 0) {
+                message.append("\n at (empty stack)");
+            } else {
+                for (StackTraceElement ste : e.getValue()) {
+                    message.append("\n at ").append(ste);
+                }
+            }
+        }
+        return message.toString();
+    }
+
+    /**
+     * Builds a one-line description of a thread from its id, name, state and
+     * thread group name.
+     */
+    private static String threadName(Thread t) {
+        final StringBuilder description = new StringBuilder("Thread[");
+        description.append("id=").append(t.getId());
+        description.append(", name=").append(t.getName());
+        description.append(", state=").append(t.getState());
+        description.append(", group=").append(groupName(t.getThreadGroup()));
+        return description.append("]").toString();
+    }
+
+    /**
+     * Returns the name of the given thread group, or the placeholder
+     * "{null group}" when there is no group.
+     */
+    private static String groupName(ThreadGroup threadGroup) {
+        return threadGroup == null ? "{null group}" : threadGroup.getName();
+    }
+
+
+}