diff --git a/processing/src/main/java/org/apache/druid/java/util/common/concurrent/Execs.java b/processing/src/main/java/org/apache/druid/java/util/common/concurrent/Execs.java index c0ccb967dbd..6b49ab0a0ee 100644 --- a/processing/src/main/java/org/apache/druid/java/util/common/concurrent/Execs.java +++ b/processing/src/main/java/org/apache/druid/java/util/common/concurrent/Execs.java @@ -159,6 +159,9 @@ public class Execs @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { + if (executor.isShutdown()) { + throw new RejectedExecutionException("Executor is shutdown, rejecting task"); + } try { executor.getQueue().put(r); } diff --git a/processing/src/test/java/org/apache/druid/concurrent/ExecsTest.java b/processing/src/test/java/org/apache/druid/concurrent/ExecsTest.java index de76af7cb7b..6c7925e185d 100644 --- a/processing/src/test/java/org/apache/druid/concurrent/ExecsTest.java +++ b/processing/src/test/java/org/apache/druid/concurrent/ExecsTest.java @@ -19,15 +19,20 @@ package org.apache.druid.concurrent; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.logger.Logger; import org.junit.Assert; import org.junit.Test; +import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; public class ExecsTest @@ -118,6 +123,36 @@ public class ExecsTest producer.shutdown(); } + @Test + public void testTaskAddedToShutdownExecutorThrowsException() throws Exception + { + // The implementation of Execs.newBlockingSingleThreaded() rejectedExecutionHandler should not add tasks when it's in shutDown state + // When a SynchronousQueue is used in executor and a task is put in it in ShutDown state, it will forever stuck in WAITING state + // as executor will not take() the task to schedule it. + final ListeningExecutorService intermediateTempExecutor = MoreExecutors.listeningDecorator( + Execs.newBlockingSingleThreaded("[TASK_ID]-appenderator-abandon", 0) + ); + Callable task = () -> { + try { + Thread.sleep(500); // Simulate long-running task + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); // Restore interrupted status + } + return null; + }; + + // Submit multiple tasks together + Assert.assertNotNull(intermediateTempExecutor.submit(task)); + Assert.assertNotNull(intermediateTempExecutor.submit(task)); + + intermediateTempExecutor.shutdownNow(); + // Submit task after shutDown / shutDownNow should not be added in queue + Assert.assertThrows(RejectedExecutionException.class, () -> intermediateTempExecutor.submit(task)); + Assert.assertTrue(intermediateTempExecutor.awaitTermination(10, TimeUnit.SECONDS)); + Assert.assertTrue(intermediateTempExecutor.isShutdown()); + } + @Test public void testDirectExecutorFactory() {