mirror of https://github.com/apache/druid.git
Router misc fixes (#4517)
* make BrokerQueryResource instantiation singleton * fix druid.router.http.* handling so that they are actually used and introduce numRequestsQueued for jetty http client at router * address comments * address review comment
This commit is contained in:
parent
6d60ef67ce
commit
163b0edd79
|
@ -43,6 +43,10 @@ public class DruidHttpClientConfig
|
|||
@Min(1)
|
||||
private int numMaxThreads = Math.max(10, (Runtime.getRuntime().availableProcessors() * 17) / 16 + 2) + 30;
|
||||
|
||||
@JsonProperty
|
||||
@Min(1)
|
||||
private int numRequestsQueued = 1024;
|
||||
|
||||
@JsonProperty
|
||||
private String compressionCodec = DEFAULT_COMPRESSION_CODEC;
|
||||
|
||||
|
@ -65,4 +69,9 @@ public class DruidHttpClientConfig
|
|||
{
|
||||
return compressionCodec;
|
||||
}
|
||||
|
||||
public int getNumRequestsQueued()
|
||||
{
|
||||
return numRequestsQueued;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -36,6 +36,8 @@ import java.lang.annotation.Annotation;
|
|||
*/
|
||||
public class JettyHttpClientModule implements Module
|
||||
{
|
||||
private static final long CLIENT_CONNECT_TIMEOUT = 500;
|
||||
|
||||
public static JettyHttpClientModule global()
|
||||
{
|
||||
return new JettyHttpClientModule("druid.global.http", Global.class);
|
||||
|
@ -117,6 +119,8 @@ public class JettyHttpClientModule implements Module
|
|||
|
||||
httpClient.setIdleTimeout(config.getReadTimeout().getMillis());
|
||||
httpClient.setMaxConnectionsPerDestination(config.getNumConnections());
|
||||
httpClient.setMaxRequestsQueuedPerDestination(config.getNumRequestsQueued());
|
||||
httpClient.setConnectTimeout(CLIENT_CONNECT_TIMEOUT);
|
||||
final QueuedThreadPool pool = new QueuedThreadPool(config.getNumMaxThreads());
|
||||
pool.setName(JettyHttpClientModule.class.getSimpleName() + "-threadPool-" + pool.hashCode());
|
||||
httpClient.setExecutor(pool);
|
||||
|
|
|
@ -76,7 +76,7 @@ public class AsyncQueryForwardingServlet extends AsyncProxyServlet implements Qu
|
|||
private static final String OBJECTMAPPER_ATTRIBUTE = "io.druid.proxy.objectMapper";
|
||||
|
||||
private static final int CANCELLATION_TIMEOUT_MILLIS = 500;
|
||||
private static final int MAX_QUEUED_CANCELLATIONS = 64;
|
||||
|
||||
private final AtomicLong successfulQueryCount = new AtomicLong();
|
||||
private final AtomicLong failedQueryCount = new AtomicLong();
|
||||
private final AtomicLong interruptedQueryCount = new AtomicLong();
|
||||
|
@ -116,7 +116,7 @@ public class AsyncQueryForwardingServlet extends AsyncProxyServlet implements Qu
|
|||
@Smile ObjectMapper smileMapper,
|
||||
QueryHostFinder hostFinder,
|
||||
@Router Provider<HttpClient> httpClientProvider,
|
||||
DruidHttpClientConfig httpClientConfig,
|
||||
@Router DruidHttpClientConfig httpClientConfig,
|
||||
ServiceEmitter emitter,
|
||||
RequestLogger requestLogger,
|
||||
GenericQueryMetricsFactory queryMetricsFactory
|
||||
|
@ -138,12 +138,9 @@ public class AsyncQueryForwardingServlet extends AsyncProxyServlet implements Qu
|
|||
{
|
||||
super.init();
|
||||
|
||||
// separate client with more aggressive connection timeouts
|
||||
// to prevent cancellations requests from blocking queries
|
||||
// Note that httpClientProvider is setup to return same HttpClient instance on each get() so
|
||||
// it is same http client as that is used by parent ProxyServlet.
|
||||
broadcastClient = httpClientProvider.get();
|
||||
broadcastClient.setConnectTimeout(CANCELLATION_TIMEOUT_MILLIS);
|
||||
broadcastClient.setMaxRequestsQueuedPerDestination(MAX_QUEUED_CANCELLATIONS);
|
||||
|
||||
try {
|
||||
broadcastClient.start();
|
||||
}
|
||||
|
|
|
@ -109,6 +109,7 @@ public class CliBroker extends ServerRunnable
|
|||
|
||||
binder.bind(JettyServerInitializer.class).to(QueryJettyServerInitializer.class).in(LazySingleton.class);
|
||||
|
||||
binder.bind(BrokerQueryResource.class).in(LazySingleton.class);
|
||||
Jerseys.addResource(binder, BrokerQueryResource.class);
|
||||
binder.bind(QueryCountStatsProvider.class).to(BrokerQueryResource.class).in(LazySingleton.class);
|
||||
Jerseys.addResource(binder, BrokerResource.class);
|
||||
|
|
|
@ -26,6 +26,7 @@ import io.druid.guice.http.DruidHttpClientConfig;
|
|||
import io.druid.server.AsyncQueryForwardingServlet;
|
||||
import io.druid.server.initialization.jetty.JettyServerInitUtils;
|
||||
import io.druid.server.initialization.jetty.JettyServerInitializer;
|
||||
import io.druid.server.router.Router;
|
||||
import org.eclipse.jetty.server.Handler;
|
||||
import org.eclipse.jetty.server.Server;
|
||||
import org.eclipse.jetty.server.handler.HandlerList;
|
||||
|
@ -42,7 +43,7 @@ public class RouterJettyServerInitializer implements JettyServerInitializer
|
|||
|
||||
@Inject
|
||||
public RouterJettyServerInitializer(
|
||||
DruidHttpClientConfig httpClientConfig,
|
||||
@Router DruidHttpClientConfig httpClientConfig,
|
||||
AsyncQueryForwardingServlet asyncQueryForwardingServlet
|
||||
)
|
||||
{
|
||||
|
@ -62,6 +63,11 @@ public class RouterJettyServerInitializer implements JettyServerInitializer
|
|||
//NOTE: explicit maxThreads to workaround https://tickets.puppetlabs.com/browse/TK-152
|
||||
sh.setInitParameter("maxThreads", Integer.toString(httpClientConfig.getNumMaxThreads()));
|
||||
|
||||
//Needs to be set in servlet config or else overridden to default value in AbstractProxyServlet.createHttpClient()
|
||||
sh.setInitParameter("maxConnections", Integer.toString(httpClientConfig.getNumConnections()));
|
||||
sh.setInitParameter("idleTimeout", Long.toString(httpClientConfig.getReadTimeout().getMillis()));
|
||||
sh.setInitParameter("timeout", Long.toString(httpClientConfig.getReadTimeout().getMillis()));
|
||||
|
||||
root.addServlet(sh, "/druid/v2/*");
|
||||
JettyServerInitUtils.addExtensionFilters(root, injector);
|
||||
// Can't use '/*' here because of Guice conflicts with AsyncQueryForwardingServlet path
|
||||
|
|
Loading…
Reference in New Issue