Fix an issue with WorkerSketchFetcher not terminating on shutdown (#13459)

* Fix an issue with WorkerSketchFetcher not terminating on shutdown

* Change threadpool name
This commit is contained in:
Adarsh Sanjeev 2022-11-30 21:02:48 +05:30 committed by GitHub
parent 8ff1b2d5d4
commit af164cbc10
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 12 additions and 5 deletions

View File

@ -524,6 +524,8 @@ public class ControllerImpl implements Controller
context.registerController(this, closer);
this.netClient = new ExceptionWrappingWorkerClient(context.taskClientFor(this));
closer.register(netClient::close);
ClusterStatisticsMergeMode clusterStatisticsMergeMode =
MultiStageQueryContext.getClusterStatisticsMergeMode(task.getQuerySpec().getQuery().context());
@ -532,8 +534,7 @@ public class ControllerImpl implements Controller
int statisticsMaxRetainedBytes = WorkerMemoryParameters.createProductionInstanceForController(context.injector())
.getPartitionStatisticsMaxRetainedBytes();
this.workerSketchFetcher = new WorkerSketchFetcher(netClient, clusterStatisticsMergeMode, statisticsMaxRetainedBytes);
closer.register(netClient::close);
closer.register(workerSketchFetcher::close);
final boolean isDurableStorageEnabled =
MultiStageQueryContext.isDurableStorageEnabled(task.getQuerySpec().getQuery().context());

View File

@ -25,6 +25,7 @@ import org.apache.druid.frame.key.ClusterByPartition;
import org.apache.druid.frame.key.ClusterByPartitions;
import org.apache.druid.java.util.common.Either;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.msq.kernel.StageDefinition;
import org.apache.druid.msq.statistics.ClusterByStatisticsCollector;
@ -39,13 +40,12 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.IntStream;
/**
* Queues up fetching sketches from workers and progressively generates partitions boundaries.
*/
public class WorkerSketchFetcher
public class WorkerSketchFetcher implements AutoCloseable
{
private static final Logger log = new Logger(WorkerSketchFetcher.class);
private static final int DEFAULT_THREAD_COUNT = 4;
@ -63,7 +63,7 @@ public class WorkerSketchFetcher
{
this.workerClient = workerClient;
this.clusterStatisticsMergeMode = clusterStatisticsMergeMode;
this.executorService = Executors.newFixedThreadPool(DEFAULT_THREAD_COUNT);
this.executorService = Execs.multiThreaded(DEFAULT_THREAD_COUNT, "SketchFetcherThreadPool-%d");
this.statisticsMaxRetainedBytes = statisticsMaxRetainedBytes;
}
@ -337,4 +337,10 @@ public class WorkerSketchFetcher
return either.valueOrThrow().size();
}
}
@Override
public void close()
{
executorService.shutdownNow();
}
}