diff --git a/server/src/main/java/io/druid/server/AsyncQueryForwardingServlet.java b/server/src/main/java/io/druid/server/AsyncQueryForwardingServlet.java index b7a9057b92a..d4ae69cc86c 100644 --- a/server/src/main/java/io/druid/server/AsyncQueryForwardingServlet.java +++ b/server/src/main/java/io/druid/server/AsyncQueryForwardingServlet.java @@ -69,6 +69,9 @@ public class AsyncQueryForwardingServlet extends AsyncProxyServlet private static final String QUERY_ATTRIBUTE = "io.druid.proxy.query"; private static final String OBJECTMAPPER_ATTRIBUTE = "io.druid.proxy.objectMapper"; + private static final int CANCELLATION_TIMEOUT_MILLIS = 500; + private static final int MAX_QUEUED_CANCELLATIONS = 64; + private static void handleException(HttpServletResponse response, ObjectMapper objectMapper, Exception exception) throws IOException { @@ -93,6 +96,8 @@ public class AsyncQueryForwardingServlet extends AsyncProxyServlet private final ServiceEmitter emitter; private final RequestLogger requestLogger; + private HttpClient broadcastClient; + public AsyncQueryForwardingServlet( @Json ObjectMapper jsonMapper, @Smile ObjectMapper smileMapper, @@ -112,6 +117,23 @@ public class AsyncQueryForwardingServlet extends AsyncProxyServlet this.requestLogger = requestLogger; } + @Override + public void init() throws ServletException + { + // separate client with more aggressive connection timeouts + // to prevent cancellations requests from blocking queries + broadcastClient = httpClientProvider.get(); + broadcastClient.setConnectTimeout(CANCELLATION_TIMEOUT_MILLIS); + broadcastClient.setMaxRequestsQueuedPerDestination(MAX_QUEUED_CANCELLATIONS); + + try { + broadcastClient.start(); + } catch(Exception e) { + throw new ServletException(e); + } + super.init(); + } + @Override protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { @@ -132,9 +154,10 @@ public class AsyncQueryForwardingServlet extends AsyncProxyServlet // to keep the code simple, the proxy servlet will also send a request to one of the default brokers if (!host.equals(defaultHost)) { // issue async requests - getHttpClient() + broadcastClient .newRequest(rewriteURI(request, host)) .method(HttpMethod.DELETE) + .timeout(CANCELLATION_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS) .send( new Response.CompleteListener() {