From 1718a7224bc71243c4bb6195ddec9d38cc95a992 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Xavier=20L=C3=A9aut=C3=A9?= Date: Tue, 15 Mar 2016 13:27:16 -0700 Subject: [PATCH] separate HTTP pool for cancellation requests * reduces contention between queries and cancellation requests * more aggressive timeouts for cancellation requests --- .../server/AsyncQueryForwardingServlet.java | 25 ++++++++++++++++++- 1 file changed, 24 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/io/druid/server/AsyncQueryForwardingServlet.java b/server/src/main/java/io/druid/server/AsyncQueryForwardingServlet.java index b7a9057b92a..d4ae69cc86c 100644 --- a/server/src/main/java/io/druid/server/AsyncQueryForwardingServlet.java +++ b/server/src/main/java/io/druid/server/AsyncQueryForwardingServlet.java @@ -69,6 +69,9 @@ public class AsyncQueryForwardingServlet extends AsyncProxyServlet private static final String QUERY_ATTRIBUTE = "io.druid.proxy.query"; private static final String OBJECTMAPPER_ATTRIBUTE = "io.druid.proxy.objectMapper"; + private static final int CANCELLATION_TIMEOUT_MILLIS = 500; + private static final int MAX_QUEUED_CANCELLATIONS = 64; + private static void handleException(HttpServletResponse response, ObjectMapper objectMapper, Exception exception) throws IOException { @@ -93,6 +96,8 @@ public class AsyncQueryForwardingServlet extends AsyncProxyServlet private final ServiceEmitter emitter; private final RequestLogger requestLogger; + private HttpClient broadcastClient; + public AsyncQueryForwardingServlet( @Json ObjectMapper jsonMapper, @Smile ObjectMapper smileMapper, @@ -112,6 +117,23 @@ public class AsyncQueryForwardingServlet extends AsyncProxyServlet this.requestLogger = requestLogger; } + @Override + public void init() throws ServletException + { + // separate client with more aggressive connection timeouts + // to prevent cancellations requests from blocking queries + broadcastClient = httpClientProvider.get(); + broadcastClient.setConnectTimeout(CANCELLATION_TIMEOUT_MILLIS); + broadcastClient.setMaxRequestsQueuedPerDestination(MAX_QUEUED_CANCELLATIONS); + + try { + broadcastClient.start(); + } catch(Exception e) { + throw new ServletException(e); + } + super.init(); + } + @Override protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { @@ -132,9 +154,10 @@ public class AsyncQueryForwardingServlet extends AsyncProxyServlet // to keep the code simple, the proxy servlet will also send a request to one of the default brokers if (!host.equals(defaultHost)) { // issue async requests - getHttpClient() + broadcastClient .newRequest(rewriteURI(request, host)) .method(HttpMethod.DELETE) + .timeout(CANCELLATION_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS) .send( new Response.CompleteListener() {