Merge pull request #2674 from metamx/fix-broadcast-lockup

separate HTTP client pool for cancellation requests
This commit is contained in:
Charles Allen 2016-03-17 15:23:42 -07:00
commit 45c413af7e
1 changed files with 24 additions and 1 deletions

View File

@ -69,6 +69,9 @@ public class AsyncQueryForwardingServlet extends AsyncProxyServlet
private static final String QUERY_ATTRIBUTE = "io.druid.proxy.query";
private static final String OBJECTMAPPER_ATTRIBUTE = "io.druid.proxy.objectMapper";
private static final int CANCELLATION_TIMEOUT_MILLIS = 500;
private static final int MAX_QUEUED_CANCELLATIONS = 64;
private static void handleException(HttpServletResponse response, ObjectMapper objectMapper, Exception exception)
throws IOException
{
@ -93,6 +96,8 @@ public class AsyncQueryForwardingServlet extends AsyncProxyServlet
private final ServiceEmitter emitter;
private final RequestLogger requestLogger;
private HttpClient broadcastClient;
public AsyncQueryForwardingServlet(
@Json ObjectMapper jsonMapper,
@Smile ObjectMapper smileMapper,
@ -112,6 +117,23 @@ public class AsyncQueryForwardingServlet extends AsyncProxyServlet
this.requestLogger = requestLogger;
}
@Override
public void init() throws ServletException
{
// separate client with more aggressive connection timeouts
// to prevent cancellations requests from blocking queries
broadcastClient = httpClientProvider.get();
broadcastClient.setConnectTimeout(CANCELLATION_TIMEOUT_MILLIS);
broadcastClient.setMaxRequestsQueuedPerDestination(MAX_QUEUED_CANCELLATIONS);
try {
broadcastClient.start();
} catch(Exception e) {
throw new ServletException(e);
}
super.init();
}
@Override
protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
{
@ -132,9 +154,10 @@ public class AsyncQueryForwardingServlet extends AsyncProxyServlet
// to keep the code simple, the proxy servlet will also send a request to one of the default brokers
if (!host.equals(defaultHost)) {
// issue async requests
getHttpClient()
broadcastClient
.newRequest(rewriteURI(request, host))
.method(HttpMethod.DELETE)
.timeout(CANCELLATION_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)
.send(
new Response.CompleteListener()
{