diff --git a/pom.xml b/pom.xml
index 8ac79919983..909fa04dd6d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -324,17 +324,17 @@
org.eclipse.jetty
jetty-server
- 9.2.1.v20140609
+ 9.2.2.v20140723
org.eclipse.jetty
jetty-servlet
- 9.2.1.v20140609
+ 9.2.2.v20140723
org.eclipse.jetty
jetty-servlets
- 9.2.1.v20140609
+ 9.2.2.v20140723
joda-time
diff --git a/server/src/main/java/io/druid/client/RoutingDruidClient.java b/server/src/main/java/io/druid/client/RoutingDruidClient.java
index 79ae1c16f6d..3f6c2bf19f8 100644
--- a/server/src/main/java/io/druid/client/RoutingDruidClient.java
+++ b/server/src/main/java/io/druid/client/RoutingDruidClient.java
@@ -38,6 +38,7 @@ import org.jboss.netty.handler.codec.http.HttpHeaders;
import javax.inject.Inject;
import java.io.IOException;
+import java.net.URI;
import java.net.URL;
import java.util.concurrent.atomic.AtomicInteger;
@@ -72,7 +73,7 @@ public class RoutingDruidClient
}
public ListenableFuture postQuery(
- String url,
+ URI uri,
Query query,
HttpResponseHandler responseHandler
)
@@ -80,9 +81,9 @@ public class RoutingDruidClient
final ListenableFuture future;
try {
- log.debug("Querying url[%s]", url);
+ log.debug("Querying url[%s]", uri);
future = httpClient
- .post(new URL(url))
+ .post(uri.toURL())
.setContent(objectMapper.writeValueAsBytes(query))
.setHeader(HttpHeaders.Names.CONTENT_TYPE, isSmile ? QueryResource.APPLICATION_SMILE : QueryResource.APPLICATION_JSON)
.go(responseHandler);
@@ -115,13 +116,13 @@ public class RoutingDruidClient
}
public ListenableFuture get(
- String url,
+ URI uri,
HttpResponseHandler responseHandler
)
{
try {
return httpClient
- .get(new URL(url))
+ .get(uri.toURL())
.go(responseHandler);
}
catch (IOException e) {
@@ -130,13 +131,13 @@ public class RoutingDruidClient
}
public ListenableFuture delete(
- String url,
+ URI uri,
HttpResponseHandler responseHandler
)
{
try {
return httpClient
- .delete(new URL(url))
+ .delete(uri.toURL())
.go(responseHandler);
}
catch (IOException e) {
diff --git a/server/src/main/java/io/druid/server/AsyncQueryForwardingServlet.java b/server/src/main/java/io/druid/server/AsyncQueryForwardingServlet.java
index 872e7a8343b..f2b715a18f4 100644
--- a/server/src/main/java/io/druid/server/AsyncQueryForwardingServlet.java
+++ b/server/src/main/java/io/druid/server/AsyncQueryForwardingServlet.java
@@ -27,6 +27,9 @@ import com.google.common.base.Predicate;
import com.google.common.base.Predicates;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableMap;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.emitter.service.ServiceMetricEvent;
@@ -37,7 +40,6 @@ import io.druid.guice.annotations.Json;
import io.druid.guice.annotations.Smile;
import io.druid.query.DataSourceUtil;
import io.druid.query.Query;
-import io.druid.server.initialization.ServerConfig;
import io.druid.server.log.RequestLogger;
import io.druid.server.router.QueryHostFinder;
import org.jboss.netty.buffer.ChannelBuffer;
@@ -49,12 +51,13 @@ import org.joda.time.DateTime;
import javax.annotation.Nullable;
import javax.servlet.AsyncContext;
import javax.servlet.ServletException;
+import javax.servlet.ServletOutputStream;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
-import java.io.OutputStream;
+import java.net.URI;
import java.util.Map;
import java.util.UUID;
@@ -67,7 +70,6 @@ public class AsyncQueryForwardingServlet extends HttpServlet
private static final EmittingLogger log = new EmittingLogger(AsyncQueryForwardingServlet.class);
private static final Joiner COMMA_JOIN = Joiner.on(",");
- private final ServerConfig config;
private final ObjectMapper jsonMapper;
private final ObjectMapper smileMapper;
private final QueryHostFinder hostFinder;
@@ -76,7 +78,6 @@ public class AsyncQueryForwardingServlet extends HttpServlet
private final RequestLogger requestLogger;
public AsyncQueryForwardingServlet(
- ServerConfig config,
@Json ObjectMapper jsonMapper,
@Smile ObjectMapper smileMapper,
QueryHostFinder hostFinder,
@@ -85,7 +86,6 @@ public class AsyncQueryForwardingServlet extends HttpServlet
RequestLogger requestLogger
)
{
- this.config = config;
this.jsonMapper = jsonMapper;
this.smileMapper = smileMapper;
this.hostFinder = hostFinder;
@@ -98,216 +98,244 @@ public class AsyncQueryForwardingServlet extends HttpServlet
protected void doGet(HttpServletRequest req, HttpServletResponse res)
throws ServletException, IOException
{
- final AsyncContext asyncContext = req.startAsync(req, res);
- // default async timeout to be same as maxIdleTime for now
- asyncContext.setTimeout(config.getMaxIdleTime().toStandardDuration().getMillis());
- asyncContext.start(
- new Runnable()
- {
- @Override
- public void run()
- {
- try {
- final HttpResponseHandler responseHandler = new PassthroughHttpResponseHandler(asyncContext, jsonMapper);
+ final AsyncContext asyncContext = req.startAsync();
+ asyncContext.setTimeout(0);
- final String host = hostFinder.getDefaultHost();
- routingDruidClient.get(makeUrl(host, (HttpServletRequest) asyncContext.getRequest()), responseHandler);
- }
- catch (Exception e) {
- handleException(jsonMapper, asyncContext, e);
- }
- }
- }
+ final HttpResponseHandler responseHandler =
+ new PassthroughHttpResponseHandler(res);
+
+ final URI uri = rewriteURI(hostFinder.getDefaultHost(), req);
+ asyncComplete(
+ res,
+ asyncContext,
+ jsonMapper,
+ routingDruidClient.get(uri, responseHandler)
);
}
@Override
protected void doDelete(HttpServletRequest req, HttpServletResponse res) throws ServletException, IOException
{
- final AsyncContext asyncContext = req.startAsync(req, res);
- asyncContext.start(
- new Runnable()
- {
- @Override
- public void run()
- {
- try {
- final HttpResponseHandler responseHandler = new PassthroughHttpResponseHandler(asyncContext, jsonMapper);
+ final AsyncContext asyncContext = req.startAsync();
+ asyncContext.setTimeout(0);
- final String host = hostFinder.getDefaultHost();
- routingDruidClient.delete(makeUrl(host, (HttpServletRequest) asyncContext.getRequest()), responseHandler);
- }
- catch (Exception e) {
- handleException(jsonMapper, asyncContext, e);
- }
- }
- }
+ final HttpResponseHandler responseHandler =
+ new PassthroughHttpResponseHandler(res);
+
+ final String host = hostFinder.getDefaultHost();
+
+ asyncComplete(
+ res,
+ asyncContext,
+ jsonMapper,
+ routingDruidClient.delete(rewriteURI(host, (HttpServletRequest) asyncContext.getRequest()), responseHandler)
);
}
@Override
- protected void doPost(HttpServletRequest req, HttpServletResponse res) throws ServletException, IOException
+ protected void doPost(final HttpServletRequest req, final HttpServletResponse res) throws ServletException, IOException
{
final long start = System.currentTimeMillis();
- final AsyncContext asyncContext = req.startAsync(req, res);
- asyncContext.start(
- new Runnable()
- {
- @Override
- public void run()
+ final boolean isSmile = QueryResource.APPLICATION_SMILE.equals(req.getContentType());
+ final ObjectMapper objectMapper = isSmile ? smileMapper : jsonMapper;
+
+ try {
+ final Query inputQuery = objectMapper.readValue(req.getInputStream(), Query.class);
+ if (log.isDebugEnabled()) {
+ log.debug("Got query [%s]", inputQuery);
+ }
+
+ final Query query;
+ if (inputQuery.getId() == null) {
+ query = inputQuery.withId(UUID.randomUUID().toString());
+ } else {
+ query = inputQuery;
+ }
+
+ URI rewrittenURI = rewriteURI(hostFinder.getHost(query), req);
+
+ final AsyncContext asyncContext = req.startAsync();
+ // let proxy http client timeout
+ asyncContext.setTimeout(0);
+
+ ListenableFuture future = routingDruidClient.postQuery(
+ rewrittenURI,
+ query,
+ new PassthroughHttpResponseHandler(res)
+ );
+
+ Futures.addCallback(
+ future,
+ new FutureCallback()
{
- final HttpServletRequest request = (HttpServletRequest) asyncContext.getRequest();
-
- final boolean isSmile = QueryResource.APPLICATION_SMILE.equals(request.getContentType());
- final ObjectMapper objectMapper = isSmile ? smileMapper : jsonMapper;
-
- Query inputQuery = null;
- try {
- inputQuery = objectMapper.readValue(request.getInputStream(), Query.class);
- if (inputQuery.getId() == null) {
- inputQuery = inputQuery.withId(UUID.randomUUID().toString());
- }
- final Query query = inputQuery;
-
- if (log.isDebugEnabled()) {
- log.debug("Got query [%s]", inputQuery);
- }
-
- final HttpResponseHandler responseHandler = new PassthroughHttpResponseHandler(
- asyncContext,
- objectMapper
- )
- {
- @Override
- public ClientResponse done(ClientResponse clientResponse)
- {
- final long requestTime = System.currentTimeMillis() - start;
- log.debug("Request time: %d", requestTime);
-
- emitter.emit(
- new ServiceMetricEvent.Builder()
- .setUser2(DataSourceUtil.getMetricName(query.getDataSource()))
- .setUser3(String.valueOf(query.getContextPriority(0)))
- .setUser4(query.getType())
- .setUser5(COMMA_JOIN.join(query.getIntervals()))
- .setUser6(String.valueOf(query.hasFilters()))
- .setUser7(request.getRemoteAddr())
- .setUser8(query.getId())
- .setUser9(query.getDuration().toPeriod().toStandardMinutes().toString())
- .build("request/time", requestTime)
- );
-
- try {
- requestLogger.log(
- new RequestLogLine(
- new DateTime(),
- request.getRemoteAddr(),
- query,
- new QueryStats(
- ImmutableMap.of(
- "request/time",
- requestTime,
- "success",
- true
- )
- )
- )
- );
- }
- catch (Exception e) {
- throw Throwables.propagate(e);
- }
-
- return super.done(clientResponse);
- }
- };
-
- routingDruidClient.postQuery(
- makeUrl(hostFinder.getHost(inputQuery), request),
- inputQuery,
- responseHandler
+ @Override
+ public void onSuccess(@Nullable Object o)
+ {
+ final long requestTime = System.currentTimeMillis() - start;
+ emitter.emit(
+ new ServiceMetricEvent.Builder()
+ .setUser2(DataSourceUtil.getMetricName(query.getDataSource()))
+ .setUser3(String.valueOf(query.getContextPriority(0)))
+ .setUser4(query.getType())
+ .setUser5(COMMA_JOIN.join(query.getIntervals()))
+ .setUser6(String.valueOf(query.hasFilters()))
+ .setUser7(req.getRemoteAddr())
+ .setUser8(query.getId())
+ .setUser9(query.getDuration().toPeriod().toStandardMinutes().toString())
+ .build("request/time", requestTime)
);
- }
- catch (Exception e) {
- handleException(objectMapper, asyncContext, e);
try {
requestLogger.log(
new RequestLogLine(
new DateTime(),
- request.getRemoteAddr(),
- inputQuery,
- new QueryStats(ImmutableMap.of("success", false, "exception", e.toString()))
+ req.getRemoteAddr(),
+ query,
+ new QueryStats(
+ ImmutableMap.of(
+ "request/time",
+ requestTime,
+ "success",
+ true
+ )
+ )
)
);
}
- catch (Exception logError) {
- log.error(logError, "Unable to log query [%s]!", inputQuery);
+ catch (Exception e) {
+ log.error(e, "Unable to log query [%s]!", query);
+ }
+ }
+
+ @Override
+ public void onFailure(Throwable throwable)
+ {
+ try {
+ final String errorMessage = throwable.getMessage();
+ requestLogger.log(
+ new RequestLogLine(
+ new DateTime(),
+ req.getRemoteAddr(),
+ query,
+ new QueryStats(
+ ImmutableMap.of(
+ "success",
+ false,
+ "exception",
+ errorMessage == null ? "no message" : errorMessage)
+ )
+ )
+ );
+ }
+ catch (IOException logError) {
+ log.error(logError, "Unable to log query [%s]!", query);
}
- log.makeAlert(e, "Exception handling request")
- .addData("query", inputQuery)
- .addData("peer", request.getRemoteAddr())
+ log.makeAlert(throwable, "Exception handling request [%s]", query.getId())
+ .addData("query", query)
+ .addData("peer", req.getRemoteAddr())
.emit();
}
}
+ );
+
+ asyncComplete(
+ res,
+ asyncContext,
+ objectMapper,
+ future
+ );
+ } catch(IOException e) {
+ log.warn(e, "Exception parsing query");
+ final String errorMessage = e.getMessage() == null ? "no error message" : e.getMessage();
+ requestLogger.log(
+ new RequestLogLine(
+ new DateTime(),
+ req.getRemoteAddr(),
+ null,
+ new QueryStats(ImmutableMap.of("success", false, "exception", errorMessage))
+ )
+ );
+ res.setStatus(HttpServletResponse.SC_BAD_REQUEST);
+ objectMapper.writeValue(
+ res.getOutputStream(),
+ ImmutableMap.of("error", errorMessage)
+ );
+ } catch(Exception e) {
+ handleException(res, objectMapper, e);
+ }
+ }
+
+ private static void asyncComplete(
+ final HttpServletResponse res,
+ final AsyncContext asyncContext,
+ final ObjectMapper objectMapper,
+ ListenableFuture future
+ )
+ {
+ Futures.addCallback(
+ future,
+ new FutureCallback