diff --git a/server/src/main/java/io/druid/client/RoutingDruidClient.java b/server/src/main/java/io/druid/client/RoutingDruidClient.java index 7ad4ec5d820..1489f472e37 100644 --- a/server/src/main/java/io/druid/client/RoutingDruidClient.java +++ b/server/src/main/java/io/druid/client/RoutingDruidClient.java @@ -28,7 +28,7 @@ import com.google.common.util.concurrent.ListenableFuture; import com.metamx.common.logger.Logger; import com.metamx.http.client.HttpClient; import com.metamx.http.client.response.HttpResponseHandler; -import io.druid.guice.annotations.Global; +import io.druid.guice.annotations.Client; import io.druid.query.Query; import org.jboss.netty.handler.codec.http.HttpHeaders; @@ -52,7 +52,7 @@ public class RoutingDruidClient @Inject public RoutingDruidClient( ObjectMapper objectMapper, - @Global HttpClient httpClient + @Client HttpClient httpClient ) { this.objectMapper = objectMapper; @@ -67,7 +67,7 @@ public class RoutingDruidClient return openConnections.get(); } - public ListenableFuture run( + public ListenableFuture post( String url, Query query, HttpResponseHandler responseHandler @@ -109,4 +109,19 @@ public class RoutingDruidClient return future; } + + public ListenableFuture get( + String url, + HttpResponseHandler responseHandler + ) + { + try { + return httpClient + .get(new URL(url)) + .go(responseHandler); + } + catch (IOException e) { + throw Throwables.propagate(e); + } + } } diff --git a/server/src/main/java/io/druid/server/AsyncQueryForwardingServlet.java b/server/src/main/java/io/druid/server/AsyncQueryForwardingServlet.java index 8ce0302d193..5a03f328892 100644 --- a/server/src/main/java/io/druid/server/AsyncQueryForwardingServlet.java +++ b/server/src/main/java/io/druid/server/AsyncQueryForwardingServlet.java @@ -85,6 +85,119 @@ public class AsyncQueryForwardingServlet extends HttpServlet this.requestLogger = requestLogger; } + @Override + protected void doGet(final HttpServletRequest req, final HttpServletResponse resp) + throws ServletException, IOException + { + OutputStream out = null; + AsyncContext ctx = null; + + try { + ctx = req.startAsync(req, resp); + final AsyncContext asyncContext = ctx; + + if (req.getAttribute(DISPATCHED) != null) { + return; + } + + out = resp.getOutputStream(); + final OutputStream outputStream = out; + + final String host = hostFinder.getDefaultHost(); + + final HttpResponseHandler responseHandler = new HttpResponseHandler() + { + @Override + public ClientResponse handleResponse(HttpResponse response) + { + resp.setStatus(response.getStatus().getCode()); + resp.setContentType("application/json"); + + byte[] bytes = getContentBytes(response.getContent()); + if (bytes.length > 0) { + try { + outputStream.write(bytes); + } + catch (Exception e) { + asyncContext.complete(); + throw Throwables.propagate(e); + } + } + return ClientResponse.finished(outputStream); + } + + @Override + public ClientResponse handleChunk( + ClientResponse clientResponse, HttpChunk chunk + ) + { + byte[] bytes = getContentBytes(chunk.getContent()); + if (bytes.length > 0) { + try { + clientResponse.getObj().write(bytes); + } + catch (Exception e) { + asyncContext.complete(); + throw Throwables.propagate(e); + } + } + return clientResponse; + } + + @Override + public ClientResponse done(ClientResponse clientResponse) + { + final OutputStream obj = clientResponse.getObj(); + try { + resp.flushBuffer(); + outputStream.close(); + } + catch (Exception e) { + throw Throwables.propagate(e); + } + finally { + asyncContext.complete(); + } + + return ClientResponse.finished(obj); + } + }; + + asyncContext.start( + new Runnable() + { + @Override + public void run() + { + routingDruidClient.get(makeUrl(host, req), responseHandler); + } + } + ); + + asyncContext.dispatch(); + req.setAttribute(DISPATCHED, true); + } + catch (Exception e) { + if (!resp.isCommitted()) { + resp.setStatus(500); + resp.resetBuffer(); + + if (out == null) { + out = resp.getOutputStream(); + } + + if (ctx != null) { + ctx.complete(); + } + + out.write((e.getMessage() == null) ? "Exception null".getBytes(UTF8) : e.getMessage().getBytes(UTF8)); + out.write("\n".getBytes(UTF8)); + } + + resp.flushBuffer(); + } + } + @Override protected void doPost( final HttpServletRequest req, final HttpServletResponse resp @@ -99,16 +212,16 @@ public class AsyncQueryForwardingServlet extends HttpServlet final ObjectMapper objectMapper = isSmile ? smileMapper : jsonMapper; OutputStream out = null; + AsyncContext ctx = null; try { - final AsyncContext ctx = req.startAsync(req, resp); + ctx = req.startAsync(req, resp); + final AsyncContext asyncContext = ctx; if (req.getAttribute(DISPATCHED) != null) { return; } - req.setAttribute(DISPATCHED, true); - query = objectMapper.readValue(req.getInputStream(), Query.class); queryId = query.getId(); if (queryId == null) { @@ -142,6 +255,7 @@ public class AsyncQueryForwardingServlet extends HttpServlet outputStream.write(bytes); } catch (Exception e) { + asyncContext.complete(); throw Throwables.propagate(e); } } @@ -159,6 +273,7 @@ public class AsyncQueryForwardingServlet extends HttpServlet clientResponse.getObj().write(bytes); } catch (Exception e) { + asyncContext.complete(); throw Throwables.propagate(e); } } @@ -202,30 +317,26 @@ public class AsyncQueryForwardingServlet extends HttpServlet throw Throwables.propagate(e); } finally { - ctx.dispatch(); + asyncContext.complete(); } return ClientResponse.finished(obj); } - - private byte[] getContentBytes(ChannelBuffer content) - { - byte[] contentBytes = new byte[content.readableBytes()]; - content.readBytes(contentBytes); - return contentBytes; - } }; - ctx.start( + asyncContext.start( new Runnable() { @Override public void run() { - routingDruidClient.run(makeUrl(host, req), theQuery, responseHandler); + routingDruidClient.post(makeUrl(host, req), theQuery, responseHandler); } } ); + + asyncContext.dispatch(); + req.setAttribute(DISPATCHED, true); } catch (Exception e) { if (!resp.isCommitted()) { @@ -242,6 +353,10 @@ public class AsyncQueryForwardingServlet extends HttpServlet resp.flushBuffer(); + if (ctx != null) { + ctx.complete(); + } + try { requestLogger.log( new RequestLogLine( @@ -272,4 +387,11 @@ public class AsyncQueryForwardingServlet extends HttpServlet } return String.format("http://%s%s?%s", host, req.getRequestURI(), req.getQueryString()); } + + private byte[] getContentBytes(ChannelBuffer content) + { + byte[] contentBytes = new byte[content.readableBytes()]; + content.readBytes(contentBytes); + return contentBytes; + } } diff --git a/server/src/main/java/io/druid/server/router/QueryHostFinder.java b/server/src/main/java/io/druid/server/router/QueryHostFinder.java index ea32b8fbd5e..17b4db1104c 100644 --- a/server/src/main/java/io/druid/server/router/QueryHostFinder.java +++ b/server/src/main/java/io/druid/server/router/QueryHostFinder.java @@ -49,7 +49,49 @@ public class QueryHostFinder public Server findServer(Query query) { final Pair selected = hostSelector.select(query); + return findServerInner(selected); + } + public Server findDefaultServer() + { + final Pair selected = hostSelector.getDefaultLookup(); + return findServerInner(selected); + } + + public String getHost(Query query) + { + Server server = findServer(query); + + if (server == null) { + log.makeAlert( + "Catastrophic failure! No servers found at all! Failing request!" + ).emit(); + + return null; + } + + log.debug("Selected [%s]", server.getHost()); + + return server.getHost(); + } + + public String getDefaultHost() + { + Server server = findDefaultServer(); + + if (server == null) { + log.makeAlert( + "Catastrophic failure! No servers found at all! Failing request!" + ).emit(); + + return null; + } + + return server.getHost(); + } + + private Server findServerInner(final Pair selected) + { if (selected == null) { log.error("Danger, Will Robinson! Unable to find any brokers!"); } @@ -82,21 +124,4 @@ public class QueryHostFinder return server; } - - public String getHost(Query query) - { - Server server = findServer(query); - - if (server == null) { - log.makeAlert( - "Catastrophic failure! No servers found at all! Failing request!" - ).emit(); - - return null; - } - - log.debug("Selected [%s]", server.getHost()); - - return server.getHost(); - } } diff --git a/server/src/main/java/io/druid/server/router/TieredBrokerHostSelector.java b/server/src/main/java/io/druid/server/router/TieredBrokerHostSelector.java index 9778c3b1d6d..681acd815b1 100644 --- a/server/src/main/java/io/druid/server/router/TieredBrokerHostSelector.java +++ b/server/src/main/java/io/druid/server/router/TieredBrokerHostSelector.java @@ -193,7 +193,7 @@ public class TieredBrokerHostSelector implements HostSelector return new Pair<>(brokerServiceName, retVal); } - private Pair getDefaultLookup() + public Pair getDefaultLookup() { final String brokerServiceName = tierConfig.getDefaultBrokerServiceName(); final ServerDiscoverySelector retVal = selectorMap.get(brokerServiceName); diff --git a/services/src/main/java/io/druid/cli/CliRouter.java b/services/src/main/java/io/druid/cli/CliRouter.java index 1d0d10073bf..a2c8903642b 100644 --- a/services/src/main/java/io/druid/cli/CliRouter.java +++ b/services/src/main/java/io/druid/cli/CliRouter.java @@ -29,10 +29,12 @@ import io.druid.client.RoutingDruidClient; import io.druid.curator.discovery.DiscoveryModule; import io.druid.curator.discovery.ServerDiscoveryFactory; import io.druid.curator.discovery.ServerDiscoverySelector; +import io.druid.guice.HttpClientModule; import io.druid.guice.JsonConfigProvider; import io.druid.guice.LazySingleton; import io.druid.guice.LifecycleModule; import io.druid.guice.ManageLifecycle; +import io.druid.guice.annotations.Client; import io.druid.guice.annotations.Self; import io.druid.server.initialization.JettyServerInitializer; import io.druid.server.router.CoordinatorRuleManager; @@ -62,6 +64,7 @@ public class CliRouter extends ServerRunnable protected List getModules() { return ImmutableList.of( + new HttpClientModule("druid.router.http", Client.class), new Module() { @Override