Ensure AsyncTask#isScheduled remain false after close (#45687)
If a scheduled task of an AbstractAsyncTask starts after it was closed, then isScheduledOrRunning can remain true forever although no task is running or scheduled. Closes #45576
This commit is contained in:
parent
6f2daa85e3
commit
6f5d944fbd
|
@ -134,6 +134,9 @@ public abstract class AbstractAsyncTask implements Runnable, Closeable {
|
||||||
@Override
|
@Override
|
||||||
public final void run() {
|
public final void run() {
|
||||||
synchronized (this) {
|
synchronized (this) {
|
||||||
|
if (isClosed()) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
cancellable = null;
|
cancellable = null;
|
||||||
isScheduledOrRunning = autoReschedule;
|
isScheduledOrRunning = autoReschedule;
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,18 +18,23 @@
|
||||||
*/
|
*/
|
||||||
package org.elasticsearch.common.util.concurrent;
|
package org.elasticsearch.common.util.concurrent;
|
||||||
|
|
||||||
|
import org.elasticsearch.common.Randomness;
|
||||||
import org.elasticsearch.common.unit.TimeValue;
|
import org.elasticsearch.common.unit.TimeValue;
|
||||||
|
import org.elasticsearch.core.internal.io.IOUtils;
|
||||||
import org.elasticsearch.test.ESTestCase;
|
import org.elasticsearch.test.ESTestCase;
|
||||||
import org.elasticsearch.threadpool.TestThreadPool;
|
import org.elasticsearch.threadpool.TestThreadPool;
|
||||||
import org.elasticsearch.threadpool.ThreadPool;
|
import org.elasticsearch.threadpool.ThreadPool;
|
||||||
import org.junit.AfterClass;
|
import org.junit.AfterClass;
|
||||||
import org.junit.BeforeClass;
|
import org.junit.BeforeClass;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
import java.util.concurrent.CyclicBarrier;
|
import java.util.concurrent.CyclicBarrier;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.TimeoutException;
|
import java.util.concurrent.TimeoutException;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
|
||||||
public class AbstractAsyncTaskTests extends ESTestCase {
|
public class AbstractAsyncTaskTests extends ESTestCase {
|
||||||
|
|
||||||
|
@ -203,4 +208,31 @@ public class AbstractAsyncTaskTests extends ESTestCase {
|
||||||
assertFalse(task.isScheduled());
|
assertFalse(task.isScheduled());
|
||||||
assertTrue(task.isClosed());
|
assertTrue(task.isClosed());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testIsScheduledRemainFalseAfterClose() throws Exception {
|
||||||
|
int numTasks = between(10, 50);
|
||||||
|
List<AbstractAsyncTask> tasks = new ArrayList<>(numTasks);
|
||||||
|
AtomicLong counter = new AtomicLong();
|
||||||
|
for (int i = 0; i < numTasks; i++) {
|
||||||
|
AbstractAsyncTask task = new AbstractAsyncTask(logger, threadPool, TimeValue.timeValueMillis(randomIntBetween(1, 2)), true) {
|
||||||
|
@Override
|
||||||
|
protected boolean mustReschedule() {
|
||||||
|
return counter.get() <= 1000;
|
||||||
|
}
|
||||||
|
@Override
|
||||||
|
protected void runInternal() {
|
||||||
|
counter.incrementAndGet();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
task.rescheduleIfNecessary();
|
||||||
|
tasks.add(task);
|
||||||
|
}
|
||||||
|
Randomness.shuffle(tasks);
|
||||||
|
IOUtils.close(tasks);
|
||||||
|
Randomness.shuffle(tasks);
|
||||||
|
for (AbstractAsyncTask task : tasks) {
|
||||||
|
assertTrue(task.isClosed());
|
||||||
|
assertFalse(task.isScheduled());
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue