scheduleAtFixedRate would hang (#42993)

Though not in use in elasticsearch currently, it seems surprising that
ThreadPool.scheduler().scheduleAtFixedRate would hang. A recurring
scheduled task is never completed (except on failure) and we test for
exceptions using RunnableFuture.get(), which hangs for periodic tasks.
Fixed by checking that task is done before calling .get().
This commit is contained in:
Henning Andersen 2019-06-11 19:46:03 +02:00 committed by Henning Andersen
parent 172cd4dbfa
commit 30d8085d96
3 changed files with 19 additions and 1 deletions

View File

@ -124,6 +124,7 @@ public class EsExecutors {
*/ */
public static Throwable rethrowErrors(Runnable runnable) { public static Throwable rethrowErrors(Runnable runnable) {
if (runnable instanceof RunnableFuture) { if (runnable instanceof RunnableFuture) {
assert ((RunnableFuture) runnable).isDone();
try { try {
((RunnableFuture) runnable).get(); ((RunnableFuture) runnable).get();
} catch (final Exception e) { } catch (final Exception e) {

View File

@ -33,6 +33,7 @@ import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import java.util.concurrent.Delayed; import java.util.concurrent.Delayed;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadFactory;
@ -276,7 +277,11 @@ public interface Scheduler {
if (t != null) return; if (t != null) return;
// Scheduler only allows Runnable's so we expect no checked exceptions here. If anyone uses submit directly on `this`, we // Scheduler only allows Runnable's so we expect no checked exceptions here. If anyone uses submit directly on `this`, we
// accept the wrapped exception in the output. // accept the wrapped exception in the output.
ExceptionsHelper.reThrowIfNotNull(EsExecutors.rethrowErrors(r)); if (r instanceof RunnableFuture && ((RunnableFuture<?>) r).isDone()) {
// only check this if task is done, which it always is except for periodic tasks. Periodic tasks will hang on
// RunnableFuture.get()
ExceptionsHelper.reThrowIfNotNull(EsExecutors.rethrowErrors(r));
}
} }
} }
} }

View File

@ -153,4 +153,16 @@ public class SchedulerTests extends ESTestCase {
Scheduler.terminate(executor, 10, TimeUnit.SECONDS); Scheduler.terminate(executor, 10, TimeUnit.SECONDS);
} }
} }
public void testScheduleAtFixedRate() throws InterruptedException {
ScheduledThreadPoolExecutor executor = Scheduler.initScheduler(Settings.EMPTY);
try {
CountDownLatch missingExecutions = new CountDownLatch(randomIntBetween(1, 10));
executor.scheduleAtFixedRate(missingExecutions::countDown,
randomIntBetween(1, 10), randomIntBetween(1, 10), TimeUnit.MILLISECONDS);
assertTrue(missingExecutions.await(30, TimeUnit.SECONDS));
} finally {
Scheduler.terminate(executor, 10, TimeUnit.SECONDS);
}
}
} }