From af164cbc100185f33388759df9e866b468925c58 Mon Sep 17 00:00:00 2001 From: Adarsh Sanjeev Date: Wed, 30 Nov 2022 21:02:48 +0530 Subject: [PATCH] Fix an issue with WorkerSketchFetcher not terminating on shutdown (#13459) * Fix an issue with WorkerSketchFetcher not terminating on shutdown * Change threadpool name --- .../org/apache/druid/msq/exec/ControllerImpl.java | 5 +++-- .../apache/druid/msq/exec/WorkerSketchFetcher.java | 12 +++++++++--- 2 files changed, 12 insertions(+), 5 deletions(-) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java index cafc0f38925..318c33a759c 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java @@ -524,6 +524,8 @@ public class ControllerImpl implements Controller context.registerController(this, closer); this.netClient = new ExceptionWrappingWorkerClient(context.taskClientFor(this)); + closer.register(netClient::close); + ClusterStatisticsMergeMode clusterStatisticsMergeMode = MultiStageQueryContext.getClusterStatisticsMergeMode(task.getQuerySpec().getQuery().context()); @@ -532,8 +534,7 @@ public class ControllerImpl implements Controller int statisticsMaxRetainedBytes = WorkerMemoryParameters.createProductionInstanceForController(context.injector()) .getPartitionStatisticsMaxRetainedBytes(); this.workerSketchFetcher = new WorkerSketchFetcher(netClient, clusterStatisticsMergeMode, statisticsMaxRetainedBytes); - - closer.register(netClient::close); + closer.register(workerSketchFetcher::close); final boolean isDurableStorageEnabled = MultiStageQueryContext.isDurableStorageEnabled(task.getQuerySpec().getQuery().context()); diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerSketchFetcher.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerSketchFetcher.java index 3482b50daaf..c4118a9d38e 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerSketchFetcher.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerSketchFetcher.java @@ -25,6 +25,7 @@ import org.apache.druid.frame.key.ClusterByPartition; import org.apache.druid.frame.key.ClusterByPartitions; import org.apache.druid.java.util.common.Either; import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.msq.kernel.StageDefinition; import org.apache.druid.msq.statistics.ClusterByStatisticsCollector; @@ -39,13 +40,12 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.stream.IntStream; /** * Queues up fetching sketches from workers and progressively generates partitions boundaries. */ -public class WorkerSketchFetcher +public class WorkerSketchFetcher implements AutoCloseable { private static final Logger log = new Logger(WorkerSketchFetcher.class); private static final int DEFAULT_THREAD_COUNT = 4; @@ -63,7 +63,7 @@ public class WorkerSketchFetcher { this.workerClient = workerClient; this.clusterStatisticsMergeMode = clusterStatisticsMergeMode; - this.executorService = Executors.newFixedThreadPool(DEFAULT_THREAD_COUNT); + this.executorService = Execs.multiThreaded(DEFAULT_THREAD_COUNT, "SketchFetcherThreadPool-%d"); this.statisticsMaxRetainedBytes = statisticsMaxRetainedBytes; } @@ -337,4 +337,10 @@ public class WorkerSketchFetcher return either.valueOrThrow().size(); } } + + @Override + public void close() + { + executorService.shutdownNow(); + } }