diff --git a/src/main/java/org/elasticsearch/common/util/concurrent/PrioritizedEsThreadPoolExecutor.java b/src/main/java/org/elasticsearch/common/util/concurrent/PrioritizedEsThreadPoolExecutor.java index 727bb2b650d..317b61d951f 100644 --- a/src/main/java/org/elasticsearch/common/util/concurrent/PrioritizedEsThreadPoolExecutor.java +++ b/src/main/java/org/elasticsearch/common/util/concurrent/PrioritizedEsThreadPoolExecutor.java @@ -78,19 +78,16 @@ public class PrioritizedEsThreadPoolExecutor extends EsThreadPoolExecutor { } else if (!(command instanceof PrioritizedFutureTask)) { // it might be a callable wrapper... command = new TieBreakingPrioritizedRunnable(command, Priority.NORMAL, insertionOrder.incrementAndGet()); } - super.execute(command); if (timeout.nanos() >= 0) { - final Runnable fCommand = command; - timer.schedule(new Runnable() { - @Override - public void run() { - boolean removed = getQueue().remove(fCommand); - if (removed) { - timeoutCallback.run(); - } - } - }, timeout.nanos(), TimeUnit.NANOSECONDS); + if (command instanceof TieBreakingPrioritizedRunnable) { + ((TieBreakingPrioritizedRunnable) command).scheduleTimeout(timer, timeoutCallback, timeout); + } else { + // We really shouldn't be here. The only way we can get here if somebody created PrioritizedFutureTask + // and passed it to execute, which doesn't make much sense + throw new UnsupportedOperationException("Execute with timeout is not supported for future tasks"); + } } + super.execute(command); } @Override @@ -133,10 +130,11 @@ public class PrioritizedEsThreadPoolExecutor extends EsThreadPoolExecutor { } } - static class TieBreakingPrioritizedRunnable extends PrioritizedRunnable { + private final class TieBreakingPrioritizedRunnable extends PrioritizedRunnable { - final Runnable runnable; - final long insertionOrder; + private Runnable runnable; + private final long insertionOrder; + private ScheduledFuture timeoutFuture; TieBreakingPrioritizedRunnable(PrioritizedRunnable runnable, long insertionOrder) { this(runnable, runnable.priority(), insertionOrder); @@ -150,7 +148,8 @@ public class PrioritizedEsThreadPoolExecutor extends EsThreadPoolExecutor { @Override public void run() { - runnable.run(); + FutureUtils.cancel(timeoutFuture); + runAndClean(runnable); } @Override @@ -161,9 +160,35 @@ public class PrioritizedEsThreadPoolExecutor extends EsThreadPoolExecutor { } return insertionOrder < ((TieBreakingPrioritizedRunnable) pr).insertionOrder ? -1 : 1; } + + public void scheduleTimeout(ScheduledExecutorService timer, final Runnable timeoutCallback, TimeValue timeValue) { + timeoutFuture = timer.schedule(new Runnable() { + @Override + public void run() { + if (remove(TieBreakingPrioritizedRunnable.this)) { + runAndClean(timeoutCallback); + } + } + }, timeValue.nanos(), TimeUnit.NANOSECONDS); + } + + /** + * Timeout callback might remain in the timer scheduling queue for some time and it might hold + * the pointers to other objects. As a result it's possible to run out of memory if a large number of + * tasks are executed + */ + private void runAndClean(Runnable run) { + try { + run.run(); + } finally { + runnable = null; + timeoutFuture = null; + } + } + } - static class PrioritizedFutureTask extends FutureTask implements Comparable { + private final class PrioritizedFutureTask extends FutureTask implements Comparable { final Object task; final Priority priority; diff --git a/src/main/java/org/elasticsearch/threadpool/ThreadPool.java b/src/main/java/org/elasticsearch/threadpool/ThreadPool.java index 2bad19b1713..c18a0d51618 100644 --- a/src/main/java/org/elasticsearch/threadpool/ThreadPool.java +++ b/src/main/java/org/elasticsearch/threadpool/ThreadPool.java @@ -150,6 +150,7 @@ public class ThreadPool extends AbstractComponent { this.scheduler = new ScheduledThreadPoolExecutor(1, EsExecutors.daemonThreadFactory(settings, "scheduler"), new EsAbortPolicy()); this.scheduler.setExecuteExistingDelayedTasksAfterShutdownPolicy(false); this.scheduler.setContinueExistingPeriodicTasksAfterShutdownPolicy(false); + this.scheduler.setRemoveOnCancelPolicy(true); if (nodeSettingsService != null) { nodeSettingsService.addListener(new ApplySettings()); } diff --git a/src/test/java/org/elasticsearch/common/util/concurrent/PrioritizedExecutorsTests.java b/src/test/java/org/elasticsearch/common/util/concurrent/PrioritizedExecutorsTests.java index e079eb62aae..38477eda670 100644 --- a/src/test/java/org/elasticsearch/common/util/concurrent/PrioritizedExecutorsTests.java +++ b/src/test/java/org/elasticsearch/common/util/concurrent/PrioritizedExecutorsTests.java @@ -22,6 +22,7 @@ import com.google.common.collect.Lists; import org.elasticsearch.common.Priority; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.test.ElasticsearchTestCase; +import org.elasticsearch.threadpool.ThreadPool; import org.junit.Test; import java.util.ArrayList; @@ -240,6 +241,33 @@ public class PrioritizedExecutorsTests extends ElasticsearchTestCase { assertTrue(terminate(timer, executor)); } + @Test + public void testTimeoutCleanup() throws Exception { + ThreadPool threadPool = new ThreadPool("test"); + ScheduledThreadPoolExecutor timer = (ScheduledThreadPoolExecutor) threadPool.scheduler(); + final AtomicBoolean timeoutCalled = new AtomicBoolean(); + PrioritizedEsThreadPoolExecutor executor = EsExecutors.newSinglePrioritizing(EsExecutors.daemonThreadFactory(getTestName())); + final CountDownLatch invoked = new CountDownLatch(1); + executor.execute(new Runnable() { + @Override + public void run() { + invoked.countDown(); + } + }, timer, TimeValue.timeValueMillis(1000), new Runnable() { + @Override + public void run() { + // We should never get here + timeoutCalled.set(true); + } + } + ); + invoked.await(); + assertThat(timer.getQueue().size(), equalTo(0)); + assertThat(timeoutCalled.get(), equalTo(false)); + assertTrue(terminate(executor)); + assertTrue(terminate(threadPool)); + } + static class AwaitingJob extends PrioritizedRunnable { private final CountDownLatch latch;