Internal: promptly cleanup updateTask timeout handler

Improve cleanup of updateTask timeout handlers. The timeout handlers should be removed as soon as a corresponding update task is processed. Otherwise, timeout handlers might keep old updateTasks and all objects that they are pointing to in memory for the duration of timeout (15 minutes by default).

Fixes #9621
This commit is contained in:
Igor Motov 2015-02-09 20:42:50 -05:00
parent 5b96595854
commit 6544890e14
3 changed files with 70 additions and 16 deletions

View File

@ -78,19 +78,16 @@ public class PrioritizedEsThreadPoolExecutor extends EsThreadPoolExecutor {
} else if (!(command instanceof PrioritizedFutureTask)) { // it might be a callable wrapper...
command = new TieBreakingPrioritizedRunnable(command, Priority.NORMAL, insertionOrder.incrementAndGet());
}
super.execute(command);
if (timeout.nanos() >= 0) {
final Runnable fCommand = command;
timer.schedule(new Runnable() {
@Override
public void run() {
boolean removed = getQueue().remove(fCommand);
if (removed) {
timeoutCallback.run();
}
}
}, timeout.nanos(), TimeUnit.NANOSECONDS);
if (command instanceof TieBreakingPrioritizedRunnable) {
((TieBreakingPrioritizedRunnable) command).scheduleTimeout(timer, timeoutCallback, timeout);
} else {
// We really shouldn't be here. The only way we can get here if somebody created PrioritizedFutureTask
// and passed it to execute, which doesn't make much sense
throw new UnsupportedOperationException("Execute with timeout is not supported for future tasks");
}
}
super.execute(command);
}
@Override
@ -133,10 +130,11 @@ public class PrioritizedEsThreadPoolExecutor extends EsThreadPoolExecutor {
}
}
static class TieBreakingPrioritizedRunnable extends PrioritizedRunnable {
private final class TieBreakingPrioritizedRunnable extends PrioritizedRunnable {
final Runnable runnable;
final long insertionOrder;
private Runnable runnable;
private final long insertionOrder;
private ScheduledFuture<?> timeoutFuture;
TieBreakingPrioritizedRunnable(PrioritizedRunnable runnable, long insertionOrder) {
this(runnable, runnable.priority(), insertionOrder);
@ -150,7 +148,8 @@ public class PrioritizedEsThreadPoolExecutor extends EsThreadPoolExecutor {
@Override
public void run() {
runnable.run();
FutureUtils.cancel(timeoutFuture);
runAndClean(runnable);
}
@Override
@ -161,9 +160,35 @@ public class PrioritizedEsThreadPoolExecutor extends EsThreadPoolExecutor {
}
return insertionOrder < ((TieBreakingPrioritizedRunnable) pr).insertionOrder ? -1 : 1;
}
public void scheduleTimeout(ScheduledExecutorService timer, final Runnable timeoutCallback, TimeValue timeValue) {
timeoutFuture = timer.schedule(new Runnable() {
@Override
public void run() {
if (remove(TieBreakingPrioritizedRunnable.this)) {
runAndClean(timeoutCallback);
}
}
}, timeValue.nanos(), TimeUnit.NANOSECONDS);
}
/**
* Timeout callback might remain in the timer scheduling queue for some time and it might hold
* the pointers to other objects. As a result it's possible to run out of memory if a large number of
* tasks are executed
*/
private void runAndClean(Runnable run) {
try {
run.run();
} finally {
runnable = null;
timeoutFuture = null;
}
}
}
static class PrioritizedFutureTask<T> extends FutureTask<T> implements Comparable<PrioritizedFutureTask> {
private final class PrioritizedFutureTask<T> extends FutureTask<T> implements Comparable<PrioritizedFutureTask> {
final Object task;
final Priority priority;

View File

@ -150,6 +150,7 @@ public class ThreadPool extends AbstractComponent {
this.scheduler = new ScheduledThreadPoolExecutor(1, EsExecutors.daemonThreadFactory(settings, "scheduler"), new EsAbortPolicy());
this.scheduler.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
this.scheduler.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
this.scheduler.setRemoveOnCancelPolicy(true);
if (nodeSettingsService != null) {
nodeSettingsService.addListener(new ApplySettings());
}

View File

@ -22,6 +22,7 @@ import com.google.common.collect.Lists;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.test.ElasticsearchTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.junit.Test;
import java.util.ArrayList;
@ -240,6 +241,33 @@ public class PrioritizedExecutorsTests extends ElasticsearchTestCase {
assertTrue(terminate(timer, executor));
}
@Test
public void testTimeoutCleanup() throws Exception {
ThreadPool threadPool = new ThreadPool("test");
ScheduledThreadPoolExecutor timer = (ScheduledThreadPoolExecutor) threadPool.scheduler();
final AtomicBoolean timeoutCalled = new AtomicBoolean();
PrioritizedEsThreadPoolExecutor executor = EsExecutors.newSinglePrioritizing(EsExecutors.daemonThreadFactory(getTestName()));
final CountDownLatch invoked = new CountDownLatch(1);
executor.execute(new Runnable() {
@Override
public void run() {
invoked.countDown();
}
}, timer, TimeValue.timeValueMillis(1000), new Runnable() {
@Override
public void run() {
// We should never get here
timeoutCalled.set(true);
}
}
);
invoked.await();
assertThat(timer.getQueue().size(), equalTo(0));
assertThat(timeoutCalled.get(), equalTo(false));
assertTrue(terminate(executor));
assertTrue(terminate(threadPool));
}
static class AwaitingJob extends PrioritizedRunnable {
private final CountDownLatch latch;