diff --git a/server/src/main/java/org/elasticsearch/ExceptionsHelper.java b/server/src/main/java/org/elasticsearch/ExceptionsHelper.java index 59eb8b60dad..09347f519fb 100644 --- a/server/src/main/java/org/elasticsearch/ExceptionsHelper.java +++ b/server/src/main/java/org/elasticsearch/ExceptionsHelper.java @@ -250,13 +250,13 @@ public final class ExceptionsHelper { * @param throwable the throwable to test */ public static void dieOnError(Throwable throwable) { - final Optional maybeError = ExceptionsHelper.maybeError(throwable, logger); - if (maybeError.isPresent()) { + ExceptionsHelper.maybeError(throwable, logger).ifPresent(error -> { /* - * Here be dragons. We want to rethrow this so that it bubbles up to the uncaught exception handler. Yet, Netty wraps too many - * invocations of user-code in try/catch blocks that swallow all throwables. This means that a rethrow here will not bubble up - * to where we want it to. So, we fork a thread and throw the exception from there where Netty can not get to it. We do not wrap - * the exception so as to not lose the original cause during exit. + * Here be dragons. We want to rethrow this so that it bubbles up to the uncaught exception handler. Yet, sometimes the stack + * contains statements that catch any throwable (e.g., Netty, and the JDK futures framework). This means that a rethrow here + * will not bubble up to where we want it to. So, we fork a thread and throw the exception from there where we are sure the + * stack does not contain statements that catch any throwable. We do not wrap the exception so as to not lose the original cause + * during exit. */ try { // try to log the current stack trace @@ -264,12 +264,12 @@ public final class ExceptionsHelper { logger.error("fatal error\n{}", formatted); } finally { new Thread( - () -> { - throw maybeError.get(); - }) - .start(); + () -> { + throw error; + }) + .start(); } - } + }); } /** diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/scheduler/SchedulerEngine.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/scheduler/SchedulerEngine.java index ffc0257313b..71abb61bdca 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/scheduler/SchedulerEngine.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/scheduler/SchedulerEngine.java @@ -3,8 +3,13 @@ * or more contributor license agreements. Licensed under the Elastic License; * you may not use this file except in compliance with the Elastic License. */ + package org.elasticsearch.xpack.core.scheduler; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.util.concurrent.EsExecutors; @@ -14,6 +19,7 @@ import java.time.Clock; import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -89,13 +95,20 @@ public class SchedulerEngine { } private final Map schedules = ConcurrentCollections.newConcurrentMap(); - private final ScheduledExecutorService scheduler; private final Clock clock; + private final ScheduledExecutorService scheduler; + private final Logger logger; private final List listeners = new CopyOnWriteArrayList<>(); - public SchedulerEngine(Settings settings, Clock clock) { - this.clock = clock; - this.scheduler = Executors.newScheduledThreadPool(1, EsExecutors.daemonThreadFactory(settings, "trigger_engine_scheduler")); + public SchedulerEngine(final Settings settings, final Clock clock) { + this(settings, clock, LogManager.getLogger(SchedulerEngine.class)); + } + + SchedulerEngine(final Settings settings, final Clock clock, final Logger logger) { + this.clock = Objects.requireNonNull(clock, "clock"); + this.scheduler = Executors.newScheduledThreadPool( + 1, EsExecutors.daemonThreadFactory(Objects.requireNonNull(settings, "settings"), "trigger_engine_scheduler")); + this.logger = Objects.requireNonNull(logger, "logger"); } public void register(Listener listener) { @@ -144,10 +157,15 @@ public class SchedulerEngine { return schedules.size(); } - protected void notifyListeners(String name, long triggeredTime, long scheduledTime) { + protected void notifyListeners(final String name, final long triggeredTime, final long scheduledTime) { final Event event = new Event(name, triggeredTime, scheduledTime); - for (Listener listener : listeners) { - listener.triggered(event); + for (final Listener listener : listeners) { + try { + listener.triggered(event); + } catch (final Exception e) { + // do not allow exceptions to escape this method; we should continue to notify listeners and schedule the next run + logger.warn(new ParameterizedMessage("listener failed while handling triggered event [{}]", name), e); + } } } @@ -169,8 +187,20 @@ public class SchedulerEngine { @Override public void run() { - long triggeredTime = clock.millis(); - notifyListeners(name, triggeredTime, scheduledTime); + final long triggeredTime = clock.millis(); + try { + notifyListeners(name, triggeredTime, scheduledTime); + } catch (final Throwable t) { + /* + * Allowing the throwable to escape here will lead to be it being caught in FutureTask#run and set as the outcome of this + * task; however, we never inspect the the outcomes of these scheduled tasks and so allowing the throwable to escape + * unhandled here could lead to use losing fatal errors. Instead, we rely on ExceptionsHelper#dieOnError to appropriately + * dispatch any error to the uncaught exception handler. We should never see an exception here as these do not escape from + * SchedulerEngine#notifyListeners. + */ + ExceptionsHelper.dieOnError(t); + throw t; + } scheduleNextRun(triggeredTime); } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/scheduler/SchedulerEngineTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/scheduler/SchedulerEngineTests.java new file mode 100644 index 00000000000..869a320fb63 --- /dev/null +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/scheduler/SchedulerEngineTests.java @@ -0,0 +1,159 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.core.scheduler; + +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.elasticsearch.common.collect.Tuple; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.test.ESTestCase; +import org.mockito.ArgumentCaptor; + +import java.time.Clock; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.hamcrest.Matchers.arrayWithSize; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.instanceOf; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; + +public class SchedulerEngineTests extends ESTestCase { + + public void testListenersThrowingExceptionsDoNotCauseOtherListenersToBeSkipped() throws InterruptedException { + final Logger mockLogger = mock(Logger.class); + final SchedulerEngine engine = new SchedulerEngine(Settings.EMPTY, Clock.systemUTC(), mockLogger); + try { + final List> listeners = new ArrayList<>(); + final int numberOfListeners = randomIntBetween(1, 32); + int numberOfFailingListeners = 0; + final CountDownLatch latch = new CountDownLatch(numberOfListeners); + for (int i = 0; i < numberOfListeners; i++) { + final AtomicBoolean trigger = new AtomicBoolean(); + final SchedulerEngine.Listener listener; + if (randomBoolean()) { + listener = event -> { + if (trigger.compareAndSet(false, true)) { + latch.countDown(); + } else { + fail("listener invoked twice"); + } + }; + } else { + numberOfFailingListeners++; + listener = event -> { + if (trigger.compareAndSet(false, true)) { + latch.countDown(); + throw new RuntimeException(getTestName()); + } else { + fail("listener invoked twice"); + } + }; + } + listeners.add(Tuple.tuple(listener, trigger)); + } + + // randomize the order and register the listeners + Collections.shuffle(listeners, random()); + listeners.stream().map(Tuple::v1).forEach(engine::register); + + final AtomicBoolean scheduled = new AtomicBoolean(); + engine.add(new SchedulerEngine.Job( + getTestName(), + (startTime, now) -> { + // only allow one triggering of the listeners + if (scheduled.compareAndSet(false, true)) { + return 0; + } else { + return -1; + } + })); + + latch.await(); + + // now check that every listener was invoked + assertTrue(listeners.stream().map(Tuple::v2).allMatch(AtomicBoolean::get)); + if (numberOfFailingListeners > 0) { + assertFailedListenerLogMessage(mockLogger, numberOfFailingListeners); + } + verifyNoMoreInteractions(mockLogger); + } finally { + engine.stop(); + } + } + + public void testListenersThrowingExceptionsDoNotCauseNextScheduledTaskToBeSkipped() throws InterruptedException { + final Logger mockLogger = mock(Logger.class); + final SchedulerEngine engine = new SchedulerEngine(Settings.EMPTY, Clock.systemUTC(), mockLogger); + try { + final List> listeners = new ArrayList<>(); + final int numberOfListeners = randomIntBetween(1, 32); + final int numberOfSchedules = randomIntBetween(1, 32); + final CountDownLatch listenersLatch = new CountDownLatch(numberOfSchedules * numberOfListeners); + for (int i = 0; i < numberOfListeners; i++) { + final AtomicInteger triggerCount = new AtomicInteger(); + final SchedulerEngine.Listener listener = event -> { + if (triggerCount.incrementAndGet() <= numberOfSchedules) { + listenersLatch.countDown(); + throw new RuntimeException(getTestName()); + } else { + fail("listener invoked more than [" + numberOfSchedules + "] times"); + } + }; + listeners.add(Tuple.tuple(listener, triggerCount)); + engine.register(listener); + } + + // latch for each invocation of nextScheduledTimeAfter, once for each scheduled run, and then a final time when we disable + final CountDownLatch latch = new CountDownLatch(1 + numberOfSchedules); + engine.add(new SchedulerEngine.Job( + getTestName(), + (startTime, now) -> { + if (latch.getCount() >= 2) { + latch.countDown(); + return 0; + } else if (latch.getCount() == 1) { + latch.countDown(); + return -1; + } else { + throw new AssertionError("nextScheduledTimeAfter invoked more than the expected number of times"); + } + })); + + listenersLatch.await(); + assertTrue(listeners.stream().map(Tuple::v2).allMatch(count -> count.get() == numberOfSchedules)); + latch.await(); + assertFailedListenerLogMessage(mockLogger, numberOfListeners * numberOfSchedules); + verifyNoMoreInteractions(mockLogger); + } finally { + engine.stop(); + } + } + + private void assertFailedListenerLogMessage(Logger mockLogger, int times) { + final ArgumentCaptor messageCaptor = ArgumentCaptor.forClass(ParameterizedMessage.class); + final ArgumentCaptor throwableCaptor = ArgumentCaptor.forClass(Throwable.class); + verify(mockLogger, times(times)).warn(messageCaptor.capture(), throwableCaptor.capture()); + for (final ParameterizedMessage message : messageCaptor.getAllValues()) { + assertThat(message.getFormat(), equalTo("listener failed while handling triggered event [{}]")); + assertThat(message.getParameters(), arrayWithSize(1)); + assertThat(message.getParameters()[0], equalTo(getTestName())); + } + for (final Throwable throwable : throwableCaptor.getAllValues()) { + assertThat(throwable, instanceOf(RuntimeException.class)); + assertThat(throwable.getMessage(), equalTo(getTestName())); + } + } + +} diff --git a/x-pack/qa/evil-tests/build.gradle b/x-pack/qa/evil-tests/build.gradle new file mode 100644 index 00000000000..9b6055ffad7 --- /dev/null +++ b/x-pack/qa/evil-tests/build.gradle @@ -0,0 +1,9 @@ +apply plugin: 'elasticsearch.standalone-test' + +dependencies { + testCompile project(path: xpackModule('core'), configuration: 'shadow') +} + +test { + systemProperty 'tests.security.manager', 'false' +} diff --git a/x-pack/qa/evil-tests/src/test/java/org/elasticsearch/xpack/core/scheduler/EvilSchedulerEngineTests.java b/x-pack/qa/evil-tests/src/test/java/org/elasticsearch/xpack/core/scheduler/EvilSchedulerEngineTests.java new file mode 100644 index 00000000000..2dfd314ffb0 --- /dev/null +++ b/x-pack/qa/evil-tests/src/test/java/org/elasticsearch/xpack/core/scheduler/EvilSchedulerEngineTests.java @@ -0,0 +1,84 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.core.scheduler; + +import org.apache.logging.log4j.Logger; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.test.ESTestCase; + +import java.time.Clock; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasToString; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.not; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verifyNoMoreInteractions; + +public class EvilSchedulerEngineTests extends ESTestCase { + + public void testOutOfMemoryErrorWhileTriggeredIsRethrownAndIsUncaught() throws InterruptedException { + final AtomicReference maybeFatal = new AtomicReference<>(); + final CountDownLatch uncaughtLatuch = new CountDownLatch(1); + final Thread.UncaughtExceptionHandler uncaughtExceptionHandler = Thread.getDefaultUncaughtExceptionHandler(); + try { + /* + * We want to test that the out of memory error thrown from the scheduler engine goes uncaught on another thread; this gives us + * confidence that an error thrown during a triggered event will lead to the node being torn down. + */ + final AtomicReference maybeThread = new AtomicReference<>(); + Thread.setDefaultUncaughtExceptionHandler((t, e) -> { + maybeFatal.set(e); + maybeThread.set(Thread.currentThread()); + uncaughtLatuch.countDown(); + }); + final Logger mockLogger = mock(Logger.class); + final SchedulerEngine engine = new SchedulerEngine(Settings.EMPTY, Clock.systemUTC(), mockLogger); + try { + final AtomicBoolean trigger = new AtomicBoolean(); + engine.register(event -> { + if (trigger.compareAndSet(false, true)) { + throw new OutOfMemoryError("640K ought to be enough for anybody"); + } else { + fail("listener invoked twice"); + } + }); + final CountDownLatch schedulerLatch = new CountDownLatch(1); + engine.add(new SchedulerEngine.Job( + getTestName(), + (startTime, now) -> { + if (schedulerLatch.getCount() == 1) { + schedulerLatch.countDown(); + return 0; + } else { + throw new AssertionError("nextScheduledTimeAfter invoked more than the expected number of times"); + } + })); + + uncaughtLatuch.await(); + assertTrue(trigger.get()); + assertNotNull(maybeFatal.get()); + assertThat(maybeFatal.get(), instanceOf(OutOfMemoryError.class)); + assertThat(maybeFatal.get(), hasToString(containsString("640K ought to be enough for anybody"))); + assertNotNull(maybeThread.get()); + assertThat(maybeThread.get(), not(equalTo(Thread.currentThread()))); // the error should be rethrown on another thread + schedulerLatch.await(); + verifyNoMoreInteractions(mockLogger); // we never logged anything + } finally { + engine.stop(); + } + } finally { + // restore the uncaught exception handler + Thread.setDefaultUncaughtExceptionHandler(uncaughtExceptionHandler); + } + } + +}