Override DefaultExceptionHandler to filter out certain exceptions
We have the situation that some tests fail since they don't handle EsRejectedExecutionException, which gets thrown when a node shuts down. It is ok to ignore this exception rather than fail. We also suffer from OOMs that can't create native threads but don't get thread dumps for those failures. This patch prints the thread stacks once we catch an OOM that can't create native threads.
This commit is contained in:
parent
74bfa27e7e
commit
2b42a0f94a
8
pom.xml
8
pom.xml
|
@ -53,6 +53,12 @@
|
|||
<version>1.3</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.carrotsearch.randomizedtesting</groupId>
|
||||
<artifactId>randomizedtesting-runner</artifactId>
|
||||
<version>2.0.15</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.lucene</groupId>
|
||||
<artifactId>lucene-test-framework</artifactId>
|
||||
|
@ -323,7 +329,7 @@
|
|||
<plugin>
|
||||
<groupId>com.carrotsearch.randomizedtesting</groupId>
|
||||
<artifactId>junit4-maven-plugin</artifactId>
|
||||
<version>2.0.14</version>
|
||||
<version>2.0.15</version>
|
||||
<executions>
|
||||
<execution>
|
||||
<id>tests</id>
|
||||
|
|
|
@ -31,6 +31,7 @@ import java.util.concurrent.ThreadPoolExecutor;
|
|||
public class EsAbortPolicy implements XRejectedExecutionHandler {
|
||||
|
||||
private final CounterMetric rejected = new CounterMetric();
|
||||
public static final String SHUTTING_DOWN_KEY = "(shutting down)";
|
||||
|
||||
@Override
|
||||
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
|
||||
|
@ -51,7 +52,7 @@ public class EsAbortPolicy implements XRejectedExecutionHandler {
|
|||
rejected.inc();
|
||||
StringBuilder sb = new StringBuilder("rejected execution ");
|
||||
if (executor.isShutdown()) {
|
||||
sb.append("(shutting down) ");
|
||||
sb.append(SHUTTING_DOWN_KEY + " ");
|
||||
} else {
|
||||
if (executor.getQueue() instanceof SizeBlockingQueue) {
|
||||
sb.append("(queue capacity ").append(((SizeBlockingQueue) executor.getQueue()).capacity()).append(") ");
|
||||
|
|
|
@ -20,7 +20,7 @@ package org.elasticsearch.test;
|
|||
|
||||
import com.carrotsearch.randomizedtesting.SeedUtils;
|
||||
import com.google.common.base.Joiner;
|
||||
import org.apache.lucene.util.AbstractRandomizedTest.IntegrationTests;
|
||||
import org.apache.lucene.util.AbstractRandomizedTest;
|
||||
import org.elasticsearch.ExceptionsHelper;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.ShardOperationFailedException;
|
||||
|
@ -129,7 +129,7 @@ import static org.hamcrest.Matchers.equalTo;
|
|||
* </p>
|
||||
*/
|
||||
@Ignore
|
||||
@IntegrationTests
|
||||
@AbstractRandomizedTest.IntegrationTests
|
||||
public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase {
|
||||
|
||||
|
||||
|
@ -170,26 +170,34 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase
|
|||
|
||||
@Before
|
||||
public final void before() throws IOException {
|
||||
final Scope currentClusterScope = getCurrentClusterScope();
|
||||
switch (currentClusterScope) {
|
||||
case GLOBAL:
|
||||
clearClusters();
|
||||
currentCluster = GLOBAL_CLUSTER;
|
||||
break;
|
||||
case SUITE:
|
||||
currentCluster = buildAndPutCluster(currentClusterScope, false);
|
||||
break;
|
||||
case TEST:
|
||||
currentCluster = buildAndPutCluster(currentClusterScope, true);
|
||||
break;
|
||||
default:
|
||||
assert false : "Unknown Scope: [" + currentClusterScope + "]";
|
||||
assert Thread.getDefaultUncaughtExceptionHandler() instanceof ElasticsearchUncaughtExceptionHandler;
|
||||
try {
|
||||
final Scope currentClusterScope = getCurrentClusterScope();
|
||||
switch (currentClusterScope) {
|
||||
case GLOBAL:
|
||||
clearClusters();
|
||||
currentCluster = GLOBAL_CLUSTER;
|
||||
break;
|
||||
case SUITE:
|
||||
currentCluster = buildAndPutCluster(currentClusterScope, false);
|
||||
break;
|
||||
case TEST:
|
||||
currentCluster = buildAndPutCluster(currentClusterScope, true);
|
||||
break;
|
||||
default:
|
||||
assert false : "Unknown Scope: [" + currentClusterScope + "]";
|
||||
}
|
||||
currentCluster.beforeTest(getRandom(), getPerTestTransportClientRatio());
|
||||
wipeIndices();
|
||||
wipeTemplates();
|
||||
randomIndexTemplate();
|
||||
logger.info("[{}#{}]: before test", getTestClass().getSimpleName(), getTestName());
|
||||
} catch (OutOfMemoryError e) {
|
||||
if (e.getMessage().contains("unable to create new native thread")) {
|
||||
ElasticsearchTestCase.printStackDump(logger);
|
||||
}
|
||||
throw e;
|
||||
}
|
||||
currentCluster.beforeTest(getRandom(), getPerTestTransportClientRatio());
|
||||
wipeIndices();
|
||||
wipeTemplates();
|
||||
randomIndexTemplate();
|
||||
logger.info("[{}#{}]: before test", getTestClass().getSimpleName(), getTestName());
|
||||
}
|
||||
|
||||
public TestCluster buildAndPutCluster(Scope currentClusterScope, boolean createIfExists) throws IOException {
|
||||
|
@ -234,6 +242,11 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase
|
|||
ensureAllSearchersClosed();
|
||||
ensureAllFilesClosed();
|
||||
logger.info("[{}#{}]: cleaned up after test", getTestClass().getSimpleName(), getTestName());
|
||||
} catch (OutOfMemoryError e) {
|
||||
if (e.getMessage().contains("unable to create new native thread")) {
|
||||
ElasticsearchTestCase.printStackDump(logger);
|
||||
}
|
||||
throw e;
|
||||
} finally {
|
||||
currentCluster.afterTest();
|
||||
currentCluster = null;
|
||||
|
|
|
@ -29,9 +29,12 @@ import org.apache.lucene.util.TimeUnits;
|
|||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.common.logging.ESLogger;
|
||||
import org.elasticsearch.common.logging.Loggers;
|
||||
import org.elasticsearch.common.util.concurrent.EsAbortPolicy;
|
||||
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
|
||||
import org.elasticsearch.test.junit.listeners.LoggingListener;
|
||||
import org.elasticsearch.test.engine.MockRobinEngine;
|
||||
import org.elasticsearch.test.store.MockDirectoryHelper;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
|
||||
import java.io.Closeable;
|
||||
|
@ -52,6 +55,8 @@ import java.util.concurrent.TimeUnit;
|
|||
@TimeoutSuite(millis = TimeUnits.HOUR) // timeout the suite after 1h and fail the test.
|
||||
@Listeners(LoggingListener.class)
|
||||
public abstract class ElasticsearchTestCase extends AbstractRandomizedTest {
|
||||
|
||||
private static Thread.UncaughtExceptionHandler defaultHandler;
|
||||
|
||||
protected final ESLogger logger = Loggers.getLogger(getClass());
|
||||
|
||||
|
@ -168,6 +173,13 @@ public abstract class ElasticsearchTestCase extends AbstractRandomizedTest {
|
|||
ensureAllSearchersClosed();
|
||||
}
|
||||
});
|
||||
defaultHandler = Thread.getDefaultUncaughtExceptionHandler();
|
||||
Thread.setDefaultUncaughtExceptionHandler(new ElasticsearchUncaughtExceptionHandler(defaultHandler));
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void resetUncaughtExceptionHandler() {
|
||||
Thread.setDefaultUncaughtExceptionHandler(defaultHandler);
|
||||
}
|
||||
|
||||
public static boolean maybeDocValues() {
|
||||
|
@ -215,5 +227,76 @@ public abstract class ElasticsearchTestCase extends AbstractRandomizedTest {
|
|||
public static Version randomVersion(Random random) {
|
||||
return SORTED_VERSIONS.get(random.nextInt(SORTED_VERSIONS.size()));
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
static final class ElasticsearchUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler {
|
||||
|
||||
private final Thread.UncaughtExceptionHandler parent;
|
||||
private final ESLogger logger = Loggers.getLogger(getClass());
|
||||
|
||||
private ElasticsearchUncaughtExceptionHandler(Thread.UncaughtExceptionHandler parent) {
|
||||
this.parent = parent;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void uncaughtException(Thread t, Throwable e) {
|
||||
if (e instanceof EsRejectedExecutionException) {
|
||||
if (e.getMessage().contains(EsAbortPolicy.SHUTTING_DOWN_KEY)) {
|
||||
return; // ignore the EsRejectedExecutionException when a node shuts down
|
||||
}
|
||||
} else if (e instanceof OutOfMemoryError) {
|
||||
if (e.getMessage().contains("unable to create new native thread")) {
|
||||
printStackDump(logger);
|
||||
}
|
||||
}
|
||||
parent.uncaughtException(t, e);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
protected static final void printStackDump(ESLogger logger) {
|
||||
// print stack traces if we can't create any native thread anymore
|
||||
Map<Thread, StackTraceElement[]> allStackTraces = Thread.getAllStackTraces();
|
||||
logger.error(formatThreadStacks(allStackTraces));
|
||||
}
|
||||
|
||||
/**
|
||||
* Dump threads and their current stack trace.
|
||||
*/
|
||||
private static String formatThreadStacks(Map<Thread,StackTraceElement[]> threads) {
|
||||
StringBuilder message = new StringBuilder();
|
||||
int cnt = 1;
|
||||
final Formatter f = new Formatter(message, Locale.ENGLISH);
|
||||
for (Map.Entry<Thread,StackTraceElement[]> e : threads.entrySet()) {
|
||||
if (e.getKey().isAlive())
|
||||
f.format(Locale.ENGLISH, "\n %2d) %s", cnt++, threadName(e.getKey())).flush();
|
||||
if (e.getValue().length == 0) {
|
||||
message.append("\n at (empty stack)");
|
||||
} else {
|
||||
for (StackTraceElement ste : e.getValue()) {
|
||||
message.append("\n at ").append(ste);
|
||||
}
|
||||
}
|
||||
}
|
||||
return message.toString();
|
||||
}
|
||||
|
||||
private static String threadName(Thread t) {
|
||||
return "Thread[" +
|
||||
"id=" + t.getId() +
|
||||
", name=" + t.getName() +
|
||||
", state=" + t.getState() +
|
||||
", group=" + groupName(t.getThreadGroup()) +
|
||||
"]";
|
||||
}
|
||||
|
||||
private static String groupName(ThreadGroup threadGroup) {
|
||||
if (threadGroup == null) {
|
||||
return "{null group}";
|
||||
} else {
|
||||
return threadGroup.getName();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue