fix gzip compression

- fixes compression not working for POST requests
- router now forwards raw bytes instead of decompressing
- cleanded up router servlet initialization
- add test for gzip on get and post methods
- use Jersey annotation when possible in QueryResource
This commit is contained in:
Xavier Léauté 2014-12-10 17:03:33 -08:00
parent 6f6cc463b0
commit 0de56efe5c
9 changed files with 140 additions and 48 deletions

View File

@ -127,6 +127,10 @@ public class JettyHttpClientModule implements Module
public void start() throws Exception
{
httpClient.start();
// forwards raw bytes, don't decode gzip
// decoders are populated on start, so this has to be done after start() is called
httpClient.getContentDecoderFactories().clear();
}
@Override

View File

@ -48,18 +48,22 @@ import org.joda.time.DateTime;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
import javax.ws.rs.GET;
import javax.ws.rs.HeaderParam;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.StreamingOutput;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Map;
import java.util.UUID;
@ -116,26 +120,30 @@ public class QueryResource
}
@POST
@Produces({MediaType.APPLICATION_JSON, SmileMediaTypes.APPLICATION_JACKSON_SMILE})
@Consumes({MediaType.APPLICATION_JSON, SmileMediaTypes.APPLICATION_JACKSON_SMILE})
public Response doPost(
@Context HttpServletRequest req,
@Context final HttpServletResponse resp
) throws ServletException, IOException
InputStream in,
@QueryParam("pretty") String pretty,
@Context HttpServletRequest req // used only to get request content-type and remote address
) throws IOException
{
final long start = System.currentTimeMillis();
Query query = null;
byte[] requestQuery = null;
String queryId = null;
final boolean isSmile = SmileMediaTypes.APPLICATION_JACKSON_SMILE.equals(req.getContentType()) || APPLICATION_SMILE.equals(req.getContentType());
final String reqContentType = req.getContentType();
final boolean isSmile = SmileMediaTypes.APPLICATION_JACKSON_SMILE.equals(reqContentType) || APPLICATION_SMILE.equals(reqContentType);
final String contentType = isSmile ? SmileMediaTypes.APPLICATION_JACKSON_SMILE : MediaType.APPLICATION_JSON;
ObjectMapper objectMapper = isSmile ? smileMapper : jsonMapper;
final ObjectWriter jsonWriter = req.getParameter("pretty") == null
? objectMapper.writer()
: objectMapper.writerWithDefaultPrettyPrinter();
final ObjectWriter jsonWriter = pretty != null
? objectMapper.writerWithDefaultPrettyPrinter()
: objectMapper.writer();
try {
query = objectMapper.readValue(req.getInputStream(), Query.class);
query = objectMapper.readValue(in, Query.class);
queryId = query.getId();
if (queryId == null) {
queryId = UUID.randomUUID().toString();

View File

@ -0,0 +1,47 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2014 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.server.initialization;
import com.google.common.base.Joiner;
import org.eclipse.jetty.servlet.FilterHolder;
import org.eclipse.jetty.servlets.AsyncGzipFilter;
import org.eclipse.jetty.servlets.GzipFilter;
import javax.ws.rs.HttpMethod;
public abstract class BaseJettyServerInitializer implements JettyServerInitializer
{
public static final String GZIP_METHODS = Joiner.on(",").join(HttpMethod.GET, HttpMethod.POST);
public FilterHolder defaultGzipFilterHolder() {
final FilterHolder gzipFilterHolder = new FilterHolder(GzipFilter.class);
gzipFilterHolder.setInitParameter("minGzipSize", "0");
gzipFilterHolder.setInitParameter("methods", GZIP_METHODS);
return gzipFilterHolder;
}
public FilterHolder defaultAsyncGzipFilterHolder() {
final FilterHolder gzipFilterHolder = new FilterHolder(AsyncGzipFilter.class);
gzipFilterHolder.setInitParameter("minGzipSize", "0");
gzipFilterHolder.setInitParameter("methods", GZIP_METHODS);
return gzipFilterHolder;
}
}

View File

@ -45,12 +45,10 @@ import io.druid.server.DruidNode;
import org.apache.commons.io.IOUtils;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.handler.DefaultHandler;
import org.eclipse.jetty.server.handler.HandlerList;
import org.eclipse.jetty.servlet.DefaultServlet;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.eclipse.jetty.servlets.GzipFilter;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@ -60,6 +58,7 @@ import org.junit.Test;
import javax.servlet.ServletOutputStream;
import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.Context;
@ -68,6 +67,7 @@ import javax.ws.rs.core.Response;
import java.io.IOException;
import java.io.InputStream;
import java.io.StringWriter;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.Charset;
import java.util.Random;
@ -81,6 +81,7 @@ public class JettyTest
{
private Lifecycle lifecycle;
private HttpClient client;
private int port = -1;
public static void setProperties()
{
@ -101,16 +102,20 @@ public class JettyTest
public void configure(Binder binder)
{
JsonConfigProvider.bindInstance(
binder, Key.get(DruidNode.class, Self.class), new DruidNode("test", "localhost", 9999)
binder, Key.get(DruidNode.class, Self.class), new DruidNode("test", "localhost", null)
);
binder.bind(JettyServerInitializer.class).to(JettyServerInit.class).in(LazySingleton.class);
Jerseys.addResource(binder, SlowResource.class);
Jerseys.addResource(binder, ExceptionResource.class);
Jerseys.addResource(binder, DefaultResource.class);
LifecycleModule.register(binder, Server.class);
}
}
)
);
final DruidNode node = injector.getInstance(Key.get(DruidNode.class, Self.class));
port = node.getPort();
lifecycle = injector.getInstance(Lifecycle.class);
lifecycle.start();
ClientHolder holder = injector.getInstance(ClientHolder.class);
@ -142,7 +147,7 @@ public class JettyTest
long startTime2 = 0;
try {
ListenableFuture<StatusResponseHolder> go =
client.get(new URL("http://localhost:9999/slow/hello"))
client.get(new URL("http://localhost:" + port + "/slow/hello"))
.go(new StatusResponseHandler(Charset.defaultCharset()));
startTime2 = System.currentTimeMillis();
go.get();
@ -173,6 +178,21 @@ public class JettyTest
latch.await();
}
@Test
public void testGzipCompression() throws Exception
{
final URL url = new URL("http://localhost:" + port + "/default");
final HttpURLConnection get = (HttpURLConnection) url.openConnection();
get.setRequestProperty("Accept-Encoding", "gzip");
Assert.assertEquals("gzip", get.getContentEncoding());
final HttpURLConnection post = (HttpURLConnection) url.openConnection();
post.setRequestProperty("Accept-Encoding", "gzip");
post.setRequestMethod("POST");
Assert.assertEquals("gzip", post.getContentEncoding());
}
// Tests that threads are not stuck when partial chunk is not finalized
// https://bugs.eclipse.org/bugs/show_bug.cgi?id=424107
@Test
@ -181,7 +201,7 @@ public class JettyTest
public void testChunkNotFinalized() throws Exception
{
ListenableFuture<InputStream> go =
client.get(new URL("http://localhost:9999/exception/exception"))
client.get(new URL("http://localhost:" + port + "/exception/exception"))
.go(new InputStreamResponseHandler());
try {
StringWriter writer = new StringWriter();
@ -207,7 +227,7 @@ public class JettyTest
try {
ListenableFuture<InputStream> go = client.get(
new URL(
"http://localhost:9999/exception/exception"
"http://localhost:" + port + "/exception/exception"
)
)
@ -251,7 +271,7 @@ public class JettyTest
}
}
public static class JettyServerInit implements JettyServerInitializer
public static class JettyServerInit extends BaseJettyServerInitializer
{
@Override
@ -259,11 +279,11 @@ public class JettyTest
{
final ServletContextHandler root = new ServletContextHandler(ServletContextHandler.SESSIONS);
root.addServlet(new ServletHolder(new DefaultServlet()), "/*");
root.addFilter(GzipFilter.class, "/*", null);
root.addFilter(defaultGzipFilterHolder(), "/*", null);
root.addFilter(GuiceFilter.class, "/*", null);
final HandlerList handlerList = new HandlerList();
handlerList.setHandlers(new Handler[]{root, new DefaultHandler()});
handlerList.setHandlers(new Handler[]{root});
server.setHandler(handlerList);
}
}
@ -289,6 +309,25 @@ public class JettyTest
}
}
@Path("/default")
public static class DefaultResource
{
@GET
@Produces(MediaType.APPLICATION_JSON)
public Response get()
{
return Response.ok("hello").build();
}
@POST
@Produces(MediaType.APPLICATION_JSON)
public Response post()
{
return Response.ok("hello").build();
}
}
@Path("/exception")
public static class ExceptionResource
{

View File

@ -74,6 +74,7 @@ import io.druid.indexing.worker.config.WorkerConfig;
import io.druid.segment.realtime.firehose.ChatHandlerProvider;
import io.druid.server.http.RedirectFilter;
import io.druid.server.http.RedirectInfo;
import io.druid.server.initialization.BaseJettyServerInitializer;
import io.druid.server.initialization.JettyServerInitializer;
import io.druid.tasklogs.TaskLogStreamer;
import io.druid.tasklogs.TaskLogs;
@ -84,7 +85,6 @@ import org.eclipse.jetty.servlet.DefaultServlet;
import org.eclipse.jetty.servlet.FilterHolder;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.eclipse.jetty.servlets.GzipFilter;
import org.eclipse.jetty.util.resource.ResourceCollection;
import java.util.List;
@ -221,7 +221,7 @@ public class CliOverlord extends ServerRunnable
/**
*/
private static class OverlordJettyServerInitializer implements JettyServerInitializer
private static class OverlordJettyServerInitializer extends BaseJettyServerInitializer
{
@Override
public void initialize(Server server, Injector injector)
@ -239,7 +239,7 @@ public class CliOverlord extends ServerRunnable
}
)
);
root.addFilter(GzipFilter.class, "/*", null);
root.addFilter(defaultGzipFilterHolder(), "/*", null);
// /status should not redirect, so add first
root.addFilter(GuiceFilter.class, "/status/*", null);

View File

@ -24,7 +24,7 @@ import com.google.inject.Injector;
import com.google.inject.servlet.GuiceFilter;
import io.druid.server.coordinator.DruidCoordinatorConfig;
import io.druid.server.http.RedirectFilter;
import io.druid.server.initialization.JettyServerInitializer;
import io.druid.server.initialization.BaseJettyServerInitializer;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.handler.HandlerList;
@ -32,12 +32,11 @@ import org.eclipse.jetty.servlet.DefaultServlet;
import org.eclipse.jetty.servlet.FilterHolder;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.eclipse.jetty.servlets.GzipFilter;
import org.eclipse.jetty.util.resource.Resource;
/**
*/
class CoordinatorJettyServerInitializer implements JettyServerInitializer
class CoordinatorJettyServerInitializer extends BaseJettyServerInitializer
{
private final DruidCoordinatorConfig config;
@ -60,7 +59,7 @@ class CoordinatorJettyServerInitializer implements JettyServerInitializer
} else {
root.setResourceBase(config.getConsoleStatic());
}
root.addFilter(GzipFilter.class, "/*", null);
root.addFilter(defaultGzipFilterHolder(), "/*", null);
// /status should not redirect, so add first
root.addFilter(GuiceFilter.class, "/status/*", null);
@ -68,8 +67,8 @@ class CoordinatorJettyServerInitializer implements JettyServerInitializer
// redirect anything other than status to the current lead
root.addFilter(new FilterHolder(injector.getInstance(RedirectFilter.class)), "/*", null);
// Can't use '/*' here because of Guice and Jetty static content conflicts
// The coordinator really needs a standarized api path
// Can't use '/*' here because of Guice and Jetty static content conflicts
root.addFilter(GuiceFilter.class, "/info/*", null);
root.addFilter(GuiceFilter.class, "/druid/coordinator/*", null);
// this will be removed in the next major release

View File

@ -21,7 +21,7 @@ package io.druid.cli;
import com.google.inject.Injector;
import com.google.inject.servlet.GuiceFilter;
import io.druid.server.initialization.JettyServerInitializer;
import io.druid.server.initialization.BaseJettyServerInitializer;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.handler.DefaultHandler;
@ -29,18 +29,17 @@ import org.eclipse.jetty.server.handler.HandlerList;
import org.eclipse.jetty.servlet.DefaultServlet;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.eclipse.jetty.servlets.GzipFilter;
/**
*/
class MiddleManagerJettyServerInitializer implements JettyServerInitializer
class MiddleManagerJettyServerInitializer extends BaseJettyServerInitializer
{
@Override
public void initialize(Server server, Injector injector)
{
final ServletContextHandler root = new ServletContextHandler(ServletContextHandler.SESSIONS);
root.addServlet(new ServletHolder(new DefaultServlet()), "/*");
root.addFilter(GzipFilter.class, "/*", null);
root.addFilter(defaultGzipFilterHolder(), "/*", null);
root.addFilter(GuiceFilter.class, "/*", null);
final HandlerList handlerList = new HandlerList();

View File

@ -21,30 +21,29 @@ package io.druid.cli;
import com.google.inject.Injector;
import com.google.inject.servlet.GuiceFilter;
import io.druid.server.initialization.JettyServerInitializer;
import io.druid.server.initialization.BaseJettyServerInitializer;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.handler.DefaultHandler;
import org.eclipse.jetty.server.handler.HandlerList;
import org.eclipse.jetty.servlet.DefaultServlet;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.eclipse.jetty.servlets.GzipFilter;
/**
*/
public class QueryJettyServerInitializer implements JettyServerInitializer
public class QueryJettyServerInitializer extends BaseJettyServerInitializer
{
@Override
public void initialize(Server server, Injector injector)
{
final ServletContextHandler root = new ServletContextHandler(ServletContextHandler.SESSIONS);
root.addServlet(new ServletHolder(new DefaultServlet()), "/*");
root.addFilter(GzipFilter.class, "/*", null);
root.addFilter(defaultGzipFilterHolder(), "/*", null);
root.addFilter(GuiceFilter.class, "/*", null);
final HandlerList handlerList = new HandlerList();
handlerList.setHandlers(new Handler[]{root, new DefaultHandler()});
handlerList.setHandlers(new Handler[]{root});
server.setHandler(handlerList);
}
}

View File

@ -27,23 +27,21 @@ import com.metamx.emitter.service.ServiceEmitter;
import io.druid.guice.annotations.Json;
import io.druid.guice.annotations.Smile;
import io.druid.server.AsyncQueryForwardingServlet;
import io.druid.server.initialization.JettyServerInitializer;
import io.druid.server.initialization.BaseJettyServerInitializer;
import io.druid.server.log.RequestLogger;
import io.druid.server.router.QueryHostFinder;
import io.druid.server.router.Router;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.handler.DefaultHandler;
import org.eclipse.jetty.server.handler.HandlerList;
import org.eclipse.jetty.servlet.DefaultServlet;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.eclipse.jetty.servlets.AsyncGzipFilter;
/**
*/
public class RouterJettyServerInitializer implements JettyServerInitializer
public class RouterJettyServerInitializer extends BaseJettyServerInitializer
{
private final ObjectMapper jsonMapper;
private final ObjectMapper smileMapper;
@ -73,8 +71,10 @@ public class RouterJettyServerInitializer implements JettyServerInitializer
@Override
public void initialize(Server server, Injector injector)
{
final ServletContextHandler queries = new ServletContextHandler(ServletContextHandler.SESSIONS);
queries.addServlet(
final ServletContextHandler root = new ServletContextHandler(ServletContextHandler.SESSIONS);
root.addServlet(new ServletHolder(new DefaultServlet()), "/*");
root.addServlet(
new ServletHolder(
new AsyncQueryForwardingServlet(
jsonMapper,
@ -86,15 +86,12 @@ public class RouterJettyServerInitializer implements JettyServerInitializer
)
), "/druid/v2/*"
);
queries.addFilter(AsyncGzipFilter.class, "/druid/v2/*", null);
queries.addFilter(GuiceFilter.class, "/status/*", null);
final ServletContextHandler root = new ServletContextHandler(ServletContextHandler.SESSIONS);
root.addServlet(new ServletHolder(new DefaultServlet()), "/*");
root.addFilter(GuiceFilter.class, "/*", null);
root.addFilter(defaultAsyncGzipFilterHolder(), "/*", null);
// Can't use '/*' here because of Guice conflicts with AsyncQueryForwardingServlet path
root.addFilter(GuiceFilter.class, "/status/*", null);
final HandlerList handlerList = new HandlerList();
handlerList.setHandlers(new Handler[]{queries, root, new DefaultHandler()});
handlerList.setHandlers(new Handler[]{root});
server.setHandler(handlerList);
}
}