diff --git a/server/src/main/java/org/apache/druid/server/coordinator/CuratorLoadQueuePeon.java b/server/src/main/java/org/apache/druid/server/coordinator/CuratorLoadQueuePeon.java index 10113fd0235..5fd9ac5780d 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/CuratorLoadQueuePeon.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/CuratorLoadQueuePeon.java @@ -335,8 +335,6 @@ public class CuratorLoadQueuePeon extends LoadQueuePeon queuedSize.set(0L); failedAssignCount.set(0); - processingExecutor.shutdown(); - callBackExecutor.shutdown(); } private void entryRemoved(SegmentHolder segmentHolder, String path) diff --git a/server/src/test/java/org/apache/druid/server/coordinator/CuratorDruidCoordinatorTest.java b/server/src/test/java/org/apache/druid/server/coordinator/CuratorDruidCoordinatorTest.java index eb3b25b6899..7e9659e2060 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/CuratorDruidCoordinatorTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/CuratorDruidCoordinatorTest.java @@ -73,7 +73,9 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; @@ -119,6 +121,9 @@ public class CuratorDruidCoordinatorTest extends CuratorTestBase private final ObjectMapper jsonMapper; private final ZkPathsConfig zkPathsConfig; + private ScheduledExecutorService peonExec = Execs.scheduledSingleThreaded("Master-PeonExec--%d"); + private ExecutorService callbackExec = Execs.multiThreaded(4, "LoadQueuePeon-callbackexec--%d"); + public CuratorDruidCoordinatorTest() { jsonMapper = TestHelper.makeJsonMapper(); @@ -187,16 +192,16 @@ public class CuratorDruidCoordinatorTest extends CuratorTestBase curator, SOURCE_LOAD_PATH, objectMapper, - Execs.scheduledSingleThreaded("coordinator_test_load_queue_peon_src_scheduled-%d"), - Execs.singleThreaded("coordinator_test_load_queue_peon_src-%d"), + peonExec, + callbackExec, druidCoordinatorConfig ); destinationLoadQueuePeon = new CuratorLoadQueuePeon( curator, DESTINATION_LOAD_PATH, objectMapper, - Execs.scheduledSingleThreaded("coordinator_test_load_queue_peon_dest_scheduled-%d"), - Execs.singleThreaded("coordinator_test_load_queue_peon_dest-%d"), + peonExec, + callbackExec, druidCoordinatorConfig ); druidNode = new DruidNode("hey", "what", false, 1234, null, true, false); @@ -261,6 +266,15 @@ public class CuratorDruidCoordinatorTest extends CuratorTestBase @Rule public final TestRule timeout = new DeadlockDetectingTimeout(60, TimeUnit.SECONDS); + @Test + public void testStopDoesntKillPoolItDoesntOwn() throws Exception + { + setupView(); + sourceLoadQueuePeon.stop(); + Assert.assertFalse(peonExec.isShutdown()); + Assert.assertFalse(callbackExec.isShutdown()); + } + @Test public void testMoveSegment() throws Exception { diff --git a/services/src/main/java/org/apache/druid/cli/CliCoordinator.java b/services/src/main/java/org/apache/druid/cli/CliCoordinator.java index ecafcbfc7cc..118b911586b 100644 --- a/services/src/main/java/org/apache/druid/cli/CliCoordinator.java +++ b/services/src/main/java/org/apache/druid/cli/CliCoordinator.java @@ -47,7 +47,9 @@ import org.apache.druid.guice.annotations.CoordinatorIndexingServiceHelper; import org.apache.druid.guice.annotations.EscalatedGlobal; import org.apache.druid.guice.http.JettyHttpClientModule; import org.apache.druid.java.util.common.concurrent.Execs; +import org.apache.druid.java.util.common.concurrent.ExecutorServices; import org.apache.druid.java.util.common.concurrent.ScheduledExecutorFactory; +import org.apache.druid.java.util.common.lifecycle.Lifecycle; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.java.util.http.client.HttpClient; import org.apache.druid.metadata.MetadataRuleManager; @@ -248,7 +250,8 @@ public class CliCoordinator extends ServerRunnable ScheduledExecutorFactory factory, DruidCoordinatorConfig config, @EscalatedGlobal HttpClient httpClient, - ZkPathsConfig zkPaths + ZkPathsConfig zkPaths, + Lifecycle lifecycle ) { boolean useHttpLoadQueuePeon = "http".equalsIgnoreCase(config.getLoadQueuePeonType()); @@ -256,9 +259,12 @@ public class CliCoordinator extends ServerRunnable if (useHttpLoadQueuePeon) { callBackExec = Execs.singleThreaded("LoadQueuePeon-callbackexec--%d"); } else { - callBackExec = Execs.multiThreaded(config.getNumCuratorCallBackThreads(), "LoadQueuePeon" - + "-callbackexec--%d"); + callBackExec = Execs.multiThreaded( + config.getNumCuratorCallBackThreads(), + "LoadQueuePeon-callbackexec--%d" + ); } + ExecutorServices.manageLifecycle(lifecycle, callBackExec); return new LoadQueueTaskMaster( curator, jsonMapper,