diff --git a/server/src/main/java/org/apache/druid/server/QueryScheduler.java b/server/src/main/java/org/apache/druid/server/QueryScheduler.java index fb44af0e7be..4dba34a6bed 100644 --- a/server/src/main/java/org/apache/druid/server/QueryScheduler.java +++ b/server/src/main/java/org/apache/druid/server/QueryScheduler.java @@ -106,9 +106,12 @@ public class QueryScheduler implements QueryWatcher this.laningStrategy = laningStrategy; this.queryFutures = Multimaps.synchronizedSetMultimap(HashMultimap.create()); this.queryDatasources = Multimaps.synchronizedSetMultimap(HashMultimap.create()); - // if totalNumThreads is above 0 and less than druid.server.http.numThreads, enforce total limit + // if totalNumThreads is above 0 and less than druid.server.http.numThreads and + // requests are not being queued by Jetty, enforce total limit final boolean limitTotal; - if (totalNumThreads > 0 && totalNumThreads < serverConfig.getNumThreads()) { + if (totalNumThreads > 0 + && totalNumThreads < serverConfig.getNumThreads() + && !serverConfig.isEnableQueryRequestsQueuing()) { limitTotal = true; this.totalCapacity = totalNumThreads; } else { diff --git a/server/src/main/java/org/apache/druid/server/initialization/ServerConfig.java b/server/src/main/java/org/apache/druid/server/initialization/ServerConfig.java index 9b72670c3fd..276a0030af4 100644 --- a/server/src/main/java/org/apache/druid/server/initialization/ServerConfig.java +++ b/server/src/main/java/org/apache/druid/server/initialization/ServerConfig.java @@ -20,6 +20,7 @@ package org.apache.druid.server.initialization; import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableList; import org.apache.druid.common.exception.ErrorResponseTransformStrategy; import org.apache.druid.common.exception.NoErrorResponseTransformStrategy; @@ -104,6 +105,12 @@ public class ServerConfig } + @VisibleForTesting + public ServerConfig(boolean enableQueryRequestsQueuing) + { + this.enableQueryRequestsQueuing = enableQueryRequestsQueuing; + } + @JsonProperty @Min(1) private int numThreads = getDefaultNumThreads(); @@ -179,6 +186,13 @@ public class ServerConfig @JsonProperty private boolean enableHSTS = false; + /** + * This is a feature flag to enable query requests queuing when admins want to reserve some threads for + * non-query requests. This feature flag is not documented and can be removed in the future. + */ + @JsonProperty + private boolean enableQueryRequestsQueuing = false; + @JsonProperty private boolean showDetailedJettyErrors = true; @@ -288,6 +302,11 @@ public class ServerConfig return enableHSTS; } + public boolean isEnableQueryRequestsQueuing() + { + return enableQueryRequestsQueuing; + } + @Override public boolean equals(Object o) { @@ -318,7 +337,8 @@ public class ServerConfig allowedHttpMethods.equals(that.allowedHttpMethods) && errorResponseTransformStrategy.equals(that.errorResponseTransformStrategy) && Objects.equals(contentSecurityPolicy, that.getContentSecurityPolicy()) && - enableHSTS == that.enableHSTS; + enableHSTS == that.enableHSTS && + enableQueryRequestsQueuing == that.enableQueryRequestsQueuing; } @Override @@ -345,7 +365,8 @@ public class ServerConfig errorResponseTransformStrategy, showDetailedJettyErrors, contentSecurityPolicy, - enableHSTS + enableHSTS, + enableQueryRequestsQueuing ); } @@ -374,6 +395,7 @@ public class ServerConfig ", showDetailedJettyErrors=" + showDetailedJettyErrors + ", contentSecurityPolicy=" + contentSecurityPolicy + ", enableHSTS=" + enableHSTS + + ", enableQueryRequestsQueuing=" + enableQueryRequestsQueuing + '}'; } diff --git a/server/src/main/java/org/apache/druid/server/initialization/jetty/JettyBindings.java b/server/src/main/java/org/apache/druid/server/initialization/jetty/JettyBindings.java index 835564dc0b4..9fc26a73bec 100644 --- a/server/src/main/java/org/apache/druid/server/initialization/jetty/JettyBindings.java +++ b/server/src/main/java/org/apache/druid/server/initialization/jetty/JettyBindings.java @@ -65,10 +65,18 @@ public class JettyBindings private final String[] paths; private final int maxRequests; - public QosFilterHolder(String[] paths, int maxRequests) + private final long timeoutMs; + + public QosFilterHolder(String[] paths, int maxRequests, long timeoutMs) { this.paths = paths; this.maxRequests = maxRequests; + this.timeoutMs = timeoutMs; + } + + public QosFilterHolder(String[] paths, int maxRequests) + { + this(paths, maxRequests, -1); } @Override @@ -86,6 +94,9 @@ public class JettyBindings @Override public Map getInitParameters() { + if (timeoutMs >= 0) { + return ImmutableMap.of("maxRequests", String.valueOf(maxRequests), "suspendMs", String.valueOf(timeoutMs)); + } return ImmutableMap.of("maxRequests", String.valueOf(maxRequests)); } diff --git a/server/src/test/java/org/apache/druid/server/QuerySchedulerTest.java b/server/src/test/java/org/apache/druid/server/QuerySchedulerTest.java index 749317d0304..eb8025a770b 100644 --- a/server/src/test/java/org/apache/druid/server/QuerySchedulerTest.java +++ b/server/src/test/java/org/apache/druid/server/QuerySchedulerTest.java @@ -348,6 +348,32 @@ public class QuerySchedulerTest getFuturesAndAssertAftermathIsChill(futures, scheduler, true, true); } + @Test + public void testTotalLimitWithQueryQueuing() + { + ServerConfig serverConfig = new ServerConfig(); + QueryScheduler queryScheduler = new QueryScheduler( + serverConfig.getNumThreads() - 1, + ManualQueryPrioritizationStrategy.INSTANCE, + new NoQueryLaningStrategy(), + serverConfig + ); + Assert.assertEquals(serverConfig.getNumThreads() - 1, queryScheduler.getTotalAvailableCapacity()); + } + + @Test + public void testTotalLimitWithouQueryQueuing() + { + ServerConfig serverConfig = new ServerConfig(true); + QueryScheduler queryScheduler = new QueryScheduler( + serverConfig.getNumThreads() - 1, + ManualQueryPrioritizationStrategy.INSTANCE, + new NoQueryLaningStrategy(), + serverConfig + ); + Assert.assertEquals(-1, queryScheduler.getTotalAvailableCapacity()); + } + @Test public void testExplodingWrapperDoesNotLeakLocks() { diff --git a/services/src/main/java/org/apache/druid/cli/QueryJettyServerInitializer.java b/services/src/main/java/org/apache/druid/cli/QueryJettyServerInitializer.java index fb0dff1561a..e7c317c241b 100644 --- a/services/src/main/java/org/apache/druid/cli/QueryJettyServerInitializer.java +++ b/services/src/main/java/org/apache/druid/cli/QueryJettyServerInitializer.java @@ -27,9 +27,12 @@ import com.google.inject.Inject; import com.google.inject.Injector; import com.google.inject.Key; import com.google.inject.servlet.GuiceFilter; +import org.apache.druid.guice.annotations.Global; import org.apache.druid.guice.annotations.Json; import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.server.QuerySchedulerProvider; import org.apache.druid.server.initialization.ServerConfig; +import org.apache.druid.server.initialization.jetty.JettyBindings; import org.apache.druid.server.initialization.jetty.JettyServerInitUtils; import org.apache.druid.server.initialization.jetty.JettyServerInitializer; import org.apache.druid.server.initialization.jetty.LimitRequestsFilter; @@ -46,6 +49,7 @@ import org.eclipse.jetty.servlet.FilterHolder; import org.eclipse.jetty.servlet.ServletContextHandler; import org.eclipse.jetty.servlet.ServletHolder; +import java.util.Collections; import java.util.List; import java.util.Set; @@ -67,12 +71,20 @@ public class QueryJettyServerInitializer implements JettyServerInitializer private final AuthConfig authConfig; + private final QuerySchedulerProvider querySchedulerConfig; + @Inject - public QueryJettyServerInitializer(Set extensionHandlers, ServerConfig serverConfig, AuthConfig authConfig) + public QueryJettyServerInitializer( + Set extensionHandlers, + ServerConfig serverConfig, + AuthConfig authConfig, + @Global QuerySchedulerProvider querySchedulerConfig + ) { this.extensionHandlers = ImmutableList.copyOf(extensionHandlers); this.serverConfig = serverConfig; this.authConfig = authConfig; + this.querySchedulerConfig = querySchedulerConfig; } @Override @@ -95,6 +107,21 @@ public class QueryJettyServerInitializer implements JettyServerInitializer ); } + if (querySchedulerConfig.getNumThreads() > 0 + && querySchedulerConfig.getNumThreads() < serverConfig.getNumThreads() + && serverConfig.isEnableQueryRequestsQueuing()) { + // Add QoS filter for query requests, so they don't take up more than querySchedulerConfig#numThreads. + // While this will also pick up some extra endpoints other than Query, the primary objective is to protect + // health check endpoints from being starved by query requests. + log.info("Enabling QoS Filter on query requests with limit [%d].", querySchedulerConfig.getNumThreads()); + JettyBindings.QosFilterHolder filterHolder = new JettyBindings.QosFilterHolder( + new String[]{"/druid/v2/*"}, + querySchedulerConfig.getNumThreads(), + serverConfig.getMaxQueryTimeout() + ); + JettyServerInitUtils.addFilters(root, Collections.singleton(filterHolder)); + } + final ObjectMapper jsonMapper = injector.getInstance(Key.get(ObjectMapper.class, Json.class)); final AuthenticatorMapper authenticatorMapper = injector.getInstance(AuthenticatorMapper.class);