Merge pull request #12599 from jpountz/fix/PrioritizedEsThreadPoolExecutor_concurrency

Fix concurrency issue in PrioritizedEsThreadPoolExecutor.
This commit is contained in:
Adrien Grand 2015-08-04 10:00:46 +02:00
commit caca13c878
1 changed files with 24 additions and 16 deletions

View File

@ -19,6 +19,7 @@
package org.elasticsearch.common.util.concurrent;
import com.google.common.collect.Lists;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.unit.TimeValue;
@ -161,8 +162,10 @@ public class PrioritizedEsThreadPoolExecutor extends EsThreadPoolExecutor {
private Runnable runnable;
private final long insertionOrder;
private volatile ScheduledFuture<?> timeoutFuture;
private volatile boolean started = false;
// these two variables are protected by 'this'
private ScheduledFuture<?> timeoutFuture;
private boolean started = false;
TieBreakingPrioritizedRunnable(PrioritizedRunnable runnable, long insertionOrder) {
this(runnable, runnable.priority(), insertionOrder);
@ -176,10 +179,12 @@ public class PrioritizedEsThreadPoolExecutor extends EsThreadPoolExecutor {
@Override
public void run() {
// make the task as stared. This is needed for synchronization with the timeout handling
// see #scheduleTimeout()
started = true;
FutureUtils.cancel(timeoutFuture);
synchronized (this) {
// make the task as stared. This is needed for synchronization with the timeout handling
// see #scheduleTimeout()
started = true;
FutureUtils.cancel(timeoutFuture);
}
runAndClean(runnable);
}
@ -193,17 +198,20 @@ public class PrioritizedEsThreadPoolExecutor extends EsThreadPoolExecutor {
}
public void scheduleTimeout(ScheduledExecutorService timer, final Runnable timeoutCallback, TimeValue timeValue) {
timeoutFuture = timer.schedule(new Runnable() {
@Override
public void run() {
if (remove(TieBreakingPrioritizedRunnable.this)) {
runAndClean(timeoutCallback);
}
synchronized (this) {
if (timeoutFuture != null) {
throw new IllegalStateException("scheduleTimeout may only be called once");
}
if (started == false) {
timeoutFuture = timer.schedule(new Runnable() {
@Override
public void run() {
if (remove(TieBreakingPrioritizedRunnable.this)) {
runAndClean(timeoutCallback);
}
}
}, timeValue.nanos(), TimeUnit.NANOSECONDS);
}
}, timeValue.nanos(), TimeUnit.NANOSECONDS);
if (started) {
// if the actual action already it might have missed the setting of the future. Clean it ourselves.
FutureUtils.cancel(timeoutFuture);
}
}