Merge pull request #484 from metamx/fix-router

Cleanup code in router and add forwarding for get requests
This commit is contained in:
fjy 2014-04-16 11:46:35 -06:00
commit 141404907b
6 changed files with 240 additions and 50 deletions

View File

@ -28,8 +28,9 @@ import com.google.common.util.concurrent.ListenableFuture;
import com.metamx.common.logger.Logger;
import com.metamx.http.client.HttpClient;
import com.metamx.http.client.response.HttpResponseHandler;
import io.druid.guice.annotations.Global;
import io.druid.guice.annotations.Client;
import io.druid.query.Query;
import io.druid.server.router.Router;
import org.jboss.netty.handler.codec.http.HttpHeaders;
import javax.inject.Inject;
@ -52,7 +53,7 @@ public class RoutingDruidClient<IntermediateType, FinalType>
@Inject
public RoutingDruidClient(
ObjectMapper objectMapper,
@Global HttpClient httpClient
@Router HttpClient httpClient
)
{
this.objectMapper = objectMapper;
@ -67,7 +68,7 @@ public class RoutingDruidClient<IntermediateType, FinalType>
return openConnections.get();
}
public ListenableFuture<FinalType> run(
public ListenableFuture<FinalType> post(
String url,
Query query,
HttpResponseHandler<IntermediateType, FinalType> responseHandler
@ -109,4 +110,19 @@ public class RoutingDruidClient<IntermediateType, FinalType>
return future;
}
public ListenableFuture<FinalType> get(
String url,
HttpResponseHandler<IntermediateType, FinalType> responseHandler
)
{
try {
return httpClient
.get(new URL(url))
.go(responseHandler);
}
catch (IOException e) {
throw Throwables.propagate(e);
}
}
}

View File

@ -85,6 +85,116 @@ public class AsyncQueryForwardingServlet extends HttpServlet
this.requestLogger = requestLogger;
}
@Override
protected void doGet(final HttpServletRequest req, final HttpServletResponse resp)
throws ServletException, IOException
{
OutputStream out = null;
AsyncContext ctx = null;
try {
ctx = req.startAsync(req, resp);
final AsyncContext asyncContext = ctx;
if (req.getAttribute(DISPATCHED) != null) {
return;
}
out = resp.getOutputStream();
final OutputStream outputStream = out;
final String host = hostFinder.getDefaultHost();
final HttpResponseHandler<OutputStream, OutputStream> responseHandler = new HttpResponseHandler<OutputStream, OutputStream>()
{
@Override
public ClientResponse<OutputStream> handleResponse(HttpResponse response)
{
resp.setStatus(response.getStatus().getCode());
resp.setContentType("application/json");
try {
ChannelBuffer buf = response.getContent();
buf.readBytes(outputStream, buf.readableBytes());
}
catch (Exception e) {
asyncContext.complete();
throw Throwables.propagate(e);
}
return ClientResponse.finished(outputStream);
}
@Override
public ClientResponse<OutputStream> handleChunk(
ClientResponse<OutputStream> clientResponse, HttpChunk chunk
)
{
try {
ChannelBuffer buf = chunk.getContent();
buf.readBytes(outputStream, buf.readableBytes());
}
catch (Exception e) {
asyncContext.complete();
throw Throwables.propagate(e);
}
return clientResponse;
}
@Override
public ClientResponse<OutputStream> done(ClientResponse<OutputStream> clientResponse)
{
final OutputStream obj = clientResponse.getObj();
try {
resp.flushBuffer();
outputStream.close();
}
catch (Exception e) {
throw Throwables.propagate(e);
}
finally {
asyncContext.complete();
}
return ClientResponse.finished(obj);
}
};
asyncContext.start(
new Runnable()
{
@Override
public void run()
{
routingDruidClient.get(makeUrl(host, req), responseHandler);
}
}
);
asyncContext.dispatch();
req.setAttribute(DISPATCHED, true);
}
catch (Exception e) {
if (!resp.isCommitted()) {
resp.setStatus(500);
resp.resetBuffer();
if (out == null) {
out = resp.getOutputStream();
}
if (ctx != null) {
ctx.complete();
}
out.write((e.getMessage() == null) ? "Exception null".getBytes(UTF8) : e.getMessage().getBytes(UTF8));
out.write("\n".getBytes(UTF8));
}
resp.flushBuffer();
}
}
@Override
protected void doPost(
final HttpServletRequest req, final HttpServletResponse resp
@ -99,16 +209,16 @@ public class AsyncQueryForwardingServlet extends HttpServlet
final ObjectMapper objectMapper = isSmile ? smileMapper : jsonMapper;
OutputStream out = null;
AsyncContext ctx = null;
try {
final AsyncContext ctx = req.startAsync(req, resp);
ctx = req.startAsync(req, resp);
final AsyncContext asyncContext = ctx;
if (req.getAttribute(DISPATCHED) != null) {
return;
}
req.setAttribute(DISPATCHED, true);
query = objectMapper.readValue(req.getInputStream(), Query.class);
queryId = query.getId();
if (queryId == null) {
@ -136,15 +246,14 @@ public class AsyncQueryForwardingServlet extends HttpServlet
resp.setStatus(response.getStatus().getCode());
resp.setContentType("application/x-javascript");
byte[] bytes = getContentBytes(response.getContent());
if (bytes.length > 0) {
try {
outputStream.write(bytes);
ChannelBuffer buf = response.getContent();
buf.readBytes(outputStream, buf.readableBytes());
}
catch (Exception e) {
asyncContext.complete();
throw Throwables.propagate(e);
}
}
return ClientResponse.finished(outputStream);
}
@ -153,15 +262,14 @@ public class AsyncQueryForwardingServlet extends HttpServlet
ClientResponse<OutputStream> clientResponse, HttpChunk chunk
)
{
byte[] bytes = getContentBytes(chunk.getContent());
if (bytes.length > 0) {
try {
clientResponse.getObj().write(bytes);
ChannelBuffer buf = chunk.getContent();
buf.readBytes(outputStream, buf.readableBytes());
}
catch (Exception e) {
asyncContext.complete();
throw Throwables.propagate(e);
}
}
return clientResponse;
}
@ -202,30 +310,26 @@ public class AsyncQueryForwardingServlet extends HttpServlet
throw Throwables.propagate(e);
}
finally {
ctx.dispatch();
asyncContext.complete();
}
return ClientResponse.finished(obj);
}
private byte[] getContentBytes(ChannelBuffer content)
{
byte[] contentBytes = new byte[content.readableBytes()];
content.readBytes(contentBytes);
return contentBytes;
}
};
ctx.start(
asyncContext.start(
new Runnable()
{
@Override
public void run()
{
routingDruidClient.run(makeUrl(host, req), theQuery, responseHandler);
routingDruidClient.post(makeUrl(host, req), theQuery, responseHandler);
}
}
);
asyncContext.dispatch();
req.setAttribute(DISPATCHED, true);
}
catch (Exception e) {
if (!resp.isCommitted()) {
@ -242,6 +346,10 @@ public class AsyncQueryForwardingServlet extends HttpServlet
resp.flushBuffer();
if (ctx != null) {
ctx.complete();
}
try {
requestLogger.log(
new RequestLogLine(

View File

@ -20,6 +20,7 @@
package io.druid.server.router;
import com.google.inject.Inject;
import com.metamx.common.ISE;
import com.metamx.common.Pair;
import com.metamx.emitter.EmittingLogger;
import io.druid.client.selector.Server;
@ -49,7 +50,49 @@ public class QueryHostFinder<T>
public Server findServer(Query<T> query)
{
final Pair<String, ServerDiscoverySelector> selected = hostSelector.select(query);
return findServerInner(selected);
}
public Server findDefaultServer()
{
final Pair<String, ServerDiscoverySelector> selected = hostSelector.getDefaultLookup();
return findServerInner(selected);
}
public String getHost(Query<T> query)
{
Server server = findServer(query);
if (server == null) {
log.makeAlert(
"Catastrophic failure! No servers found at all! Failing request!"
).emit();
throw new ISE("No server found for query[%s]", query);
}
log.debug("Selected [%s]", server.getHost());
return server.getHost();
}
public String getDefaultHost()
{
Server server = findDefaultServer();
if (server == null) {
log.makeAlert(
"Catastrophic failure! No servers found at all! Failing request!"
).emit();
throw new ISE("No default server found!");
}
return server.getHost();
}
private Server findServerInner(final Pair<String, ServerDiscoverySelector> selected)
{
if (selected == null) {
log.error("Danger, Will Robinson! Unable to find any brokers!");
}
@ -82,21 +125,4 @@ public class QueryHostFinder<T>
return server;
}
public String getHost(Query<T> query)
{
Server server = findServer(query);
if (server == null) {
log.makeAlert(
"Catastrophic failure! No servers found at all! Failing request!"
).emit();
return null;
}
log.debug("Selected [%s]", server.getHost());
return server.getHost();
}
}

View File

@ -0,0 +1,36 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.server.router;
import com.google.inject.BindingAnnotation;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
*/
@Target({ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@BindingAnnotation
public @interface Router
{
}

View File

@ -193,7 +193,7 @@ public class TieredBrokerHostSelector<T> implements HostSelector<T>
return new Pair<>(brokerServiceName, retVal);
}
private Pair<String, ServerDiscoverySelector> getDefaultLookup()
public Pair<String, ServerDiscoverySelector> getDefaultLookup()
{
final String brokerServiceName = tierConfig.getDefaultBrokerServiceName();
final ServerDiscoverySelector retVal = selectorMap.get(brokerServiceName);

View File

@ -29,14 +29,17 @@ import io.druid.client.RoutingDruidClient;
import io.druid.curator.discovery.DiscoveryModule;
import io.druid.curator.discovery.ServerDiscoveryFactory;
import io.druid.curator.discovery.ServerDiscoverySelector;
import io.druid.guice.HttpClientModule;
import io.druid.guice.JsonConfigProvider;
import io.druid.guice.LazySingleton;
import io.druid.guice.LifecycleModule;
import io.druid.guice.ManageLifecycle;
import io.druid.guice.annotations.Client;
import io.druid.guice.annotations.Self;
import io.druid.server.initialization.JettyServerInitializer;
import io.druid.server.router.CoordinatorRuleManager;
import io.druid.server.router.QueryHostFinder;
import io.druid.server.router.Router;
import io.druid.server.router.TieredBrokerConfig;
import io.druid.server.router.TieredBrokerHostSelector;
import org.eclipse.jetty.server.Server;
@ -62,6 +65,7 @@ public class CliRouter extends ServerRunnable
protected List<Object> getModules()
{
return ImmutableList.<Object>of(
new HttpClientModule("druid.router.http", Router.class),
new Module()
{
@Override