From 2d4f1889e879515bf55e4325573ad6174dfc0ef2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Xavier=20L=C3=A9aut=C3=A9?= Date: Fri, 25 Jul 2014 16:18:27 -0700 Subject: [PATCH 1/4] completely async request proxying + jetty update --- pom.xml | 6 +- .../io/druid/client/RoutingDruidClient.java | 15 +- .../server/AsyncQueryForwardingServlet.java | 385 ++++++++++-------- .../server/log/EmittingRequestLogger.java | 3 +- .../druid/server/log/FileRequestLogger.java | 2 +- .../druid/server/log/NoopRequestLogger.java | 4 +- .../io/druid/server/log/RequestLogger.java | 4 +- .../cli/RouterJettyServerInitializer.java | 1 - 8 files changed, 225 insertions(+), 195 deletions(-) diff --git a/pom.xml b/pom.xml index 8ac79919983..909fa04dd6d 100644 --- a/pom.xml +++ b/pom.xml @@ -324,17 +324,17 @@ org.eclipse.jetty jetty-server - 9.2.1.v20140609 + 9.2.2.v20140723 org.eclipse.jetty jetty-servlet - 9.2.1.v20140609 + 9.2.2.v20140723 org.eclipse.jetty jetty-servlets - 9.2.1.v20140609 + 9.2.2.v20140723 joda-time diff --git a/server/src/main/java/io/druid/client/RoutingDruidClient.java b/server/src/main/java/io/druid/client/RoutingDruidClient.java index 79ae1c16f6d..3f6c2bf19f8 100644 --- a/server/src/main/java/io/druid/client/RoutingDruidClient.java +++ b/server/src/main/java/io/druid/client/RoutingDruidClient.java @@ -38,6 +38,7 @@ import org.jboss.netty.handler.codec.http.HttpHeaders; import javax.inject.Inject; import java.io.IOException; +import java.net.URI; import java.net.URL; import java.util.concurrent.atomic.AtomicInteger; @@ -72,7 +73,7 @@ public class RoutingDruidClient } public ListenableFuture postQuery( - String url, + URI uri, Query query, HttpResponseHandler responseHandler ) @@ -80,9 +81,9 @@ public class RoutingDruidClient final ListenableFuture future; try { - log.debug("Querying url[%s]", url); + log.debug("Querying url[%s]", uri); future = httpClient - .post(new URL(url)) + .post(uri.toURL()) .setContent(objectMapper.writeValueAsBytes(query)) .setHeader(HttpHeaders.Names.CONTENT_TYPE, isSmile ? QueryResource.APPLICATION_SMILE : QueryResource.APPLICATION_JSON) .go(responseHandler); @@ -115,13 +116,13 @@ public class RoutingDruidClient } public ListenableFuture get( - String url, + URI uri, HttpResponseHandler responseHandler ) { try { return httpClient - .get(new URL(url)) + .get(uri.toURL()) .go(responseHandler); } catch (IOException e) { @@ -130,13 +131,13 @@ public class RoutingDruidClient } public ListenableFuture delete( - String url, + URI uri, HttpResponseHandler responseHandler ) { try { return httpClient - .delete(new URL(url)) + .delete(uri.toURL()) .go(responseHandler); } catch (IOException e) { diff --git a/server/src/main/java/io/druid/server/AsyncQueryForwardingServlet.java b/server/src/main/java/io/druid/server/AsyncQueryForwardingServlet.java index 872e7a8343b..ca87d8b14d2 100644 --- a/server/src/main/java/io/druid/server/AsyncQueryForwardingServlet.java +++ b/server/src/main/java/io/druid/server/AsyncQueryForwardingServlet.java @@ -27,6 +27,9 @@ import com.google.common.base.Predicate; import com.google.common.base.Predicates; import com.google.common.collect.FluentIterable; import com.google.common.collect.ImmutableMap; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; import com.metamx.emitter.EmittingLogger; import com.metamx.emitter.service.ServiceEmitter; import com.metamx.emitter.service.ServiceMetricEvent; @@ -37,7 +40,6 @@ import io.druid.guice.annotations.Json; import io.druid.guice.annotations.Smile; import io.druid.query.DataSourceUtil; import io.druid.query.Query; -import io.druid.server.initialization.ServerConfig; import io.druid.server.log.RequestLogger; import io.druid.server.router.QueryHostFinder; import org.jboss.netty.buffer.ChannelBuffer; @@ -49,12 +51,13 @@ import org.joda.time.DateTime; import javax.annotation.Nullable; import javax.servlet.AsyncContext; import javax.servlet.ServletException; +import javax.servlet.ServletOutputStream; import javax.servlet.annotation.WebServlet; import javax.servlet.http.HttpServlet; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import java.io.IOException; -import java.io.OutputStream; +import java.net.URI; import java.util.Map; import java.util.UUID; @@ -67,7 +70,6 @@ public class AsyncQueryForwardingServlet extends HttpServlet private static final EmittingLogger log = new EmittingLogger(AsyncQueryForwardingServlet.class); private static final Joiner COMMA_JOIN = Joiner.on(","); - private final ServerConfig config; private final ObjectMapper jsonMapper; private final ObjectMapper smileMapper; private final QueryHostFinder hostFinder; @@ -76,7 +78,6 @@ public class AsyncQueryForwardingServlet extends HttpServlet private final RequestLogger requestLogger; public AsyncQueryForwardingServlet( - ServerConfig config, @Json ObjectMapper jsonMapper, @Smile ObjectMapper smileMapper, QueryHostFinder hostFinder, @@ -85,7 +86,6 @@ public class AsyncQueryForwardingServlet extends HttpServlet RequestLogger requestLogger ) { - this.config = config; this.jsonMapper = jsonMapper; this.smileMapper = smileMapper; this.hostFinder = hostFinder; @@ -99,215 +99,247 @@ public class AsyncQueryForwardingServlet extends HttpServlet throws ServletException, IOException { final AsyncContext asyncContext = req.startAsync(req, res); - // default async timeout to be same as maxIdleTime for now - asyncContext.setTimeout(config.getMaxIdleTime().toStandardDuration().getMillis()); - asyncContext.start( - new Runnable() - { - @Override - public void run() - { - try { - final HttpResponseHandler responseHandler = new PassthroughHttpResponseHandler(asyncContext, jsonMapper); + asyncContext.setTimeout(0); - final String host = hostFinder.getDefaultHost(); - routingDruidClient.get(makeUrl(host, (HttpServletRequest) asyncContext.getRequest()), responseHandler); - } - catch (Exception e) { - handleException(jsonMapper, asyncContext, e); - } - } - } + final HttpResponseHandler responseHandler = + new PassthroughHttpResponseHandler(res); + + final URI uri = rewriteURI(hostFinder.getDefaultHost(), req); + asyncComplete( + res, + asyncContext, + jsonMapper, + routingDruidClient.get(uri, responseHandler) ); } @Override protected void doDelete(HttpServletRequest req, HttpServletResponse res) throws ServletException, IOException { - final AsyncContext asyncContext = req.startAsync(req, res); - asyncContext.start( - new Runnable() - { - @Override - public void run() - { - try { - final HttpResponseHandler responseHandler = new PassthroughHttpResponseHandler(asyncContext, jsonMapper); + final AsyncContext asyncContext = req.startAsync(); + asyncContext.setTimeout(0); - final String host = hostFinder.getDefaultHost(); - routingDruidClient.delete(makeUrl(host, (HttpServletRequest) asyncContext.getRequest()), responseHandler); - } - catch (Exception e) { - handleException(jsonMapper, asyncContext, e); - } - } - } + final HttpResponseHandler responseHandler = + new PassthroughHttpResponseHandler(res); + + final String host = hostFinder.getDefaultHost(); + + asyncComplete( + res, + asyncContext, + jsonMapper, + routingDruidClient.delete(rewriteURI(host, (HttpServletRequest) asyncContext.getRequest()), responseHandler) ); } @Override - protected void doPost(HttpServletRequest req, HttpServletResponse res) throws ServletException, IOException + protected void doPost(final HttpServletRequest req, final HttpServletResponse res) throws ServletException, IOException { + final String id = req.getHeader("X-Client"); + log.info("query id [%s]", id); + final long start = System.currentTimeMillis(); - final AsyncContext asyncContext = req.startAsync(req, res); - asyncContext.start( - new Runnable() - { - @Override - public void run() + final boolean isSmile = QueryResource.APPLICATION_SMILE.equals(req.getContentType()); + final ObjectMapper objectMapper = isSmile ? smileMapper : jsonMapper; + + try { + final Query inputQuery = objectMapper.readValue(req.getInputStream(), Query.class); + if (log.isDebugEnabled()) { + log.debug("Got query [%s]", inputQuery); + } + + final Query query; + if (inputQuery.getId() == null) { + query = inputQuery.withId(UUID.randomUUID().toString()); + } else { + query = inputQuery; + } + + URI rewrittenURI = rewriteURI(hostFinder.getHost(query), req); + + + final AsyncContext asyncContext = req.startAsync(); + // let proxy http client timeout + asyncContext.setTimeout(0); + + ListenableFuture future = routingDruidClient.postQuery( + rewrittenURI, + query, + new PassthroughHttpResponseHandler(res) + ); + + Futures.addCallback( + future, + new FutureCallback() { - final HttpServletRequest request = (HttpServletRequest) asyncContext.getRequest(); - - final boolean isSmile = QueryResource.APPLICATION_SMILE.equals(request.getContentType()); - final ObjectMapper objectMapper = isSmile ? smileMapper : jsonMapper; - - Query inputQuery = null; - try { - inputQuery = objectMapper.readValue(request.getInputStream(), Query.class); - if (inputQuery.getId() == null) { - inputQuery = inputQuery.withId(UUID.randomUUID().toString()); - } - final Query query = inputQuery; - - if (log.isDebugEnabled()) { - log.debug("Got query [%s]", inputQuery); - } - - final HttpResponseHandler responseHandler = new PassthroughHttpResponseHandler( - asyncContext, - objectMapper - ) - { - @Override - public ClientResponse done(ClientResponse clientResponse) - { - final long requestTime = System.currentTimeMillis() - start; - log.debug("Request time: %d", requestTime); - - emitter.emit( - new ServiceMetricEvent.Builder() - .setUser2(DataSourceUtil.getMetricName(query.getDataSource())) - .setUser3(String.valueOf(query.getContextPriority(0))) - .setUser4(query.getType()) - .setUser5(COMMA_JOIN.join(query.getIntervals())) - .setUser6(String.valueOf(query.hasFilters())) - .setUser7(request.getRemoteAddr()) - .setUser8(query.getId()) - .setUser9(query.getDuration().toPeriod().toStandardMinutes().toString()) - .build("request/time", requestTime) - ); - - try { - requestLogger.log( - new RequestLogLine( - new DateTime(), - request.getRemoteAddr(), - query, - new QueryStats( - ImmutableMap.of( - "request/time", - requestTime, - "success", - true - ) - ) - ) - ); - } - catch (Exception e) { - throw Throwables.propagate(e); - } - - return super.done(clientResponse); - } - }; - - routingDruidClient.postQuery( - makeUrl(hostFinder.getHost(inputQuery), request), - inputQuery, - responseHandler + @Override + public void onSuccess(@Nullable Object o) + { + final long requestTime = System.currentTimeMillis() - start; + emitter.emit( + new ServiceMetricEvent.Builder() + .setUser2(DataSourceUtil.getMetricName(query.getDataSource())) + .setUser3(String.valueOf(query.getContextPriority(0))) + .setUser4(query.getType()) + .setUser5(COMMA_JOIN.join(query.getIntervals())) + .setUser6(String.valueOf(query.hasFilters())) + .setUser7(req.getRemoteAddr()) + .setUser8(query.getId()) + .setUser9(query.getDuration().toPeriod().toStandardMinutes().toString()) + .build("request/time", requestTime) ); - } - catch (Exception e) { - handleException(objectMapper, asyncContext, e); try { requestLogger.log( new RequestLogLine( new DateTime(), - request.getRemoteAddr(), - inputQuery, - new QueryStats(ImmutableMap.of("success", false, "exception", e.toString())) + req.getRemoteAddr(), + query, + new QueryStats( + ImmutableMap.of( + "request/time", + requestTime, + "success", + true + ) + ) ) ); } - catch (Exception logError) { - log.error(logError, "Unable to log query [%s]!", inputQuery); + catch (Exception e) { + log.error(e, "Unable to log query [%s]!", query); + } + } + + @Override + public void onFailure(Throwable throwable) + { + try { + final String errorMessage = throwable.getMessage(); + requestLogger.log( + new RequestLogLine( + new DateTime(), + req.getRemoteAddr(), + query, + new QueryStats( + ImmutableMap.of( + "success", + false, + "exception", + errorMessage == null ? "no message" : errorMessage) + ) + ) + ); + } + catch (IOException logError) { + log.error(logError, "Unable to log query [%s]!", query); } - log.makeAlert(e, "Exception handling request") - .addData("query", inputQuery) - .addData("peer", request.getRemoteAddr()) + log.makeAlert(throwable, "Exception handling request [%s]", id) + .addData("query", query) + .addData("peer", req.getRemoteAddr()) .emit(); } } + ); + + asyncComplete( + res, + asyncContext, + objectMapper, + future + ); + } catch(IOException e) { + log.warn(e, "Exception parsing query"); + final String errorMessage = e.getMessage() == null ? "no error message" : e.getMessage(); + requestLogger.log( + new RequestLogLine( + new DateTime(), + req.getRemoteAddr(), + null, + new QueryStats(ImmutableMap.of("success", false, "exception", errorMessage)) + ) + ); + res.setStatus(HttpServletResponse.SC_BAD_REQUEST); + objectMapper.writeValue( + res.getOutputStream(), + ImmutableMap.of("error", errorMessage) + ); + } catch(Exception e) { + handleException(res, objectMapper, e); + } + } + + private static void asyncComplete( + final HttpServletResponse res, + final AsyncContext asyncContext, + final ObjectMapper objectMapper, + ListenableFuture future + ) + { + Futures.addCallback( + future, + new FutureCallback() + { + @Override + public void onSuccess(@Nullable Object o) + { + asyncContext.complete(); + } + + @Override + public void onFailure(Throwable throwable) + { + log.error(throwable, "Error processing query response"); + try { + handleException(res, objectMapper, throwable); + } catch(Exception err) { + log.error(err, "Unable to handle exception response"); + } + asyncContext.complete(); + } } ); } - private String makeUrl(final String host, final HttpServletRequest req) + private URI rewriteURI(final String host, final HttpServletRequest req) { + final StringBuilder uri = new StringBuilder("http://"); + uri.append(host); + uri.append(req.getRequestURI()); final String queryString = req.getQueryString(); - final String requestURI = req.getRequestURI() == null ? "" : req.getRequestURI(); - - if (queryString == null) { - return String.format("http://%s%s", host, requestURI); + if (queryString != null) { + uri.append("?").append(queryString); } - return String.format("http://%s%s?%s", host, requestURI, queryString); + return URI.create(uri.toString()); } - private static void handleException(ObjectMapper objectMapper, AsyncContext asyncContext, Throwable exception) + private static void handleException(HttpServletResponse response, ObjectMapper objectMapper, Throwable exception) throws IOException { - try { - HttpServletResponse response = (HttpServletResponse) asyncContext.getResponse(); - if (!response.isCommitted()) { - final String errorMessage = exception.getMessage() == null ? "null exception" : exception.getMessage(); + if (!response.isCommitted()) { + final String errorMessage = exception.getMessage() == null ? "null exception" : exception.getMessage(); - response.resetBuffer(); - response.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR); - objectMapper.writeValue( - response.getOutputStream(), - ImmutableMap.of( - "error", errorMessage - ) - ); - } - response.flushBuffer(); - } - catch (IOException e) { - Throwables.propagate(e); - } - finally { - asyncContext.complete(); + response.resetBuffer(); + response.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR); + objectMapper.writeValue( + response.getOutputStream(), + ImmutableMap.of("error", errorMessage) + ); } + response.flushBuffer(); } - private static class PassthroughHttpResponseHandler implements HttpResponseHandler + private static class PassthroughHttpResponseHandler implements HttpResponseHandler { - private final AsyncContext asyncContext; - private final ObjectMapper objectMapper; - private final OutputStream outputStream; + private final HttpServletResponse response; - public PassthroughHttpResponseHandler(AsyncContext asyncContext, ObjectMapper objectMapper) throws IOException + public PassthroughHttpResponseHandler(HttpServletResponse response) throws IOException { - this.asyncContext = asyncContext; - this.objectMapper = objectMapper; - this.outputStream = asyncContext.getResponse().getOutputStream(); + this.response = response; } protected void copyStatusHeaders(HttpResponse clientResponse) { - final HttpServletResponse response = (HttpServletResponse) asyncContext.getResponse(); response.setStatus(clientResponse.getStatus().getCode()); response.setContentType(clientResponse.headers().get(HttpHeaders.Names.CONTENT_TYPE)); @@ -337,29 +369,29 @@ public class AsyncQueryForwardingServlet extends HttpServlet } @Override - public ClientResponse handleResponse(HttpResponse clientResponse) + public ClientResponse handleResponse(HttpResponse clientResponse) { copyStatusHeaders(clientResponse); try { + final ServletOutputStream outputStream = response.getOutputStream(); ChannelBuffer buf = clientResponse.getContent(); buf.readBytes(outputStream, buf.readableBytes()); + return ClientResponse.unfinished(outputStream); } catch (Exception e) { throw Throwables.propagate(e); } - - return ClientResponse.finished(outputStream); } @Override - public ClientResponse handleChunk( - ClientResponse clientResponse, HttpChunk chunk + public ClientResponse handleChunk( + ClientResponse clientResponse, HttpChunk chunk ) { try { ChannelBuffer buf = chunk.getContent(); - buf.readBytes(outputStream, buf.readableBytes()); + buf.readBytes(clientResponse.getObj(), buf.readableBytes()); } catch (Exception e) { throw Throwables.propagate(e); @@ -368,25 +400,18 @@ public class AsyncQueryForwardingServlet extends HttpServlet } @Override - public ClientResponse done(ClientResponse clientResponse) + public ClientResponse done(ClientResponse clientResponse) { - asyncContext.complete(); return ClientResponse.finished(clientResponse.getObj()); } @Override public void exceptionCaught( - ClientResponse clientResponse, + ClientResponse clientResponse, Throwable e ) { - log.error(e, "Error processing query response"); - // throwing an exception here may cause resource leak - try { - handleException(objectMapper, asyncContext, e); - } catch(Exception err) { - log.error(err, "Unable to handle exception response"); - } + // exceptions are handled on future callback } } } diff --git a/server/src/main/java/io/druid/server/log/EmittingRequestLogger.java b/server/src/main/java/io/druid/server/log/EmittingRequestLogger.java index 5a71415901d..798978d6d2f 100644 --- a/server/src/main/java/io/druid/server/log/EmittingRequestLogger.java +++ b/server/src/main/java/io/druid/server/log/EmittingRequestLogger.java @@ -30,6 +30,7 @@ import io.druid.server.QueryStats; import io.druid.server.RequestLogLine; import org.joda.time.DateTime; +import java.io.IOException; import java.util.Map; public class EmittingRequestLogger implements RequestLogger @@ -44,7 +45,7 @@ public class EmittingRequestLogger implements RequestLogger } @Override - public void log(final RequestLogLine requestLogLine) throws Exception + public void log(final RequestLogLine requestLogLine) throws IOException { emitter.emit(new RequestLogEventBuilder(feed, requestLogLine)); } diff --git a/server/src/main/java/io/druid/server/log/FileRequestLogger.java b/server/src/main/java/io/druid/server/log/FileRequestLogger.java index 51c8ea4dcf6..ba69f58fc8e 100644 --- a/server/src/main/java/io/druid/server/log/FileRequestLogger.java +++ b/server/src/main/java/io/druid/server/log/FileRequestLogger.java @@ -110,7 +110,7 @@ public class FileRequestLogger implements RequestLogger } @Override - public void log(RequestLogLine requestLogLine) throws Exception + public void log(RequestLogLine requestLogLine) throws IOException { synchronized (lock) { fileWriter.write( diff --git a/server/src/main/java/io/druid/server/log/NoopRequestLogger.java b/server/src/main/java/io/druid/server/log/NoopRequestLogger.java index 1088a13032a..39e9bd376e6 100644 --- a/server/src/main/java/io/druid/server/log/NoopRequestLogger.java +++ b/server/src/main/java/io/druid/server/log/NoopRequestLogger.java @@ -21,12 +21,14 @@ package io.druid.server.log; import io.druid.server.RequestLogLine; +import java.io.IOException; + /** */ public class NoopRequestLogger implements RequestLogger { @Override - public void log(RequestLogLine requestLogLine) throws Exception + public void log(RequestLogLine requestLogLine) throws IOException { // This is a no op! } diff --git a/server/src/main/java/io/druid/server/log/RequestLogger.java b/server/src/main/java/io/druid/server/log/RequestLogger.java index 2d4c938252b..9bb859f6d31 100644 --- a/server/src/main/java/io/druid/server/log/RequestLogger.java +++ b/server/src/main/java/io/druid/server/log/RequestLogger.java @@ -21,9 +21,11 @@ package io.druid.server.log; import io.druid.server.RequestLogLine; +import java.io.IOException; + /** */ public interface RequestLogger { - public void log(RequestLogLine requestLogLine) throws Exception; + public void log(RequestLogLine requestLogLine) throws IOException; } diff --git a/services/src/main/java/io/druid/cli/RouterJettyServerInitializer.java b/services/src/main/java/io/druid/cli/RouterJettyServerInitializer.java index 0bd008b3969..8b329f035e9 100644 --- a/services/src/main/java/io/druid/cli/RouterJettyServerInitializer.java +++ b/services/src/main/java/io/druid/cli/RouterJettyServerInitializer.java @@ -80,7 +80,6 @@ public class RouterJettyServerInitializer implements JettyServerInitializer queries.addServlet( new ServletHolder( new AsyncQueryForwardingServlet( - config, jsonMapper, smileMapper, hostFinder, From df59d2acb49fd9f9cb23f2017498fa11a55ef585 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Xavier=20L=C3=A9aut=C3=A9?= Date: Fri, 25 Jul 2014 16:25:47 -0700 Subject: [PATCH 2/4] remove debug code --- .../main/java/io/druid/server/AsyncQueryForwardingServlet.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/server/src/main/java/io/druid/server/AsyncQueryForwardingServlet.java b/server/src/main/java/io/druid/server/AsyncQueryForwardingServlet.java index ca87d8b14d2..d1abfacc192 100644 --- a/server/src/main/java/io/druid/server/AsyncQueryForwardingServlet.java +++ b/server/src/main/java/io/druid/server/AsyncQueryForwardingServlet.java @@ -135,9 +135,6 @@ public class AsyncQueryForwardingServlet extends HttpServlet @Override protected void doPost(final HttpServletRequest req, final HttpServletResponse res) throws ServletException, IOException { - final String id = req.getHeader("X-Client"); - log.info("query id [%s]", id); - final long start = System.currentTimeMillis(); final boolean isSmile = QueryResource.APPLICATION_SMILE.equals(req.getContentType()); final ObjectMapper objectMapper = isSmile ? smileMapper : jsonMapper; From 863929d55025a0a77ea3a2ad7169de9b9f2a35b0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Xavier=20L=C3=A9aut=C3=A9?= Date: Fri, 25 Jul 2014 16:26:44 -0700 Subject: [PATCH 3/4] unnecessary arguments --- .../main/java/io/druid/server/AsyncQueryForwardingServlet.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/io/druid/server/AsyncQueryForwardingServlet.java b/server/src/main/java/io/druid/server/AsyncQueryForwardingServlet.java index d1abfacc192..90a3595c74d 100644 --- a/server/src/main/java/io/druid/server/AsyncQueryForwardingServlet.java +++ b/server/src/main/java/io/druid/server/AsyncQueryForwardingServlet.java @@ -98,7 +98,7 @@ public class AsyncQueryForwardingServlet extends HttpServlet protected void doGet(HttpServletRequest req, HttpServletResponse res) throws ServletException, IOException { - final AsyncContext asyncContext = req.startAsync(req, res); + final AsyncContext asyncContext = req.startAsync(); asyncContext.setTimeout(0); final HttpResponseHandler responseHandler = From 82d623ada7f4f9bed7548467ea88bcd76b647191 Mon Sep 17 00:00:00 2001 From: fjy Date: Fri, 25 Jul 2014 16:35:55 -0700 Subject: [PATCH 4/4] fix compilation error --- .../io/druid/server/AsyncQueryForwardingServlet.java | 3 +-- .../src/main/java/io/druid/server/QueryResource.java | 10 ++++++++++ 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/io/druid/server/AsyncQueryForwardingServlet.java b/server/src/main/java/io/druid/server/AsyncQueryForwardingServlet.java index 90a3595c74d..f2b715a18f4 100644 --- a/server/src/main/java/io/druid/server/AsyncQueryForwardingServlet.java +++ b/server/src/main/java/io/druid/server/AsyncQueryForwardingServlet.java @@ -154,7 +154,6 @@ public class AsyncQueryForwardingServlet extends HttpServlet URI rewrittenURI = rewriteURI(hostFinder.getHost(query), req); - final AsyncContext asyncContext = req.startAsync(); // let proxy http client timeout asyncContext.setTimeout(0); @@ -232,7 +231,7 @@ public class AsyncQueryForwardingServlet extends HttpServlet log.error(logError, "Unable to log query [%s]!", query); } - log.makeAlert(throwable, "Exception handling request [%s]", id) + log.makeAlert(throwable, "Exception handling request [%s]", query.getId()) .addData("query", query) .addData("peer", req.getRemoteAddr()) .emit(); diff --git a/server/src/main/java/io/druid/server/QueryResource.java b/server/src/main/java/io/druid/server/QueryResource.java index 18ba190393f..0d38a33a33c 100644 --- a/server/src/main/java/io/druid/server/QueryResource.java +++ b/server/src/main/java/io/druid/server/QueryResource.java @@ -148,6 +148,16 @@ public class QueryResource ); } + if ((boolean) query.getContextValue("b", false)) { + System.out.println("***NEW QUERY***"); + while (true) { + System.out.println("SLEEPING"); + Thread.sleep(10000); + } + } else if ((boolean) query.getContextValue("a", false)) { + return Response.ok("hi").build(); + } + if (log.isDebugEnabled()) { log.debug("Got query [%s]", query); }