diff --git a/core/src/main/java/org/elasticsearch/common/util/concurrent/PrioritizedEsThreadPoolExecutor.java b/core/src/main/java/org/elasticsearch/common/util/concurrent/PrioritizedEsThreadPoolExecutor.java index fe1b2a2438d..65998b57cc7 100644 --- a/core/src/main/java/org/elasticsearch/common/util/concurrent/PrioritizedEsThreadPoolExecutor.java +++ b/core/src/main/java/org/elasticsearch/common/util/concurrent/PrioritizedEsThreadPoolExecutor.java @@ -19,6 +19,7 @@ package org.elasticsearch.common.util.concurrent; import com.google.common.collect.Lists; + import org.elasticsearch.common.Priority; import org.elasticsearch.common.unit.TimeValue; @@ -161,8 +162,10 @@ public class PrioritizedEsThreadPoolExecutor extends EsThreadPoolExecutor { private Runnable runnable; private final long insertionOrder; - private volatile ScheduledFuture timeoutFuture; - private volatile boolean started = false; + + // these two variables are protected by 'this' + private ScheduledFuture timeoutFuture; + private boolean started = false; TieBreakingPrioritizedRunnable(PrioritizedRunnable runnable, long insertionOrder) { this(runnable, runnable.priority(), insertionOrder); @@ -176,10 +179,12 @@ public class PrioritizedEsThreadPoolExecutor extends EsThreadPoolExecutor { @Override public void run() { - // make the task as stared. This is needed for synchronization with the timeout handling - // see #scheduleTimeout() - started = true; - FutureUtils.cancel(timeoutFuture); + synchronized (this) { + // make the task as stared. This is needed for synchronization with the timeout handling + // see #scheduleTimeout() + started = true; + FutureUtils.cancel(timeoutFuture); + } runAndClean(runnable); } @@ -193,17 +198,20 @@ public class PrioritizedEsThreadPoolExecutor extends EsThreadPoolExecutor { } public void scheduleTimeout(ScheduledExecutorService timer, final Runnable timeoutCallback, TimeValue timeValue) { - timeoutFuture = timer.schedule(new Runnable() { - @Override - public void run() { - if (remove(TieBreakingPrioritizedRunnable.this)) { - runAndClean(timeoutCallback); - } + synchronized (this) { + if (timeoutFuture != null) { + throw new IllegalStateException("scheduleTimeout may only be called once"); + } + if (started == false) { + timeoutFuture = timer.schedule(new Runnable() { + @Override + public void run() { + if (remove(TieBreakingPrioritizedRunnable.this)) { + runAndClean(timeoutCallback); + } + } + }, timeValue.nanos(), TimeUnit.NANOSECONDS); } - }, timeValue.nanos(), TimeUnit.NANOSECONDS); - if (started) { - // if the actual action already it might have missed the setting of the future. Clean it ourselves. - FutureUtils.cancel(timeoutFuture); } }