From 43a1c96cd15a3fd9a44da721aa9624810bb24991 Mon Sep 17 00:00:00 2001 From: Bartosz Mikulski Date: Wed, 7 Feb 2024 06:24:38 +0100 Subject: [PATCH] Fix query-cancellation-executor memory leak (#15754) This PR fixes #15069 by resolving a memory leak caused by a thread leak in query-cancellation-executor. --- .../druid/client/DirectDruidClient.java | 11 +++++-- .../client/DirectDruidClientFactory.java | 13 +++++++- .../druid/client/DirectDruidClientTest.java | 17 +++++++++-- .../apache/druid/client/SimpleServerView.java | 30 ++++++------------- 4 files changed, 44 insertions(+), 27 deletions(-) diff --git a/server/src/main/java/org/apache/druid/client/DirectDruidClient.java b/server/src/main/java/org/apache/druid/client/DirectDruidClient.java index f8257d44367..9a93c8bacd0 100644 --- a/server/src/main/java/org/apache/druid/client/DirectDruidClient.java +++ b/server/src/main/java/org/apache/druid/client/DirectDruidClient.java @@ -78,6 +78,7 @@ import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -127,7 +128,8 @@ public class DirectDruidClient implements QueryRunner HttpClient httpClient, String scheme, String host, - ServiceEmitter emitter + ServiceEmitter emitter, + ScheduledExecutorService queryCancellationExecutor ) { this.warehouse = warehouse; @@ -140,7 +142,7 @@ public class DirectDruidClient implements QueryRunner this.isSmile = this.objectMapper.getFactory() instanceof SmileFactory; this.openConnections = new AtomicInteger(); - this.queryCancellationExecutor = Execs.scheduledSingleThreaded("query-cancellation-executor"); + this.queryCancellationExecutor = queryCancellationExecutor; } public int getNumOpenConnections() @@ -551,7 +553,7 @@ public class DirectDruidClient implements QueryRunner if (!responseFuture.isDone()) { log.error("Error cancelling query[%s]", query); } - StatusResponseHolder response = responseFuture.get(); + StatusResponseHolder response = responseFuture.get(30, TimeUnit.SECONDS); if (response.getStatus().getCode() >= 500) { log.error("Error cancelling query[%s]: queriable node returned status[%d] [%s].", query, @@ -562,6 +564,9 @@ public class DirectDruidClient implements QueryRunner catch (ExecutionException | InterruptedException e) { log.error(e, "Error cancelling query[%s]", query); } + catch (TimeoutException e) { + log.error(e, "Timed out cancelling query[%s]", query); + } }; queryCancellationExecutor.schedule(checkRunnable, 5, TimeUnit.SECONDS); } diff --git a/server/src/main/java/org/apache/druid/client/DirectDruidClientFactory.java b/server/src/main/java/org/apache/druid/client/DirectDruidClientFactory.java index 23529f2ccac..4f29130537b 100644 --- a/server/src/main/java/org/apache/druid/client/DirectDruidClientFactory.java +++ b/server/src/main/java/org/apache/druid/client/DirectDruidClientFactory.java @@ -21,16 +21,22 @@ package org.apache.druid.client; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.inject.Inject; +import org.apache.druid.guice.LazySingleton; import org.apache.druid.guice.annotations.EscalatedClient; import org.apache.druid.guice.annotations.Smile; +import org.apache.druid.java.util.common.concurrent.ScheduledExecutors; import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.java.util.http.client.HttpClient; import org.apache.druid.query.QueryToolChestWarehouse; import org.apache.druid.query.QueryWatcher; +import org.apache.druid.utils.JvmUtils; + +import java.util.concurrent.ScheduledExecutorService; /** * Factory for building {@link DirectDruidClient} */ +@LazySingleton public class DirectDruidClientFactory { private final ServiceEmitter emitter; @@ -38,6 +44,7 @@ public class DirectDruidClientFactory private final QueryWatcher queryWatcher; private final ObjectMapper smileMapper; private final HttpClient httpClient; + private final ScheduledExecutorService queryCancellationExecutor; @Inject public DirectDruidClientFactory( @@ -53,6 +60,9 @@ public class DirectDruidClientFactory this.queryWatcher = queryWatcher; this.smileMapper = smileMapper; this.httpClient = httpClient; + + int threadCount = Math.max(1, JvmUtils.getRuntimeInfo().getAvailableProcessors() / 2); + this.queryCancellationExecutor = ScheduledExecutors.fixed(threadCount, "query-cancellation-executor"); } public DirectDruidClient makeDirectClient(DruidServer server) @@ -64,7 +74,8 @@ public class DirectDruidClientFactory httpClient, server.getScheme(), server.getHost(), - emitter + emitter, + queryCancellationExecutor ); } } diff --git a/server/src/test/java/org/apache/druid/client/DirectDruidClientTest.java b/server/src/test/java/org/apache/druid/client/DirectDruidClientTest.java index 6322f730be9..716e7500fe0 100644 --- a/server/src/test/java/org/apache/druid/client/DirectDruidClientTest.java +++ b/server/src/test/java/org/apache/druid/client/DirectDruidClientTest.java @@ -31,6 +31,7 @@ import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.java.util.http.client.HttpClient; import org.apache.druid.java.util.http.client.Request; @@ -54,6 +55,7 @@ import org.jboss.netty.handler.codec.http.HttpMethod; import org.jboss.netty.handler.codec.http.HttpResponseStatus; import org.jboss.netty.handler.timeout.ReadTimeoutException; import org.joda.time.Duration; +import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -67,6 +69,7 @@ import java.net.URL; import java.util.ArrayList; import java.util.HashMap; import java.util.List; +import java.util.concurrent.ScheduledExecutorService; public class DirectDruidClientTest { @@ -88,6 +91,7 @@ public class DirectDruidClientTest private HttpClient httpClient; private DirectDruidClient client; private QueryableDruidServer queryableDruidServer; + private ScheduledExecutorService queryCancellationExecutor; @Before public void setup() @@ -97,6 +101,7 @@ public class DirectDruidClientTest dataSegment, new HighestPriorityTierSelectorStrategy(new ConnectionCountServerSelectorStrategy()) ); + queryCancellationExecutor = Execs.scheduledSingleThreaded("query-cancellation-executor"); client = new DirectDruidClient( new ReflectionQueryToolChestWarehouse(), QueryRunnerTestHelper.NOOP_QUERYWATCHER, @@ -104,7 +109,8 @@ public class DirectDruidClientTest httpClient, "http", hostName, - new NoopServiceEmitter() + new NoopServiceEmitter(), + queryCancellationExecutor ); queryableDruidServer = new QueryableDruidServer( new DruidServer( @@ -121,6 +127,12 @@ public class DirectDruidClientTest serverSelector.addServerAndUpdateSegment(queryableDruidServer, serverSelector.getSegment()); } + @After + public void teardown() throws InterruptedException + { + queryCancellationExecutor.shutdown(); + queryCancellationExecutor.awaitTermination(1, java.util.concurrent.TimeUnit.SECONDS); + } @Test public void testRun() throws Exception @@ -169,7 +181,8 @@ public class DirectDruidClientTest httpClient, "http", "foo2", - new NoopServiceEmitter() + new NoopServiceEmitter(), + queryCancellationExecutor ); QueryableDruidServer queryableDruidServer2 = new QueryableDruidServer( diff --git a/server/src/test/java/org/apache/druid/client/SimpleServerView.java b/server/src/test/java/org/apache/druid/client/SimpleServerView.java index 420b7b965db..c655cf17429 100644 --- a/server/src/test/java/org/apache/druid/client/SimpleServerView.java +++ b/server/src/test/java/org/apache/druid/client/SimpleServerView.java @@ -63,9 +63,7 @@ public class SimpleServerView implements TimelineServerView // dataSource -> version -> serverSelector private final Map> timelines = new HashMap<>(); - private final QueryToolChestWarehouse warehouse; - private final ObjectMapper objectMapper; - private final HttpClient httpClient; + private final DirectDruidClientFactory clientFactory; public SimpleServerView( QueryToolChestWarehouse warehouse, @@ -73,28 +71,18 @@ public class SimpleServerView implements TimelineServerView HttpClient httpClient ) { - this.warehouse = warehouse; - this.objectMapper = objectMapper; - this.httpClient = httpClient; + this.clientFactory = new DirectDruidClientFactory( + new NoopServiceEmitter(), + warehouse, + NOOP_QUERY_WATCHER, + objectMapper, + httpClient + ); } public void addServer(DruidServer server, DataSegment dataSegment) { - servers.put( - server, - new QueryableDruidServer<>( - server, - new DirectDruidClient<>( - warehouse, - NOOP_QUERY_WATCHER, - objectMapper, - httpClient, - server.getScheme(), - server.getHost(), - new NoopServiceEmitter() - ) - ) - ); + servers.put(server, new QueryableDruidServer<>(server, clientFactory.makeDirectClient(server))); addSegmentToServer(server, dataSegment); }