Protect scheduler engine against throwing listeners (#32998)
There are two problems with the scheduler engine today, both relating to listeners that throw. The first problem is that a triggered listener that throws a plain old exception causes no additional listeners to be triggered for the event, and also causes the scheduler to never be invoked again. This leads to lost events and is bad. The second problem is that a triggered listener that throws a fatal error will not lead to that error being caught by the uncaught exception handler. This is because the triggered listener is executed as a future task under a scheduled thread pool executor; a throwable thrown there gets caught by the JDK futures framework and set as the outcome of the future task. Since we never inspect these tasks for their outcomes, nor is there a good place to do so, we have to handle these errors ourselves. To do this, we catch them and dispatch them to the uncaught exception handler via a forked thread. This is similar to our handling in Netty.
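For illustration, here is a minimal, self-contained sketch of the two protections described above. The names used here (SchedulerSketch, dieOnFatalError, the simplified Listener interface, the System.err stand-in for logging) are hypothetical and are not the actual Elasticsearch classes changed in the diff below.

// Minimal sketch of the two protections, with hypothetical names; not the actual implementation.
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CopyOnWriteArrayList;

class SchedulerSketch {

    interface Listener {
        void triggered(String eventName);
    }

    private final List<Listener> listeners = new CopyOnWriteArrayList<>();

    void register(final Listener listener) {
        listeners.add(listener);
    }

    // Protection one: isolate each listener invocation so that one throwing listener neither
    // prevents the remaining listeners from being notified nor lets the exception escape into
    // the scheduled task (which would stop the next run from being scheduled).
    void notifyListeners(final String eventName) {
        for (final Listener listener : listeners) {
            try {
                listener.triggered(eventName);
            } catch (final Exception e) {
                // log and continue; stand-in for a real logger.warn call
                System.err.println("listener failed while handling triggered event [" + eventName + "]: " + e);
            }
        }
    }

    // Protection two: a fatal error thrown inside a scheduled FutureTask is captured as the
    // task's outcome and never inspected, so it would be silently lost. Rethrow it from a fresh
    // thread where nothing on the stack can swallow it, so the uncaught exception handler sees it.
    static void dieOnFatalError(final Throwable throwable) {
        final Optional<Error> maybeError =
                throwable instanceof Error ? Optional.of((Error) throwable) : Optional.empty();
        maybeError.ifPresent(error -> new Thread(() -> {
            throw error;
        }).start());
    }
}

A scheduled task would then catch Throwable around its call to notifyListeners and hand anything caught to the die-on-error helper before rethrowing; the diff below applies the same shape to ExceptionsHelper and SchedulerEngine.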
parent cd83ddcecc
commit ad0a965db9
@@ -250,13 +250,13 @@ public final class ExceptionsHelper {
      * @param throwable the throwable to test
      */
     public static void dieOnError(Throwable throwable) {
-        final Optional<Error> maybeError = ExceptionsHelper.maybeError(throwable, logger);
-        if (maybeError.isPresent()) {
+        ExceptionsHelper.maybeError(throwable, logger).ifPresent(error -> {
             /*
-             * Here be dragons. We want to rethrow this so that it bubbles up to the uncaught exception handler. Yet, Netty wraps too many
-             * invocations of user-code in try/catch blocks that swallow all throwables. This means that a rethrow here will not bubble up
-             * to where we want it to. So, we fork a thread and throw the exception from there where Netty can not get to it. We do not wrap
-             * the exception so as to not lose the original cause during exit.
+             * Here be dragons. We want to rethrow this so that it bubbles up to the uncaught exception handler. Yet, sometimes the stack
+             * contains statements that catch any throwable (e.g., Netty, and the JDK futures framework). This means that a rethrow here
+             * will not bubble up to where we want it to. So, we fork a thread and throw the exception from there where we are sure the
+             * stack does not contain statements that catch any throwable. We do not wrap the exception so as to not lose the original cause
+             * during exit.
              */
             try {
                 // try to log the current stack trace
@@ -264,12 +264,12 @@ public final class ExceptionsHelper {
                 logger.error("fatal error\n{}", formatted);
             } finally {
                 new Thread(
-                        () -> {
-                            throw maybeError.get();
-                        })
-                        .start();
+                        () -> {
+                            throw error;
+                        })
+                        .start();
             }
-        }
+        });
     }

     /**
@@ -3,8 +3,13 @@
  * or more contributor license agreements. Licensed under the Elastic License;
  * you may not use this file except in compliance with the Elastic License.
  */

 package org.elasticsearch.xpack.core.scheduler;

+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.message.ParameterizedMessage;
+import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
 import org.elasticsearch.common.util.concurrent.EsExecutors;
@@ -14,6 +19,7 @@ import java.time.Clock;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
@@ -89,13 +95,20 @@ public class SchedulerEngine {
     }

     private final Map<String, ActiveSchedule> schedules = ConcurrentCollections.newConcurrentMap();
-    private final ScheduledExecutorService scheduler;
     private final Clock clock;
+    private final ScheduledExecutorService scheduler;
+    private final Logger logger;
     private final List<Listener> listeners = new CopyOnWriteArrayList<>();

-    public SchedulerEngine(Settings settings, Clock clock) {
-        this.clock = clock;
-        this.scheduler = Executors.newScheduledThreadPool(1, EsExecutors.daemonThreadFactory(settings, "trigger_engine_scheduler"));
+    public SchedulerEngine(final Settings settings, final Clock clock) {
+        this(settings, clock, LogManager.getLogger(SchedulerEngine.class));
     }
+
+    SchedulerEngine(final Settings settings, final Clock clock, final Logger logger) {
+        this.clock = Objects.requireNonNull(clock, "clock");
+        this.scheduler = Executors.newScheduledThreadPool(
+                1, EsExecutors.daemonThreadFactory(Objects.requireNonNull(settings, "settings"), "trigger_engine_scheduler"));
+        this.logger = Objects.requireNonNull(logger, "logger");
+    }

     public void register(Listener listener) {
@@ -144,10 +157,15 @@ public class SchedulerEngine {
         return schedules.size();
     }

-    protected void notifyListeners(String name, long triggeredTime, long scheduledTime) {
+    protected void notifyListeners(final String name, final long triggeredTime, final long scheduledTime) {
         final Event event = new Event(name, triggeredTime, scheduledTime);
-        for (Listener listener : listeners) {
-            listener.triggered(event);
+        for (final Listener listener : listeners) {
+            try {
+                listener.triggered(event);
+            } catch (final Exception e) {
+                // do not allow exceptions to escape this method; we should continue to notify listeners and schedule the next run
+                logger.warn(new ParameterizedMessage("listener failed while handling triggered event [{}]", name), e);
+            }
         }
     }

@@ -169,8 +187,20 @@ public class SchedulerEngine {

         @Override
         public void run() {
-            long triggeredTime = clock.millis();
-            notifyListeners(name, triggeredTime, scheduledTime);
+            final long triggeredTime = clock.millis();
+            try {
+                notifyListeners(name, triggeredTime, scheduledTime);
+            } catch (final Throwable t) {
+                /*
+                 * Allowing the throwable to escape here will lead to it being caught in FutureTask#run and set as the outcome of this
+                 * task; however, we never inspect the outcomes of these scheduled tasks and so allowing the throwable to escape
+                 * unhandled here could lead to us losing fatal errors. Instead, we rely on ExceptionsHelper#dieOnError to appropriately
+                 * dispatch any error to the uncaught exception handler. We should never see an exception here as these do not escape from
+                 * SchedulerEngine#notifyListeners.
+                 */
+                ExceptionsHelper.dieOnError(t);
+                throw t;
+            }
             scheduleNextRun(triggeredTime);
         }

@@ -0,0 +1,159 @@
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License;
 * you may not use this file except in compliance with the Elastic License.
 */

package org.elasticsearch.xpack.core.scheduler;

import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.ESTestCase;
import org.mockito.ArgumentCaptor;

import java.time.Clock;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import static org.hamcrest.Matchers.arrayWithSize;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;

public class SchedulerEngineTests extends ESTestCase {

    public void testListenersThrowingExceptionsDoNotCauseOtherListenersToBeSkipped() throws InterruptedException {
        final Logger mockLogger = mock(Logger.class);
        final SchedulerEngine engine = new SchedulerEngine(Settings.EMPTY, Clock.systemUTC(), mockLogger);
        try {
            final List<Tuple<SchedulerEngine.Listener, AtomicBoolean>> listeners = new ArrayList<>();
            final int numberOfListeners = randomIntBetween(1, 32);
            int numberOfFailingListeners = 0;
            final CountDownLatch latch = new CountDownLatch(numberOfListeners);
            for (int i = 0; i < numberOfListeners; i++) {
                final AtomicBoolean trigger = new AtomicBoolean();
                final SchedulerEngine.Listener listener;
                if (randomBoolean()) {
                    listener = event -> {
                        if (trigger.compareAndSet(false, true)) {
                            latch.countDown();
                        } else {
                            fail("listener invoked twice");
                        }
                    };
                } else {
                    numberOfFailingListeners++;
                    listener = event -> {
                        if (trigger.compareAndSet(false, true)) {
                            latch.countDown();
                            throw new RuntimeException(getTestName());
                        } else {
                            fail("listener invoked twice");
                        }
                    };
                }
                listeners.add(Tuple.tuple(listener, trigger));
            }

            // randomize the order and register the listeners
            Collections.shuffle(listeners, random());
            listeners.stream().map(Tuple::v1).forEach(engine::register);

            final AtomicBoolean scheduled = new AtomicBoolean();
            engine.add(new SchedulerEngine.Job(
                    getTestName(),
                    (startTime, now) -> {
                        // only allow one triggering of the listeners
                        if (scheduled.compareAndSet(false, true)) {
                            return 0;
                        } else {
                            return -1;
                        }
                    }));

            latch.await();

            // now check that every listener was invoked
            assertTrue(listeners.stream().map(Tuple::v2).allMatch(AtomicBoolean::get));
            if (numberOfFailingListeners > 0) {
                assertFailedListenerLogMessage(mockLogger, numberOfFailingListeners);
            }
            verifyNoMoreInteractions(mockLogger);
        } finally {
            engine.stop();
        }
    }

    public void testListenersThrowingExceptionsDoNotCauseNextScheduledTaskToBeSkipped() throws InterruptedException {
        final Logger mockLogger = mock(Logger.class);
        final SchedulerEngine engine = new SchedulerEngine(Settings.EMPTY, Clock.systemUTC(), mockLogger);
        try {
            final List<Tuple<SchedulerEngine.Listener, AtomicInteger>> listeners = new ArrayList<>();
            final int numberOfListeners = randomIntBetween(1, 32);
            final int numberOfSchedules = randomIntBetween(1, 32);
            final CountDownLatch listenersLatch = new CountDownLatch(numberOfSchedules * numberOfListeners);
            for (int i = 0; i < numberOfListeners; i++) {
                final AtomicInteger triggerCount = new AtomicInteger();
                final SchedulerEngine.Listener listener = event -> {
                    if (triggerCount.incrementAndGet() <= numberOfSchedules) {
                        listenersLatch.countDown();
                        throw new RuntimeException(getTestName());
                    } else {
                        fail("listener invoked more than [" + numberOfSchedules + "] times");
                    }
                };
                listeners.add(Tuple.tuple(listener, triggerCount));
                engine.register(listener);
            }

            // latch for each invocation of nextScheduledTimeAfter, once for each scheduled run, and then a final time when we disable
            final CountDownLatch latch = new CountDownLatch(1 + numberOfSchedules);
            engine.add(new SchedulerEngine.Job(
                    getTestName(),
                    (startTime, now) -> {
                        if (latch.getCount() >= 2) {
                            latch.countDown();
                            return 0;
                        } else if (latch.getCount() == 1) {
                            latch.countDown();
                            return -1;
                        } else {
                            throw new AssertionError("nextScheduledTimeAfter invoked more than the expected number of times");
                        }
                    }));

            listenersLatch.await();
            assertTrue(listeners.stream().map(Tuple::v2).allMatch(count -> count.get() == numberOfSchedules));
            latch.await();
            assertFailedListenerLogMessage(mockLogger, numberOfListeners * numberOfSchedules);
            verifyNoMoreInteractions(mockLogger);
        } finally {
            engine.stop();
        }
    }

    private void assertFailedListenerLogMessage(Logger mockLogger, int times) {
        final ArgumentCaptor<ParameterizedMessage> messageCaptor = ArgumentCaptor.forClass(ParameterizedMessage.class);
        final ArgumentCaptor<Throwable> throwableCaptor = ArgumentCaptor.forClass(Throwable.class);
        verify(mockLogger, times(times)).warn(messageCaptor.capture(), throwableCaptor.capture());
        for (final ParameterizedMessage message : messageCaptor.getAllValues()) {
            assertThat(message.getFormat(), equalTo("listener failed while handling triggered event [{}]"));
            assertThat(message.getParameters(), arrayWithSize(1));
            assertThat(message.getParameters()[0], equalTo(getTestName()));
        }
        for (final Throwable throwable : throwableCaptor.getAllValues()) {
            assertThat(throwable, instanceOf(RuntimeException.class));
            assertThat(throwable.getMessage(), equalTo(getTestName()));
        }
    }

}
@@ -0,0 +1,9 @@
apply plugin: 'elasticsearch.standalone-test'

dependencies {
    testCompile project(path: xpackModule('core'), configuration: 'shadow')
}

test {
    systemProperty 'tests.security.manager', 'false'
}
@@ -0,0 +1,84 @@
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License;
 * you may not use this file except in compliance with the Elastic License.
 */

package org.elasticsearch.xpack.core.scheduler;

import org.apache.logging.log4j.Logger;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.ESTestCase;

import java.time.Clock;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasToString;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.not;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verifyNoMoreInteractions;

public class EvilSchedulerEngineTests extends ESTestCase {

    public void testOutOfMemoryErrorWhileTriggeredIsRethrownAndIsUncaught() throws InterruptedException {
        final AtomicReference<Throwable> maybeFatal = new AtomicReference<>();
        final CountDownLatch uncaughtLatch = new CountDownLatch(1);
        final Thread.UncaughtExceptionHandler uncaughtExceptionHandler = Thread.getDefaultUncaughtExceptionHandler();
        try {
            /*
             * We want to test that the out of memory error thrown from the scheduler engine goes uncaught on another thread; this gives us
             * confidence that an error thrown during a triggered event will lead to the node being torn down.
             */
            final AtomicReference<Thread> maybeThread = new AtomicReference<>();
            Thread.setDefaultUncaughtExceptionHandler((t, e) -> {
                maybeFatal.set(e);
                maybeThread.set(Thread.currentThread());
                uncaughtLatch.countDown();
            });
            final Logger mockLogger = mock(Logger.class);
            final SchedulerEngine engine = new SchedulerEngine(Settings.EMPTY, Clock.systemUTC(), mockLogger);
            try {
                final AtomicBoolean trigger = new AtomicBoolean();
                engine.register(event -> {
                    if (trigger.compareAndSet(false, true)) {
                        throw new OutOfMemoryError("640K ought to be enough for anybody");
                    } else {
                        fail("listener invoked twice");
                    }
                });
                final CountDownLatch schedulerLatch = new CountDownLatch(1);
                engine.add(new SchedulerEngine.Job(
                        getTestName(),
                        (startTime, now) -> {
                            if (schedulerLatch.getCount() == 1) {
                                schedulerLatch.countDown();
                                return 0;
                            } else {
                                throw new AssertionError("nextScheduledTimeAfter invoked more than the expected number of times");
                            }
                        }));

                uncaughtLatch.await();
                assertTrue(trigger.get());
                assertNotNull(maybeFatal.get());
                assertThat(maybeFatal.get(), instanceOf(OutOfMemoryError.class));
                assertThat(maybeFatal.get(), hasToString(containsString("640K ought to be enough for anybody")));
                assertNotNull(maybeThread.get());
                assertThat(maybeThread.get(), not(equalTo(Thread.currentThread()))); // the error should be rethrown on another thread
                schedulerLatch.await();
                verifyNoMoreInteractions(mockLogger); // we never logged anything
            } finally {
                engine.stop();
            }
        } finally {
            // restore the uncaught exception handler
            Thread.setDefaultUncaughtExceptionHandler(uncaughtExceptionHandler);
        }
    }

}