mirror of https://github.com/apache/druid.git
Reserve threads for non-query requests without using laning (#14576)
This PR uses the QoSFilter available in Jetty to park the query requests that exceed a configured limit. This is done so that other HTTP requests such as health check calls do not get blocked if the query server is busy serving long-running queries. The same mechanism can also be used in the future to isolate interactive queries from long-running select queries from interactive queries within the same broker. Right now, you can still get that isolation by setting druid.query.scheduler.numThreads to a value lowe than druid.server.http.numThreads. That enables total laning but the side effect is that excess requests are not queued and rejected outright that leads to a bad user experience. Parked requests are timed out after 30 seconds by default. I overrode that to the maxQueryTimeout in this PR.
This commit is contained in:
parent
024ce40f1a
commit
1ddbaa8744
|
@ -106,9 +106,12 @@ public class QueryScheduler implements QueryWatcher
|
|||
this.laningStrategy = laningStrategy;
|
||||
this.queryFutures = Multimaps.synchronizedSetMultimap(HashMultimap.create());
|
||||
this.queryDatasources = Multimaps.synchronizedSetMultimap(HashMultimap.create());
|
||||
// if totalNumThreads is above 0 and less than druid.server.http.numThreads, enforce total limit
|
||||
// if totalNumThreads is above 0 and less than druid.server.http.numThreads and
|
||||
// requests are not being queued by Jetty, enforce total limit
|
||||
final boolean limitTotal;
|
||||
if (totalNumThreads > 0 && totalNumThreads < serverConfig.getNumThreads()) {
|
||||
if (totalNumThreads > 0
|
||||
&& totalNumThreads < serverConfig.getNumThreads()
|
||||
&& !serverConfig.isEnableQueryRequestsQueuing()) {
|
||||
limitTotal = true;
|
||||
this.totalCapacity = totalNumThreads;
|
||||
} else {
|
||||
|
|
|
@ -20,6 +20,7 @@
|
|||
package org.apache.druid.server.initialization;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import org.apache.druid.common.exception.ErrorResponseTransformStrategy;
|
||||
import org.apache.druid.common.exception.NoErrorResponseTransformStrategy;
|
||||
|
@ -104,6 +105,12 @@ public class ServerConfig
|
|||
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public ServerConfig(boolean enableQueryRequestsQueuing)
|
||||
{
|
||||
this.enableQueryRequestsQueuing = enableQueryRequestsQueuing;
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
@Min(1)
|
||||
private int numThreads = getDefaultNumThreads();
|
||||
|
@ -179,6 +186,13 @@ public class ServerConfig
|
|||
@JsonProperty
|
||||
private boolean enableHSTS = false;
|
||||
|
||||
/**
|
||||
* This is a feature flag to enable query requests queuing when admins want to reserve some threads for
|
||||
* non-query requests. This feature flag is not documented and can be removed in the future.
|
||||
*/
|
||||
@JsonProperty
|
||||
private boolean enableQueryRequestsQueuing = false;
|
||||
|
||||
@JsonProperty
|
||||
private boolean showDetailedJettyErrors = true;
|
||||
|
||||
|
@ -288,6 +302,11 @@ public class ServerConfig
|
|||
return enableHSTS;
|
||||
}
|
||||
|
||||
public boolean isEnableQueryRequestsQueuing()
|
||||
{
|
||||
return enableQueryRequestsQueuing;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o)
|
||||
{
|
||||
|
@ -318,7 +337,8 @@ public class ServerConfig
|
|||
allowedHttpMethods.equals(that.allowedHttpMethods) &&
|
||||
errorResponseTransformStrategy.equals(that.errorResponseTransformStrategy) &&
|
||||
Objects.equals(contentSecurityPolicy, that.getContentSecurityPolicy()) &&
|
||||
enableHSTS == that.enableHSTS;
|
||||
enableHSTS == that.enableHSTS &&
|
||||
enableQueryRequestsQueuing == that.enableQueryRequestsQueuing;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -345,7 +365,8 @@ public class ServerConfig
|
|||
errorResponseTransformStrategy,
|
||||
showDetailedJettyErrors,
|
||||
contentSecurityPolicy,
|
||||
enableHSTS
|
||||
enableHSTS,
|
||||
enableQueryRequestsQueuing
|
||||
);
|
||||
}
|
||||
|
||||
|
@ -374,6 +395,7 @@ public class ServerConfig
|
|||
", showDetailedJettyErrors=" + showDetailedJettyErrors +
|
||||
", contentSecurityPolicy=" + contentSecurityPolicy +
|
||||
", enableHSTS=" + enableHSTS +
|
||||
", enableQueryRequestsQueuing=" + enableQueryRequestsQueuing +
|
||||
'}';
|
||||
}
|
||||
|
||||
|
|
|
@ -65,10 +65,18 @@ public class JettyBindings
|
|||
private final String[] paths;
|
||||
private final int maxRequests;
|
||||
|
||||
public QosFilterHolder(String[] paths, int maxRequests)
|
||||
private final long timeoutMs;
|
||||
|
||||
public QosFilterHolder(String[] paths, int maxRequests, long timeoutMs)
|
||||
{
|
||||
this.paths = paths;
|
||||
this.maxRequests = maxRequests;
|
||||
this.timeoutMs = timeoutMs;
|
||||
}
|
||||
|
||||
public QosFilterHolder(String[] paths, int maxRequests)
|
||||
{
|
||||
this(paths, maxRequests, -1);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -86,6 +94,9 @@ public class JettyBindings
|
|||
@Override
|
||||
public Map<String, String> getInitParameters()
|
||||
{
|
||||
if (timeoutMs >= 0) {
|
||||
return ImmutableMap.of("maxRequests", String.valueOf(maxRequests), "suspendMs", String.valueOf(timeoutMs));
|
||||
}
|
||||
return ImmutableMap.of("maxRequests", String.valueOf(maxRequests));
|
||||
}
|
||||
|
||||
|
|
|
@ -348,6 +348,32 @@ public class QuerySchedulerTest
|
|||
getFuturesAndAssertAftermathIsChill(futures, scheduler, true, true);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTotalLimitWithQueryQueuing()
|
||||
{
|
||||
ServerConfig serverConfig = new ServerConfig();
|
||||
QueryScheduler queryScheduler = new QueryScheduler(
|
||||
serverConfig.getNumThreads() - 1,
|
||||
ManualQueryPrioritizationStrategy.INSTANCE,
|
||||
new NoQueryLaningStrategy(),
|
||||
serverConfig
|
||||
);
|
||||
Assert.assertEquals(serverConfig.getNumThreads() - 1, queryScheduler.getTotalAvailableCapacity());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTotalLimitWithouQueryQueuing()
|
||||
{
|
||||
ServerConfig serverConfig = new ServerConfig(true);
|
||||
QueryScheduler queryScheduler = new QueryScheduler(
|
||||
serverConfig.getNumThreads() - 1,
|
||||
ManualQueryPrioritizationStrategy.INSTANCE,
|
||||
new NoQueryLaningStrategy(),
|
||||
serverConfig
|
||||
);
|
||||
Assert.assertEquals(-1, queryScheduler.getTotalAvailableCapacity());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testExplodingWrapperDoesNotLeakLocks()
|
||||
{
|
||||
|
|
|
@ -27,9 +27,12 @@ import com.google.inject.Inject;
|
|||
import com.google.inject.Injector;
|
||||
import com.google.inject.Key;
|
||||
import com.google.inject.servlet.GuiceFilter;
|
||||
import org.apache.druid.guice.annotations.Global;
|
||||
import org.apache.druid.guice.annotations.Json;
|
||||
import org.apache.druid.java.util.common.logger.Logger;
|
||||
import org.apache.druid.server.QuerySchedulerProvider;
|
||||
import org.apache.druid.server.initialization.ServerConfig;
|
||||
import org.apache.druid.server.initialization.jetty.JettyBindings;
|
||||
import org.apache.druid.server.initialization.jetty.JettyServerInitUtils;
|
||||
import org.apache.druid.server.initialization.jetty.JettyServerInitializer;
|
||||
import org.apache.druid.server.initialization.jetty.LimitRequestsFilter;
|
||||
|
@ -46,6 +49,7 @@ import org.eclipse.jetty.servlet.FilterHolder;
|
|||
import org.eclipse.jetty.servlet.ServletContextHandler;
|
||||
import org.eclipse.jetty.servlet.ServletHolder;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
|
||||
|
@ -67,12 +71,20 @@ public class QueryJettyServerInitializer implements JettyServerInitializer
|
|||
|
||||
private final AuthConfig authConfig;
|
||||
|
||||
private final QuerySchedulerProvider querySchedulerConfig;
|
||||
|
||||
@Inject
|
||||
public QueryJettyServerInitializer(Set<Handler> extensionHandlers, ServerConfig serverConfig, AuthConfig authConfig)
|
||||
public QueryJettyServerInitializer(
|
||||
Set<Handler> extensionHandlers,
|
||||
ServerConfig serverConfig,
|
||||
AuthConfig authConfig,
|
||||
@Global QuerySchedulerProvider querySchedulerConfig
|
||||
)
|
||||
{
|
||||
this.extensionHandlers = ImmutableList.copyOf(extensionHandlers);
|
||||
this.serverConfig = serverConfig;
|
||||
this.authConfig = authConfig;
|
||||
this.querySchedulerConfig = querySchedulerConfig;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -95,6 +107,21 @@ public class QueryJettyServerInitializer implements JettyServerInitializer
|
|||
);
|
||||
}
|
||||
|
||||
if (querySchedulerConfig.getNumThreads() > 0
|
||||
&& querySchedulerConfig.getNumThreads() < serverConfig.getNumThreads()
|
||||
&& serverConfig.isEnableQueryRequestsQueuing()) {
|
||||
// Add QoS filter for query requests, so they don't take up more than querySchedulerConfig#numThreads.
|
||||
// While this will also pick up some extra endpoints other than Query, the primary objective is to protect
|
||||
// health check endpoints from being starved by query requests.
|
||||
log.info("Enabling QoS Filter on query requests with limit [%d].", querySchedulerConfig.getNumThreads());
|
||||
JettyBindings.QosFilterHolder filterHolder = new JettyBindings.QosFilterHolder(
|
||||
new String[]{"/druid/v2/*"},
|
||||
querySchedulerConfig.getNumThreads(),
|
||||
serverConfig.getMaxQueryTimeout()
|
||||
);
|
||||
JettyServerInitUtils.addFilters(root, Collections.singleton(filterHolder));
|
||||
}
|
||||
|
||||
final ObjectMapper jsonMapper = injector.getInstance(Key.get(ObjectMapper.class, Json.class));
|
||||
final AuthenticatorMapper authenticatorMapper = injector.getInstance(AuthenticatorMapper.class);
|
||||
|
||||
|
|
Loading…
Reference in New Issue