From 6f5d944fbd3484d3b8f55824617ed299f938d95d Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Sat, 17 Aug 2019 13:47:09 -0400 Subject: [PATCH] Ensure AsyncTask#isScheduled remain false after close (#45687) If a scheduled task of an AbstractAsyncTask starts after it was closed, then isScheduledOrRunning can remain true forever although no task is running or scheduled. Closes #45576 --- .../util/concurrent/AbstractAsyncTask.java | 3 ++ .../concurrent/AbstractAsyncTaskTests.java | 32 +++++++++++++++++++ 2 files changed, 35 insertions(+) diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/AbstractAsyncTask.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/AbstractAsyncTask.java index 3c1716cda15..1ef9a484a27 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/AbstractAsyncTask.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/AbstractAsyncTask.java @@ -134,6 +134,9 @@ public abstract class AbstractAsyncTask implements Runnable, Closeable { @Override public final void run() { synchronized (this) { + if (isClosed()) { + return; + } cancellable = null; isScheduledOrRunning = autoReschedule; } diff --git a/server/src/test/java/org/elasticsearch/common/util/concurrent/AbstractAsyncTaskTests.java b/server/src/test/java/org/elasticsearch/common/util/concurrent/AbstractAsyncTaskTests.java index 3a1cab90f0d..10eec2c13d3 100644 --- a/server/src/test/java/org/elasticsearch/common/util/concurrent/AbstractAsyncTaskTests.java +++ b/server/src/test/java/org/elasticsearch/common/util/concurrent/AbstractAsyncTaskTests.java @@ -18,18 +18,23 @@ */ package org.elasticsearch.common.util.concurrent; +import org.elasticsearch.common.Randomness; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; import org.junit.AfterClass; import org.junit.BeforeClass; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; public class AbstractAsyncTaskTests extends ESTestCase { @@ -203,4 +208,31 @@ public class AbstractAsyncTaskTests extends ESTestCase { assertFalse(task.isScheduled()); assertTrue(task.isClosed()); } + + public void testIsScheduledRemainFalseAfterClose() throws Exception { + int numTasks = between(10, 50); + List tasks = new ArrayList<>(numTasks); + AtomicLong counter = new AtomicLong(); + for (int i = 0; i < numTasks; i++) { + AbstractAsyncTask task = new AbstractAsyncTask(logger, threadPool, TimeValue.timeValueMillis(randomIntBetween(1, 2)), true) { + @Override + protected boolean mustReschedule() { + return counter.get() <= 1000; + } + @Override + protected void runInternal() { + counter.incrementAndGet(); + } + }; + task.rescheduleIfNecessary(); + tasks.add(task); + } + Randomness.shuffle(tasks); + IOUtils.close(tasks); + Randomness.shuffle(tasks); + for (AbstractAsyncTask task : tasks) { + assertTrue(task.isClosed()); + assertFalse(task.isScheduled()); + } + } }