Fix query-cancellation-executor memory leak (#15754)

This PR fixes #15069 by resolving a memory leak caused by a thread leak in query-cancellation-executor.
This commit is contained in:
Bartosz Mikulski 2024-02-07 06:24:38 +01:00 committed by GitHub
parent 59bca0951a
commit 43a1c96cd1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 44 additions and 27 deletions

View File

@ -78,6 +78,7 @@ import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@ -127,7 +128,8 @@ public class DirectDruidClient<T> implements QueryRunner<T>
HttpClient httpClient,
String scheme,
String host,
ServiceEmitter emitter
ServiceEmitter emitter,
ScheduledExecutorService queryCancellationExecutor
)
{
this.warehouse = warehouse;
@ -140,7 +142,7 @@ public class DirectDruidClient<T> implements QueryRunner<T>
this.isSmile = this.objectMapper.getFactory() instanceof SmileFactory;
this.openConnections = new AtomicInteger();
this.queryCancellationExecutor = Execs.scheduledSingleThreaded("query-cancellation-executor");
this.queryCancellationExecutor = queryCancellationExecutor;
}
public int getNumOpenConnections()
@ -551,7 +553,7 @@ public class DirectDruidClient<T> implements QueryRunner<T>
if (!responseFuture.isDone()) {
log.error("Error cancelling query[%s]", query);
}
StatusResponseHolder response = responseFuture.get();
StatusResponseHolder response = responseFuture.get(30, TimeUnit.SECONDS);
if (response.getStatus().getCode() >= 500) {
log.error("Error cancelling query[%s]: queriable node returned status[%d] [%s].",
query,
@ -562,6 +564,9 @@ public class DirectDruidClient<T> implements QueryRunner<T>
catch (ExecutionException | InterruptedException e) {
log.error(e, "Error cancelling query[%s]", query);
}
catch (TimeoutException e) {
log.error(e, "Timed out cancelling query[%s]", query);
}
};
queryCancellationExecutor.schedule(checkRunnable, 5, TimeUnit.SECONDS);
}

View File

@ -21,16 +21,22 @@ package org.apache.druid.client;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.inject.Inject;
import org.apache.druid.guice.LazySingleton;
import org.apache.druid.guice.annotations.EscalatedClient;
import org.apache.druid.guice.annotations.Smile;
import org.apache.druid.java.util.common.concurrent.ScheduledExecutors;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.query.QueryToolChestWarehouse;
import org.apache.druid.query.QueryWatcher;
import org.apache.druid.utils.JvmUtils;
import java.util.concurrent.ScheduledExecutorService;
/**
* Factory for building {@link DirectDruidClient}
*/
@LazySingleton
public class DirectDruidClientFactory
{
private final ServiceEmitter emitter;
@ -38,6 +44,7 @@ public class DirectDruidClientFactory
private final QueryWatcher queryWatcher;
private final ObjectMapper smileMapper;
private final HttpClient httpClient;
private final ScheduledExecutorService queryCancellationExecutor;
@Inject
public DirectDruidClientFactory(
@ -53,6 +60,9 @@ public class DirectDruidClientFactory
this.queryWatcher = queryWatcher;
this.smileMapper = smileMapper;
this.httpClient = httpClient;
int threadCount = Math.max(1, JvmUtils.getRuntimeInfo().getAvailableProcessors() / 2);
this.queryCancellationExecutor = ScheduledExecutors.fixed(threadCount, "query-cancellation-executor");
}
public DirectDruidClient makeDirectClient(DruidServer server)
@ -64,7 +74,8 @@ public class DirectDruidClientFactory
httpClient,
server.getScheme(),
server.getHost(),
emitter
emitter,
queryCancellationExecutor
);
}
}

View File

@ -31,6 +31,7 @@ import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.java.util.http.client.Request;
@ -54,6 +55,7 @@ import org.jboss.netty.handler.codec.http.HttpMethod;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.jboss.netty.handler.timeout.ReadTimeoutException;
import org.joda.time.Duration;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@ -67,6 +69,7 @@ import java.net.URL;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
public class DirectDruidClientTest
{
@ -88,6 +91,7 @@ public class DirectDruidClientTest
private HttpClient httpClient;
private DirectDruidClient client;
private QueryableDruidServer queryableDruidServer;
private ScheduledExecutorService queryCancellationExecutor;
@Before
public void setup()
@ -97,6 +101,7 @@ public class DirectDruidClientTest
dataSegment,
new HighestPriorityTierSelectorStrategy(new ConnectionCountServerSelectorStrategy())
);
queryCancellationExecutor = Execs.scheduledSingleThreaded("query-cancellation-executor");
client = new DirectDruidClient(
new ReflectionQueryToolChestWarehouse(),
QueryRunnerTestHelper.NOOP_QUERYWATCHER,
@ -104,7 +109,8 @@ public class DirectDruidClientTest
httpClient,
"http",
hostName,
new NoopServiceEmitter()
new NoopServiceEmitter(),
queryCancellationExecutor
);
queryableDruidServer = new QueryableDruidServer(
new DruidServer(
@ -121,6 +127,12 @@ public class DirectDruidClientTest
serverSelector.addServerAndUpdateSegment(queryableDruidServer, serverSelector.getSegment());
}
@After
public void teardown() throws InterruptedException
{
queryCancellationExecutor.shutdown();
queryCancellationExecutor.awaitTermination(1, java.util.concurrent.TimeUnit.SECONDS);
}
@Test
public void testRun() throws Exception
@ -169,7 +181,8 @@ public class DirectDruidClientTest
httpClient,
"http",
"foo2",
new NoopServiceEmitter()
new NoopServiceEmitter(),
queryCancellationExecutor
);
QueryableDruidServer queryableDruidServer2 = new QueryableDruidServer(

View File

@ -63,9 +63,7 @@ public class SimpleServerView implements TimelineServerView
// dataSource -> version -> serverSelector
private final Map<String, VersionedIntervalTimeline<String, ServerSelector>> timelines = new HashMap<>();
private final QueryToolChestWarehouse warehouse;
private final ObjectMapper objectMapper;
private final HttpClient httpClient;
private final DirectDruidClientFactory clientFactory;
public SimpleServerView(
QueryToolChestWarehouse warehouse,
@ -73,28 +71,18 @@ public class SimpleServerView implements TimelineServerView
HttpClient httpClient
)
{
this.warehouse = warehouse;
this.objectMapper = objectMapper;
this.httpClient = httpClient;
this.clientFactory = new DirectDruidClientFactory(
new NoopServiceEmitter(),
warehouse,
NOOP_QUERY_WATCHER,
objectMapper,
httpClient
);
}
public void addServer(DruidServer server, DataSegment dataSegment)
{
servers.put(
server,
new QueryableDruidServer<>(
server,
new DirectDruidClient<>(
warehouse,
NOOP_QUERY_WATCHER,
objectMapper,
httpClient,
server.getScheme(),
server.getHost(),
new NoopServiceEmitter()
)
)
);
servers.put(server, new QueryableDruidServer<>(server, clientFactory.makeDirectClient(server)));
addSegmentToServer(server, dataSegment);
}