From 1eaa9f9fda3ce7466fe56f4afb4d374026c9febd Mon Sep 17 00:00:00 2001 From: Adrien Grand Date: Mon, 3 Aug 2015 14:11:49 +0200 Subject: [PATCH] Fix concurrency issue in PrioritizedEsThreadPoolExecutor. Tasks can be registered with a timeout, which runs as a task in a separate threadpool. The idea is that the timeout runner cancels the main task when the time is out, and the timeout runner is cancelled when the main task starts executing. However, the following statement: ```java timeoutFuture = timer.schedule(new Runnable() { @Override public void run() { if (remove(TieBreakingPrioritizedRunnable.this)) { runAndClean(timeoutCallback); } } }, timeValue.nanos(), TimeUnit.NANOSECONDS); ``` is not atomic: the removal task is first started, and then the (volatile) variable is assigned. As a consequence, there is a short window that allows a timeout task to wait until the time is out even if the task is already completed. See http://build-us-00.elastic.co/job/es_core_17_centos/496/ for an example of such a failure. --- .../PrioritizedEsThreadPoolExecutor.java | 40 +++++++++++-------- 1 file changed, 24 insertions(+), 16 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/common/util/concurrent/PrioritizedEsThreadPoolExecutor.java b/core/src/main/java/org/elasticsearch/common/util/concurrent/PrioritizedEsThreadPoolExecutor.java index fe1b2a2438d..65998b57cc7 100644 --- a/core/src/main/java/org/elasticsearch/common/util/concurrent/PrioritizedEsThreadPoolExecutor.java +++ b/core/src/main/java/org/elasticsearch/common/util/concurrent/PrioritizedEsThreadPoolExecutor.java @@ -19,6 +19,7 @@ package org.elasticsearch.common.util.concurrent; import com.google.common.collect.Lists; + import org.elasticsearch.common.Priority; import org.elasticsearch.common.unit.TimeValue; @@ -161,8 +162,10 @@ public class PrioritizedEsThreadPoolExecutor extends EsThreadPoolExecutor { private Runnable runnable; private final long insertionOrder; - private volatile ScheduledFuture timeoutFuture; - private volatile boolean started = false; + + // these two variables are protected by 'this' + private ScheduledFuture timeoutFuture; + private boolean started = false; TieBreakingPrioritizedRunnable(PrioritizedRunnable runnable, long insertionOrder) { this(runnable, runnable.priority(), insertionOrder); @@ -176,10 +179,12 @@ public class PrioritizedEsThreadPoolExecutor extends EsThreadPoolExecutor { @Override public void run() { - // make the task as stared. This is needed for synchronization with the timeout handling - // see #scheduleTimeout() - started = true; - FutureUtils.cancel(timeoutFuture); + synchronized (this) { + // make the task as stared. This is needed for synchronization with the timeout handling + // see #scheduleTimeout() + started = true; + FutureUtils.cancel(timeoutFuture); + } runAndClean(runnable); } @@ -193,17 +198,20 @@ public class PrioritizedEsThreadPoolExecutor extends EsThreadPoolExecutor { } public void scheduleTimeout(ScheduledExecutorService timer, final Runnable timeoutCallback, TimeValue timeValue) { - timeoutFuture = timer.schedule(new Runnable() { - @Override - public void run() { - if (remove(TieBreakingPrioritizedRunnable.this)) { - runAndClean(timeoutCallback); - } + synchronized (this) { + if (timeoutFuture != null) { + throw new IllegalStateException("scheduleTimeout may only be called once"); + } + if (started == false) { + timeoutFuture = timer.schedule(new Runnable() { + @Override + public void run() { + if (remove(TieBreakingPrioritizedRunnable.this)) { + runAndClean(timeoutCallback); + } + } + }, timeValue.nanos(), TimeUnit.NANOSECONDS); } - }, timeValue.nanos(), TimeUnit.NANOSECONDS); - if (started) { - // if the actual action already it might have missed the setting of the future. Clean it ourselves. - FutureUtils.cancel(timeoutFuture); } }