Fix concurrency issue in PrioritizedEsThreadPoolExecutor.

Tasks can be registered with a timeout, which runs as a task in a separate
threadpool. The idea is that the timeout runner cancels the main task when
the time is out, and the timeout runner is cancelled when the main task
starts executing. However, the following statement:

```java
                    timeoutFuture = timer.schedule(new Runnable() {
                        @Override
                        public void run() {
                            if (remove(TieBreakingPrioritizedRunnable.this)) {
                                runAndClean(timeoutCallback);
                            }
                        }
                    }, timeValue.nanos(), TimeUnit.NANOSECONDS);
```

is not atomic: the removal task is first started, and then the (volatile)
variable is assigned. As a consequence, there is a short window that allows
a timeout task to wait until the time is out even if the task is already
completed.

See http://build-us-00.elastic.co/job/es_core_17_centos/496/ for an example of
such a failure.
This commit is contained in:
Adrien Grand 2015-08-03 14:11:49 +02:00
parent 365a324fc5
commit 1eaa9f9fda
1 changed files with 24 additions and 16 deletions

View File

@ -19,6 +19,7 @@
package org.elasticsearch.common.util.concurrent;
import com.google.common.collect.Lists;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.unit.TimeValue;
@ -161,8 +162,10 @@ public class PrioritizedEsThreadPoolExecutor extends EsThreadPoolExecutor {
private Runnable runnable;
private final long insertionOrder;
private volatile ScheduledFuture<?> timeoutFuture;
private volatile boolean started = false;
// these two variables are protected by 'this'
private ScheduledFuture<?> timeoutFuture;
private boolean started = false;
TieBreakingPrioritizedRunnable(PrioritizedRunnable runnable, long insertionOrder) {
this(runnable, runnable.priority(), insertionOrder);
@ -176,10 +179,12 @@ public class PrioritizedEsThreadPoolExecutor extends EsThreadPoolExecutor {
@Override
public void run() {
// make the task as stared. This is needed for synchronization with the timeout handling
// see #scheduleTimeout()
started = true;
FutureUtils.cancel(timeoutFuture);
synchronized (this) {
// make the task as stared. This is needed for synchronization with the timeout handling
// see #scheduleTimeout()
started = true;
FutureUtils.cancel(timeoutFuture);
}
runAndClean(runnable);
}
@ -193,17 +198,20 @@ public class PrioritizedEsThreadPoolExecutor extends EsThreadPoolExecutor {
}
public void scheduleTimeout(ScheduledExecutorService timer, final Runnable timeoutCallback, TimeValue timeValue) {
timeoutFuture = timer.schedule(new Runnable() {
@Override
public void run() {
if (remove(TieBreakingPrioritizedRunnable.this)) {
runAndClean(timeoutCallback);
}
synchronized (this) {
if (timeoutFuture != null) {
throw new IllegalStateException("scheduleTimeout may only be called once");
}
if (started == false) {
timeoutFuture = timer.schedule(new Runnable() {
@Override
public void run() {
if (remove(TieBreakingPrioritizedRunnable.this)) {
runAndClean(timeoutCallback);
}
}
}, timeValue.nanos(), TimeUnit.NANOSECONDS);
}
}, timeValue.nanos(), TimeUnit.NANOSECONDS);
if (started) {
// if the actual action already it might have missed the setting of the future. Clean it ourselves.
FutureUtils.cancel(timeoutFuture);
}
}