diff --git a/server/src/main/java/io/druid/client/RoutingDruidClient.java b/server/src/main/java/io/druid/client/RoutingDruidClient.java index 10170fcfb9e..79ae1c16f6d 100644 --- a/server/src/main/java/io/druid/client/RoutingDruidClient.java +++ b/server/src/main/java/io/druid/client/RoutingDruidClient.java @@ -22,6 +22,7 @@ package io.druid.client; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.dataformat.smile.SmileFactory; import com.google.common.base.Throwables; +import com.google.common.collect.ImmutableMultimap; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; @@ -30,7 +31,9 @@ import com.metamx.http.client.HttpClient; import com.metamx.http.client.response.HttpResponseHandler; import io.druid.guice.annotations.Client; import io.druid.query.Query; +import io.druid.server.QueryResource; import io.druid.server.router.Router; +import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.handler.codec.http.HttpHeaders; import javax.inject.Inject; @@ -68,7 +71,7 @@ public class RoutingDruidClient return openConnections.get(); } - public ListenableFuture post( + public ListenableFuture postQuery( String url, Query query, HttpResponseHandler responseHandler @@ -81,7 +84,7 @@ public class RoutingDruidClient future = httpClient .post(new URL(url)) .setContent(objectMapper.writeValueAsBytes(query)) - .setHeader(HttpHeaders.Names.CONTENT_TYPE, isSmile ? "application/smile" : "application/json") + .setHeader(HttpHeaders.Names.CONTENT_TYPE, isSmile ? QueryResource.APPLICATION_SMILE : QueryResource.APPLICATION_JSON) .go(responseHandler); openConnections.getAndIncrement(); @@ -125,4 +128,19 @@ public class RoutingDruidClient throw Throwables.propagate(e); } } + + public ListenableFuture delete( + String url, + HttpResponseHandler responseHandler + ) + { + try { + return httpClient + .delete(new URL(url)) + .go(responseHandler); + } + catch (IOException e) { + throw Throwables.propagate(e); + } + } } diff --git a/server/src/main/java/io/druid/server/AsyncQueryForwardingServlet.java b/server/src/main/java/io/druid/server/AsyncQueryForwardingServlet.java index 5360d529ad3..3d82cc013ea 100644 --- a/server/src/main/java/io/druid/server/AsyncQueryForwardingServlet.java +++ b/server/src/main/java/io/druid/server/AsyncQueryForwardingServlet.java @@ -21,7 +21,11 @@ package io.druid.server; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.api.client.repackaged.com.google.common.base.Throwables; +import com.google.common.base.Function; import com.google.common.base.Joiner; +import com.google.common.base.Predicate; +import com.google.common.base.Predicates; +import com.google.common.collect.FluentIterable; import com.google.common.collect.ImmutableMap; import com.metamx.emitter.EmittingLogger; import com.metamx.emitter.service.ServiceEmitter; @@ -37,19 +41,20 @@ import io.druid.server.log.RequestLogger; import io.druid.server.router.QueryHostFinder; import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.handler.codec.http.HttpChunk; +import org.jboss.netty.handler.codec.http.HttpHeaders; import org.jboss.netty.handler.codec.http.HttpResponse; import org.joda.time.DateTime; +import javax.annotation.Nullable; import javax.servlet.AsyncContext; import javax.servlet.ServletException; -import javax.servlet.ServletOutputStream; import javax.servlet.annotation.WebServlet; import javax.servlet.http.HttpServlet; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import java.io.IOException; import java.io.OutputStream; -import java.nio.charset.Charset; +import java.util.Map; import java.util.UUID; /** @@ -59,8 +64,6 @@ import java.util.UUID; public class AsyncQueryForwardingServlet extends HttpServlet { private static final EmittingLogger log = new EmittingLogger(AsyncQueryForwardingServlet.class); - private static final Charset UTF8 = Charset.forName("UTF-8"); - private static final String DISPATCHED = "dispatched"; private static final Joiner COMMA_JOIN = Joiner.on(","); private final ObjectMapper jsonMapper; @@ -88,275 +91,161 @@ public class AsyncQueryForwardingServlet extends HttpServlet } @Override - protected void doGet(final HttpServletRequest req, final HttpServletResponse resp) + protected void doGet(HttpServletRequest req, HttpServletResponse res) throws ServletException, IOException { - OutputStream out = null; - AsyncContext ctx = null; - - try { - ctx = req.startAsync(req, resp); - final AsyncContext asyncContext = ctx; - - if (req.getAttribute(DISPATCHED) != null) { - return; - } - - out = resp.getOutputStream(); - final OutputStream outputStream = out; - - final String host = hostFinder.getDefaultHost(); - - final HttpResponseHandler responseHandler = new HttpResponseHandler() - { - @Override - public ClientResponse handleResponse(HttpResponse response) + final AsyncContext asyncContext = req.startAsync(req, res); + asyncContext.start( + new Runnable() { - resp.setStatus(response.getStatus().getCode()); - resp.setContentType("application/json"); - - try { - ChannelBuffer buf = response.getContent(); - buf.readBytes(outputStream, buf.readableBytes()); - } - catch (Exception e) { - asyncContext.complete(); - throw Throwables.propagate(e); - } - - return ClientResponse.finished(outputStream); - } - - @Override - public ClientResponse handleChunk( - ClientResponse clientResponse, HttpChunk chunk - ) - { - try { - ChannelBuffer buf = chunk.getContent(); - buf.readBytes(outputStream, buf.readableBytes()); - } - catch (Exception e) { - asyncContext.complete(); - throw Throwables.propagate(e); - } - return clientResponse; - } - - @Override - public ClientResponse done(ClientResponse clientResponse) - { - final OutputStream obj = clientResponse.getObj(); - try { - resp.flushBuffer(); - outputStream.close(); - } - catch (Exception e) { - throw Throwables.propagate(e); - } - finally { - asyncContext.complete(); - } - - return ClientResponse.finished(obj); - } - - @Override - public void exceptionCaught( - ClientResponse clientResponse, - Throwable e - ) - { - handleException(resp, asyncContext, e); - } - }; - - asyncContext.start( - new Runnable() + @Override + public void run() { - @Override - public void run() - { - routingDruidClient.get(makeUrl(host, req), responseHandler); + try { + final HttpResponseHandler responseHandler = new PassthroughHttpResponseHandler(asyncContext, jsonMapper); + + final String host = hostFinder.getDefaultHost(); + routingDruidClient.get(makeUrl(host, (HttpServletRequest) asyncContext.getRequest()), responseHandler); + } + catch (Exception e) { + handleException(jsonMapper, asyncContext, e); } } - ); - - asyncContext.dispatch(); - req.setAttribute(DISPATCHED, true); - } - catch (Exception e) { - handleException(resp, ctx, e); - } + } + ); } @Override - protected void doPost( - final HttpServletRequest req, final HttpServletResponse resp - ) throws ServletException, IOException + protected void doDelete(HttpServletRequest req, HttpServletResponse res) throws ServletException, IOException { - final long start = System.currentTimeMillis(); - Query query = null; - String queryId; - - final boolean isSmile = "application/smile".equals(req.getContentType()); - - final ObjectMapper objectMapper = isSmile ? smileMapper : jsonMapper; - - OutputStream out = null; - AsyncContext ctx = null; - - try { - ctx = req.startAsync(req, resp); - final AsyncContext asyncContext = ctx; - - if (req.getAttribute(DISPATCHED) != null) { - return; - } - - query = objectMapper.readValue(req.getInputStream(), Query.class); - queryId = query.getId(); - if (queryId == null) { - queryId = UUID.randomUUID().toString(); - query = query.withId(queryId); - } - - if (log.isDebugEnabled()) { - log.debug("Got query [%s]", query); - } - - out = resp.getOutputStream(); - final OutputStream outputStream = out; - - final String host = hostFinder.getHost(query); - - final Query theQuery = query; - final String theQueryId = queryId; - - final HttpResponseHandler responseHandler = new HttpResponseHandler() - { - @Override - public ClientResponse handleResponse(HttpResponse response) + final AsyncContext asyncContext = req.startAsync(req, res); + asyncContext.start( + new Runnable() { - resp.setStatus(response.getStatus().getCode()); - resp.setContentType("application/x-javascript"); - - try { - ChannelBuffer buf = response.getContent(); - buf.readBytes(outputStream, buf.readableBytes()); - } - catch (Exception e) { - asyncContext.complete(); - throw Throwables.propagate(e); - } - return ClientResponse.finished(outputStream); - } - - @Override - public ClientResponse handleChunk( - ClientResponse clientResponse, HttpChunk chunk - ) - { - try { - ChannelBuffer buf = chunk.getContent(); - buf.readBytes(outputStream, buf.readableBytes()); - } - catch (Exception e) { - asyncContext.complete(); - throw Throwables.propagate(e); - } - return clientResponse; - } - - @Override - public ClientResponse done(ClientResponse clientResponse) - { - final long requestTime = System.currentTimeMillis() - start; - - log.debug("Request time: %d", requestTime); - - emitter.emit( - new ServiceMetricEvent.Builder() - .setUser2(DataSourceUtil.getMetricName(theQuery.getDataSource())) - .setUser4(theQuery.getType()) - .setUser5(COMMA_JOIN.join(theQuery.getIntervals())) - .setUser6(String.valueOf(theQuery.hasFilters())) - .setUser7(req.getRemoteAddr()) - .setUser8(theQueryId) - .setUser9(theQuery.getDuration().toPeriod().toStandardMinutes().toString()) - .build("request/time", requestTime) - ); - - final OutputStream obj = clientResponse.getObj(); - try { - requestLogger.log( - new RequestLogLine( - new DateTime(), - req.getRemoteAddr(), - theQuery, - new QueryStats(ImmutableMap.of("request/time", requestTime, "success", true)) - ) - ); - - resp.flushBuffer(); - outputStream.close(); - } - catch (Exception e) { - throw Throwables.propagate(e); - } - finally { - asyncContext.complete(); - } - - return ClientResponse.finished(obj); - } - - @Override - public void exceptionCaught( - ClientResponse clientResponse, - Throwable e - ) - { - handleException(resp, asyncContext, e); - } - }; - - asyncContext.start( - new Runnable() + @Override + public void run() { - @Override - public void run() - { - routingDruidClient.post(makeUrl(host, req), theQuery, responseHandler); + try { + final HttpResponseHandler responseHandler = new PassthroughHttpResponseHandler(asyncContext, jsonMapper); + + final String host = hostFinder.getDefaultHost(); + routingDruidClient.delete(makeUrl(host, (HttpServletRequest) asyncContext.getRequest()), responseHandler); + } + catch (Exception e) { + handleException(jsonMapper, asyncContext, e); } } - ); + } + ); + } - asyncContext.dispatch(); - req.setAttribute(DISPATCHED, true); - } - catch (Exception e) { - handleException(resp, ctx, e); + @Override + protected void doPost(HttpServletRequest req, HttpServletResponse res) throws ServletException, IOException + { + final long start = System.currentTimeMillis(); + final AsyncContext asyncContext = req.startAsync(req, res); + asyncContext.start( + new Runnable() + { + @Override + public void run() + { + final HttpServletRequest request = (HttpServletRequest) asyncContext.getRequest(); - try { - requestLogger.log( - new RequestLogLine( - new DateTime(), - req.getRemoteAddr(), - query, - new QueryStats(ImmutableMap.of("success", false, "exception", e.toString())) - ) - ); - } - catch (Exception e2) { - log.error(e2, "Unable to log query [%s]!", query); - } + final boolean isSmile = QueryResource.APPLICATION_SMILE.equals(request.getContentType()); + final ObjectMapper objectMapper = isSmile ? smileMapper : jsonMapper; - log.makeAlert(e, "Exception handling request") - .addData("query", query) - .addData("peer", req.getRemoteAddr()) - .emit(); - } + Query inputQuery = null; + try { + inputQuery = objectMapper.readValue(request.getInputStream(), Query.class); + if (inputQuery.getId() == null) { + inputQuery = inputQuery.withId(UUID.randomUUID().toString()); + } + final Query query = inputQuery; + + if (log.isDebugEnabled()) { + log.debug("Got query [%s]", inputQuery); + } + + final HttpResponseHandler responseHandler = new PassthroughHttpResponseHandler( + asyncContext, + objectMapper + ) + { + @Override + public ClientResponse done(ClientResponse clientResponse) + { + final long requestTime = System.currentTimeMillis() - start; + log.debug("Request time: %d", requestTime); + + emitter.emit( + new ServiceMetricEvent.Builder() + .setUser2(DataSourceUtil.getMetricName(query.getDataSource())) + .setUser4(query.getType()) + .setUser5(COMMA_JOIN.join(query.getIntervals())) + .setUser6(String.valueOf(query.hasFilters())) + .setUser7(request.getRemoteAddr()) + .setUser8(query.getId()) + .setUser9(query.getDuration().toPeriod().toStandardMinutes().toString()) + .build("request/time", requestTime) + ); + + try { + requestLogger.log( + new RequestLogLine( + new DateTime(), + request.getRemoteAddr(), + query, + new QueryStats( + ImmutableMap.of( + "request/time", + requestTime, + "success", + true + ) + ) + ) + ); + } + catch (Exception e) { + throw Throwables.propagate(e); + } + + return super.done(clientResponse); + } + }; + + routingDruidClient.postQuery( + makeUrl(hostFinder.getHost(inputQuery), request), + inputQuery, + responseHandler + ); + } + catch (Exception e) { + handleException(objectMapper, asyncContext, e); + + try { + requestLogger.log( + new RequestLogLine( + new DateTime(), + request.getRemoteAddr(), + inputQuery, + new QueryStats(ImmutableMap.of("success", false, "exception", e.toString())) + ) + ); + } + catch (Exception logError) { + log.error(logError, "Unable to log query [%s]!", inputQuery); + } + + log.makeAlert(e, "Exception handling request") + .addData("query", inputQuery) + .addData("peer", request.getRemoteAddr()) + .emit(); + } + } + } + ); } private String makeUrl(final String host, final HttpServletRequest req) @@ -370,24 +259,126 @@ public class AsyncQueryForwardingServlet extends HttpServlet return String.format("http://%s%s?%s", host, requestURI, queryString); } - private static void handleException(HttpServletResponse resp, AsyncContext ctx, Throwable e) + private static void handleException(ObjectMapper objectMapper, AsyncContext asyncContext, Throwable exception) { try { - final ServletOutputStream out = resp.getOutputStream(); - if (!resp.isCommitted()) { - resp.setStatus(500); - resp.resetBuffer(); - out.write((e.getMessage() == null) ? "Exception null".getBytes(UTF8) : e.getMessage().getBytes(UTF8)); - out.write("\n".getBytes(UTF8)); + HttpServletResponse response = (HttpServletResponse) asyncContext.getResponse(); + if (!response.isCommitted()) { + final String errorMessage = exception.getMessage() == null ? "null exception" : exception.getMessage(); + + response.resetBuffer(); + response.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR); + objectMapper.writeValue( + response.getOutputStream(), + ImmutableMap.of( + "error", errorMessage + ) + ); + } + response.flushBuffer(); + } + catch (IOException e) { + Throwables.propagate(e); + } + finally { + asyncContext.complete(); + } + } + + private static class PassthroughHttpResponseHandler implements HttpResponseHandler + { + private final AsyncContext asyncContext; + private final ObjectMapper objectMapper; + private final OutputStream outputStream; + + public PassthroughHttpResponseHandler(AsyncContext asyncContext, ObjectMapper objectMapper) throws IOException + { + this.asyncContext = asyncContext; + this.objectMapper = objectMapper; + this.outputStream = asyncContext.getResponse().getOutputStream(); + } + + protected void copyStatusHeaders(HttpResponse clientResponse) + { + final HttpServletResponse response = (HttpServletResponse) asyncContext.getResponse(); + response.setStatus(clientResponse.getStatus().getCode()); + response.setContentType(clientResponse.headers().get(HttpHeaders.Names.CONTENT_TYPE)); + + FluentIterable.from(clientResponse.headers().entries()) + .filter(new Predicate>() + { + @Override + public boolean apply(@Nullable Map.Entry input) + { + return input.getKey().startsWith("X-Druid"); + } + } + ) + .transform( + new Function, Object>() + { + @Nullable + @Override + public Object apply(@Nullable Map.Entry input) + { + response.setHeader(input.getKey(), input.getValue()); + return null; + } + } + ) + .allMatch(Predicates.alwaysTrue()); + } + + @Override + public ClientResponse handleResponse(HttpResponse clientResponse) + { + copyStatusHeaders(clientResponse); + + try { + ChannelBuffer buf = clientResponse.getContent(); + buf.readBytes(outputStream, buf.readableBytes()); + } + catch (Exception e) { + throw Throwables.propagate(e); } - if (ctx != null) { - ctx.complete(); - } - resp.flushBuffer(); + return ClientResponse.finished(outputStream); } - catch (IOException e1) { - Throwables.propagate(e1); + + @Override + public ClientResponse handleChunk( + ClientResponse clientResponse, HttpChunk chunk + ) + { + try { + ChannelBuffer buf = chunk.getContent(); + buf.readBytes(outputStream, buf.readableBytes()); + } + catch (Exception e) { + throw Throwables.propagate(e); + } + return clientResponse; + } + + @Override + public ClientResponse done(ClientResponse clientResponse) + { + asyncContext.complete(); + return ClientResponse.finished(clientResponse.getObj()); + } + + @Override + public void exceptionCaught( + ClientResponse clientResponse, + Throwable e + ) + { + // throwing an exception here may cause resource leak + try { + handleException(objectMapper, asyncContext, e); + } catch(Exception err) { + log.error(err, "Unable to handle exception response"); + } } } }