diff --git a/server/src/main/java/io/druid/guice/http/JettyHttpClientModule.java b/server/src/main/java/io/druid/guice/http/JettyHttpClientModule.java index 29e114d3522..ccfd666822d 100644 --- a/server/src/main/java/io/druid/guice/http/JettyHttpClientModule.java +++ b/server/src/main/java/io/druid/guice/http/JettyHttpClientModule.java @@ -124,11 +124,6 @@ public class JettyHttpClientModule implements Module @Override public void start() throws Exception { - httpClient.start(); - - // forwards raw bytes, don't decode gzip - // decoders are populated on start, so this has to be done after start() is called - httpClient.getContentDecoderFactories().clear(); } @Override diff --git a/server/src/main/java/io/druid/server/AsyncQueryForwardingServlet.java b/server/src/main/java/io/druid/server/AsyncQueryForwardingServlet.java index d69f9908258..b07ad878297 100644 --- a/server/src/main/java/io/druid/server/AsyncQueryForwardingServlet.java +++ b/server/src/main/java/io/druid/server/AsyncQueryForwardingServlet.java @@ -17,37 +17,38 @@ package io.druid.server; +import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes; +import com.google.api.client.repackaged.com.google.common.base.Throwables; import com.google.common.collect.ImmutableMap; +import com.google.inject.Provider; import com.metamx.emitter.EmittingLogger; import com.metamx.emitter.service.ServiceEmitter; import io.druid.guice.annotations.Json; import io.druid.guice.annotations.Smile; +import io.druid.guice.http.DruidHttpClientConfig; import io.druid.query.Query; import io.druid.query.QueryMetricUtil; import io.druid.server.log.RequestLogger; import io.druid.server.router.QueryHostFinder; import io.druid.server.router.Router; import org.eclipse.jetty.client.HttpClient; +import org.eclipse.jetty.client.api.ContentProvider; import org.eclipse.jetty.client.api.Request; import org.eclipse.jetty.client.api.Response; import org.eclipse.jetty.client.api.Result; import org.eclipse.jetty.client.util.BytesContentProvider; -import org.eclipse.jetty.http.HttpHeader; import org.eclipse.jetty.http.HttpMethod; -import org.eclipse.jetty.http.HttpVersion; import org.eclipse.jetty.proxy.AsyncProxyServlet; import org.joda.time.DateTime; -import javax.servlet.AsyncContext; import javax.servlet.ServletException; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import javax.ws.rs.core.MediaType; import java.io.IOException; import java.net.URI; -import java.util.Enumeration; import java.util.UUID; import java.util.concurrent.TimeUnit; @@ -60,6 +61,10 @@ public class AsyncQueryForwardingServlet extends AsyncProxyServlet @Deprecated // use SmileMediaTypes.APPLICATION_JACKSON_SMILE private static final String APPLICATION_SMILE = "application/smile"; + private static final String HOST_ATTRIBUTE = "io.druid.proxy.to.host"; + private static final String QUERY_ATTRIBUTE = "io.druid.proxy.query"; + private static final String OBJECTMAPPER_ATTRIBUTE = "io.druid.proxy.objectMapper"; + private static void handleException(HttpServletResponse response, ObjectMapper objectMapper, Exception exception) throws IOException { @@ -79,7 +84,8 @@ public class AsyncQueryForwardingServlet extends AsyncProxyServlet private final ObjectMapper jsonMapper; private final ObjectMapper smileMapper; private final QueryHostFinder hostFinder; - private final HttpClient httpClient; + private final Provider httpClientProvider; + private final DruidHttpClientConfig httpClientConfig; private final ServiceEmitter emitter; private final RequestLogger requestLogger; @@ -87,7 +93,8 @@ public class AsyncQueryForwardingServlet extends AsyncProxyServlet @Json ObjectMapper jsonMapper, @Smile ObjectMapper smileMapper, QueryHostFinder hostFinder, - @Router HttpClient httpClient, + @Router Provider httpClientProvider, + DruidHttpClientConfig httpClientConfig, ServiceEmitter emitter, RequestLogger requestLogger ) @@ -95,7 +102,8 @@ public class AsyncQueryForwardingServlet extends AsyncProxyServlet this.jsonMapper = jsonMapper; this.smileMapper = smileMapper; this.hostFinder = hostFinder; - this.httpClient = httpClient; + this.httpClientProvider = httpClientProvider; + this.httpClientConfig = httpClientConfig; this.emitter = emitter; this.requestLogger = requestLogger; } @@ -105,23 +113,25 @@ public class AsyncQueryForwardingServlet extends AsyncProxyServlet { final boolean isSmile = SmileMediaTypes.APPLICATION_JACKSON_SMILE.equals(request.getContentType()) || APPLICATION_SMILE.equals(request.getContentType()); final ObjectMapper objectMapper = isSmile ? smileMapper : jsonMapper; + request.setAttribute(OBJECTMAPPER_ATTRIBUTE, objectMapper); String host = hostFinder.getDefaultHost(); - Query inputQuery = null; - boolean hasContent = request.getContentLength() > 0 || request.getContentType() != null; - boolean isQuery = request.getMethod().equals(HttpMethod.POST.asString()); - long startTime = System.currentTimeMillis(); + request.setAttribute(HOST_ATTRIBUTE, host); + + boolean isQuery = request.getMethod().equals(HttpMethod.POST.asString()) && + request.getRequestURI().startsWith("/druid/v2"); // queries only exist for POST if (isQuery) { try { - inputQuery = objectMapper.readValue(request.getInputStream(), Query.class); + Query inputQuery = objectMapper.readValue(request.getInputStream(), Query.class); if (inputQuery != null) { - host = hostFinder.getHost(inputQuery); + request.setAttribute(HOST_ATTRIBUTE, hostFinder.getHost(inputQuery)); if (inputQuery.getId() == null) { inputQuery = inputQuery.withId(UUID.randomUUID().toString()); } } + request.setAttribute(QUERY_ATTRIBUTE, inputQuery); } catch (IOException e) { log.warn(e, "Exception parsing query"); @@ -149,78 +159,67 @@ public class AsyncQueryForwardingServlet extends AsyncProxyServlet } } - URI rewrittenURI = rewriteURI(host, request); - if (rewrittenURI == null) { - onRewriteFailed(request, response); - return; - } + super.service(request, response); + } - final Request proxyRequest = getHttpClient().newRequest(rewrittenURI) - .method(request.getMethod()) - .version(HttpVersion.fromString(request.getProtocol())); + @Override + protected void customizeProxyRequest(Request proxyRequest, HttpServletRequest request) + { + proxyRequest.timeout(httpClientConfig.getReadTimeout().getMillis(), TimeUnit.MILLISECONDS); + proxyRequest.idleTimeout(httpClientConfig.getReadTimeout().getMillis(), TimeUnit.MILLISECONDS); - // Copy headers - for (Enumeration headerNames = request.getHeaderNames(); headerNames.hasMoreElements(); ) { - String headerName = headerNames.nextElement(); - - if (HttpHeader.TRANSFER_ENCODING.is(headerName)) { - hasContent = true; - } - - for (Enumeration headerValues = request.getHeaders(headerName); headerValues.hasMoreElements(); ) { - String headerValue = headerValues.nextElement(); - if (headerValue != null) { - proxyRequest.header(headerName, headerValue); - } + final Query query = (Query) request.getAttribute(QUERY_ATTRIBUTE); + if (query != null) { + final ObjectMapper objectMapper = (ObjectMapper) request.getAttribute(OBJECTMAPPER_ATTRIBUTE); + try { + proxyRequest.content(new BytesContentProvider(objectMapper.writeValueAsBytes(query))); + } catch(JsonProcessingException e) { + Throwables.propagate(e); } } + } - // Add proxy headers - addViaHeader(proxyRequest); - - addXForwardedHeaders(proxyRequest, request); - - final AsyncContext asyncContext = request.startAsync(); - // We do not timeout the continuation, but the proxy request - asyncContext.setTimeout(0); - proxyRequest.timeout( - getTimeout(), TimeUnit.MILLISECONDS - ); - - if (hasContent) { - if (inputQuery != null) { - proxyRequest.content(new BytesContentProvider(jsonMapper.writeValueAsBytes(inputQuery))); - } else { - proxyRequest.content(proxyRequestContent(proxyRequest, request)); - } - } - - customizeProxyRequest(proxyRequest, request); - - if (isQuery) { - proxyRequest.send(newMetricsEmittingProxyResponseListener(request, response, inputQuery, startTime)); + @Override + protected Response.Listener newProxyResponseListener( + HttpServletRequest request, HttpServletResponse response + ) + { + final Query query = (Query) request.getAttribute(QUERY_ATTRIBUTE); + if (query != null) { + return newMetricsEmittingProxyResponseListener(request, response, query, System.currentTimeMillis()); } else { - proxyRequest.send(newProxyResponseListener(request, response)); + return super.newProxyResponseListener(request, response); } } + @Override + protected URI rewriteURI(HttpServletRequest request) + { + final String host = (String) request.getAttribute(HOST_ATTRIBUTE); + final StringBuilder uri = new StringBuilder("http://"); + + uri.append(host); + uri.append(request.getRequestURI()); + final String queryString = request.getQueryString(); + if (queryString != null) { + uri.append("?").append(queryString); + } + return URI.create(uri.toString()); + } + + @Override + protected HttpClient newHttpClient() + { + return httpClientProvider.get(); + } + @Override protected HttpClient createHttpClient() throws ServletException { - return httpClient; - } - - private URI rewriteURI(final String host, final HttpServletRequest req) - { - final StringBuilder uri = new StringBuilder("http://"); - - uri.append(host); - uri.append(req.getRequestURI()); - final String queryString = req.getQueryString(); - if (queryString != null) { - uri.append("?").append(queryString); - } - return URI.create(uri.toString()); + HttpClient client = super.createHttpClient(); + // override timeout set in ProxyServlet.createHttpClient + setTimeout(httpClientConfig.getReadTimeout().getMillis()); + return client; } private Response.Listener newMetricsEmittingProxyResponseListener( diff --git a/server/src/test/java/io/druid/server/AsyncQueryForwardingServletTest.java b/server/src/test/java/io/druid/server/AsyncQueryForwardingServletTest.java new file mode 100644 index 00000000000..5376315a366 --- /dev/null +++ b/server/src/test/java/io/druid/server/AsyncQueryForwardingServletTest.java @@ -0,0 +1,195 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.server; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableList; +import com.google.inject.Binder; +import com.google.inject.Inject; +import com.google.inject.Injector; +import com.google.inject.Key; +import com.google.inject.Module; +import com.google.inject.servlet.GuiceFilter; +import com.metamx.common.lifecycle.Lifecycle; +import io.druid.guice.GuiceInjectors; +import io.druid.guice.Jerseys; +import io.druid.guice.JsonConfigProvider; +import io.druid.guice.LazySingleton; +import io.druid.guice.LifecycleModule; +import io.druid.guice.annotations.Self; +import io.druid.guice.annotations.Smile; +import io.druid.guice.http.DruidHttpClientConfig; +import io.druid.initialization.Initialization; +import io.druid.query.Query; +import io.druid.server.initialization.BaseJettyServerInitializer; +import io.druid.server.initialization.JettyServerInitializer; +import io.druid.server.initialization.BaseJettyTest; +import io.druid.server.log.RequestLogger; +import io.druid.server.metrics.NoopServiceEmitter; +import io.druid.server.router.QueryHostFinder; +import org.eclipse.jetty.server.Handler; +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.server.handler.HandlerList; +import org.eclipse.jetty.servlet.DefaultServlet; +import org.eclipse.jetty.servlet.ServletContextHandler; +import org.eclipse.jetty.servlet.ServletHolder; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import javax.servlet.http.HttpServletRequest; +import java.io.IOException; +import java.net.HttpURLConnection; +import java.net.URI; +import java.net.URL; + +public class AsyncQueryForwardingServletTest extends BaseJettyTest +{ + @Before + public void setup() throws Exception + { + setProperties(); + Injector injector = setupInjector(); + final DruidNode node = injector.getInstance(Key.get(DruidNode.class, Self.class)); + port = node.getPort(); + + lifecycle = injector.getInstance(Lifecycle.class); + lifecycle.start(); + ClientHolder holder = injector.getInstance(ClientHolder.class); + client = holder.getClient(); + } + + @Override + protected Injector setupInjector() + { + return Initialization.makeInjectorWithModules( + GuiceInjectors.makeStartupInjector(), ImmutableList.of( + new Module() + { + @Override + public void configure(Binder binder) + { + JsonConfigProvider.bindInstance( + binder, Key.get(DruidNode.class, Self.class), new DruidNode("test", "localhost", null) + ); + binder.bind(JettyServerInitializer.class).to(ProxyJettyServerInit.class).in(LazySingleton.class); + Jerseys.addResource(binder, SlowResource.class); + Jerseys.addResource(binder, ExceptionResource.class); + Jerseys.addResource(binder, DefaultResource.class); + LifecycleModule.register(binder, Server.class); + } + } + ) + ); + } + + @Test + public void testProxyGzipCompression() throws Exception + { + final URL url = new URL("http://localhost:" + port + "/proxy/default"); + + final HttpURLConnection get = (HttpURLConnection) url.openConnection(); + get.setRequestProperty("Accept-Encoding", "gzip"); + Assert.assertEquals("gzip", get.getContentEncoding()); + + final HttpURLConnection post = (HttpURLConnection) url.openConnection(); + post.setRequestProperty("Accept-Encoding", "gzip"); + post.setRequestMethod("POST"); + Assert.assertEquals("gzip", post.getContentEncoding()); + + final HttpURLConnection getNoGzip = (HttpURLConnection) url.openConnection(); + Assert.assertNotEquals("gzip", getNoGzip.getContentEncoding()); + + final HttpURLConnection postNoGzip = (HttpURLConnection) url.openConnection(); + postNoGzip.setRequestMethod("POST"); + Assert.assertNotEquals("gzip", postNoGzip.getContentEncoding()); + } + + public static class ProxyJettyServerInit extends BaseJettyServerInitializer + { + + private final DruidNode node; + + @Inject + public ProxyJettyServerInit(@Self DruidNode node) + { + this.node = node; + } + + @Override + public void initialize(Server server, Injector injector) + { + final ServletContextHandler root = new ServletContextHandler(ServletContextHandler.SESSIONS); + root.addServlet(new ServletHolder(new DefaultServlet()), "/*"); + + final QueryHostFinder hostFinder = new QueryHostFinder(null) + { + @Override + public String getHost(Query query) + { + return "localhost:" + node.getPort(); + } + + @Override + public String getDefaultHost() + { + return "localhost:" + node.getPort(); + } + }; + + root.addServlet( + new ServletHolder( + new AsyncQueryForwardingServlet( + injector.getInstance(ObjectMapper.class), + injector.getInstance(Key.get(ObjectMapper.class, Smile.class)), + hostFinder, + injector.getProvider(org.eclipse.jetty.client.HttpClient.class), + injector.getInstance(DruidHttpClientConfig.class), + new NoopServiceEmitter(), + new RequestLogger() + { + @Override + public void log(RequestLogLine requestLogLine) throws IOException + { + // noop + } + } + ) { + @Override + protected URI rewriteURI(HttpServletRequest request) + { + URI uri = super.rewriteURI(request); + return URI.create(uri.toString().replace("/proxy", "")); + } + } + ), "/proxy/*" + ); + + root.addFilter(defaultAsyncGzipFilterHolder(), "/*", null); + root.addFilter(GuiceFilter.class, "/slow/*", null); + root.addFilter(GuiceFilter.class, "/default/*", null); + root.addFilter(GuiceFilter.class, "/exception/*", null); + + final HandlerList handlerList = new HandlerList(); + handlerList.setHandlers(new Handler[]{root}); + server.setHandler(handlerList); + } + } +} diff --git a/server/src/test/java/io/druid/server/initialization/BaseJettyTest.java b/server/src/test/java/io/druid/server/initialization/BaseJettyTest.java new file mode 100644 index 00000000000..ccc289ffe9c --- /dev/null +++ b/server/src/test/java/io/druid/server/initialization/BaseJettyTest.java @@ -0,0 +1,222 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.server.initialization; + +import com.google.common.base.Throwables; +import com.google.common.collect.ImmutableList; +import com.google.inject.Binder; +import com.google.inject.Injector; +import com.google.inject.Key; +import com.google.inject.Module; +import com.google.inject.servlet.GuiceFilter; +import com.metamx.common.lifecycle.Lifecycle; +import com.metamx.http.client.HttpClient; +import com.metamx.http.client.HttpClientConfig; +import com.metamx.http.client.HttpClientInit; +import io.druid.guice.GuiceInjectors; +import io.druid.guice.Jerseys; +import io.druid.guice.JsonConfigProvider; +import io.druid.guice.LazySingleton; +import io.druid.guice.LifecycleModule; +import io.druid.guice.annotations.Self; +import io.druid.initialization.Initialization; +import io.druid.server.DruidNode; +import org.eclipse.jetty.server.Handler; +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.server.handler.HandlerList; +import org.eclipse.jetty.servlet.DefaultServlet; +import org.eclipse.jetty.servlet.ServletContextHandler; +import org.eclipse.jetty.servlet.ServletHolder; +import org.joda.time.Duration; +import org.junit.After; +import org.junit.Before; + +import javax.net.ssl.SSLContext; +import javax.servlet.ServletOutputStream; +import javax.servlet.http.HttpServletResponse; +import javax.ws.rs.GET; +import javax.ws.rs.POST; +import javax.ws.rs.Path; +import javax.ws.rs.Produces; +import javax.ws.rs.core.Context; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import java.io.IOException; +import java.util.Random; +import java.util.concurrent.TimeUnit; + +public class BaseJettyTest +{ + protected Lifecycle lifecycle; + protected HttpClient client; + protected int port = -1; + + public static void setProperties() + { + System.setProperty("druid.server.http.numThreads", "20"); + System.setProperty("druid.server.http.maxIdleTime", "PT1S"); + System.setProperty("druid.global.http.readTimeout", "PT1S"); + } + + @Before + public void setup() throws Exception + { + setProperties(); + Injector injector = setupInjector(); + final DruidNode node = injector.getInstance(Key.get(DruidNode.class, Self.class)); + port = node.getPort(); + lifecycle = injector.getInstance(Lifecycle.class); + lifecycle.start(); + ClientHolder holder = injector.getInstance(ClientHolder.class); + client = holder.getClient(); + } + + protected Injector setupInjector() + { + return Initialization.makeInjectorWithModules( + GuiceInjectors.makeStartupInjector(), ImmutableList.of( + new Module() + { + @Override + public void configure(Binder binder) + { + JsonConfigProvider.bindInstance( + binder, Key.get(DruidNode.class, Self.class), new DruidNode("test", "localhost", null) + ); + binder.bind(JettyServerInitializer.class).to(JettyServerInit.class).in(LazySingleton.class); + Jerseys.addResource(binder, SlowResource.class); + Jerseys.addResource(binder, ExceptionResource.class); + Jerseys.addResource(binder, DefaultResource.class); + LifecycleModule.register(binder, Server.class); + } + } + ) + ); + } + + @After + public void teardown() + { + lifecycle.stop(); + } + + public static class ClientHolder + { + HttpClient client; + + ClientHolder() + { + try { + this.client = HttpClientInit.createClient( + new HttpClientConfig(1, SSLContext.getDefault(), Duration.ZERO), + new Lifecycle() + ); + } + catch (Exception e) { + throw Throwables.propagate(e); + } + } + + public HttpClient getClient() + { + return client; + } + } + + public static class JettyServerInit extends BaseJettyServerInitializer + { + + @Override + public void initialize(Server server, Injector injector) + { + final ServletContextHandler root = new ServletContextHandler(ServletContextHandler.SESSIONS); + root.addServlet(new ServletHolder(new DefaultServlet()), "/*"); + root.addFilter(defaultGzipFilterHolder(), "/*", null); + root.addFilter(GuiceFilter.class, "/*", null); + + final HandlerList handlerList = new HandlerList(); + handlerList.setHandlers(new Handler[]{root}); + server.setHandler(handlerList); + } + + } + + @Path("/slow") + public static class SlowResource + { + + public static Random random = new Random(); + + @GET + @Path("/hello") + @Produces(MediaType.APPLICATION_JSON) + public Response hello() + { + try { + TimeUnit.MILLISECONDS.sleep(100 + random.nextInt(2000)); + } + catch (InterruptedException e) { + // + } + return Response.ok("hello").build(); + } + } + + @Path("/default") + public static class DefaultResource + { + @GET + @Produces(MediaType.APPLICATION_JSON) + public Response get() + { + return Response.ok("hello").build(); + } + + @POST + @Produces(MediaType.APPLICATION_JSON) + public Response post() + { + return Response.ok("hello").build(); + } + } + + @Path("/exception") + public static class ExceptionResource + { + @GET + @Path("/exception") + @Produces(MediaType.APPLICATION_JSON) + public Response exception( + @Context HttpServletResponse resp + ) throws IOException + { + final ServletOutputStream outputStream = resp.getOutputStream(); + outputStream.println("hello"); + outputStream.flush(); + try { + TimeUnit.MILLISECONDS.sleep(200); + } + catch (InterruptedException e) { + // + } + throw new IOException(); + } + } +} diff --git a/server/src/test/java/io/druid/server/initialization/JettyTest.java b/server/src/test/java/io/druid/server/initialization/JettyTest.java index a214941af94..3e196d6d928 100644 --- a/server/src/test/java/io/druid/server/initialization/JettyTest.java +++ b/server/src/test/java/io/druid/server/initialization/JettyTest.java @@ -1,128 +1,50 @@ /* - * Druid - a distributed column store. - * Copyright 2012 - 2015 Metamarkets Group Inc. + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. */ package io.druid.server.initialization; import com.google.common.base.Throwables; -import com.google.common.collect.ImmutableList; import com.google.common.util.concurrent.ListenableFuture; -import com.google.inject.Binder; -import com.google.inject.Injector; -import com.google.inject.Key; -import com.google.inject.Module; -import com.google.inject.servlet.GuiceFilter; -import com.metamx.common.lifecycle.Lifecycle; -import com.metamx.http.client.HttpClient; -import com.metamx.http.client.HttpClientConfig; -import com.metamx.http.client.HttpClientInit; import com.metamx.http.client.Request; import com.metamx.http.client.response.InputStreamResponseHandler; import com.metamx.http.client.response.StatusResponseHandler; import com.metamx.http.client.response.StatusResponseHolder; -import io.druid.guice.GuiceInjectors; -import io.druid.guice.Jerseys; -import io.druid.guice.JsonConfigProvider; -import io.druid.guice.LazySingleton; -import io.druid.guice.LifecycleModule; -import io.druid.guice.annotations.Self; -import io.druid.initialization.Initialization; -import io.druid.server.DruidNode; import org.apache.commons.io.IOUtils; -import org.eclipse.jetty.server.Handler; -import org.eclipse.jetty.server.Server; -import org.eclipse.jetty.server.handler.HandlerList; -import org.eclipse.jetty.servlet.DefaultServlet; -import org.eclipse.jetty.servlet.ServletContextHandler; -import org.eclipse.jetty.servlet.ServletHolder; import org.jboss.netty.handler.codec.http.HttpMethod; -import org.joda.time.Duration; -import org.junit.After; import org.junit.Assert; -import org.junit.Before; import org.junit.Ignore; import org.junit.Test; -import javax.net.ssl.SSLContext; -import javax.servlet.ServletOutputStream; -import javax.servlet.http.HttpServletResponse; -import javax.ws.rs.GET; -import javax.ws.rs.POST; -import javax.ws.rs.Path; -import javax.ws.rs.Produces; -import javax.ws.rs.core.Context; -import javax.ws.rs.core.MediaType; -import javax.ws.rs.core.Response; import java.io.IOException; import java.io.InputStream; import java.io.StringWriter; import java.net.HttpURLConnection; import java.net.URL; import java.nio.charset.Charset; -import java.util.Random; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; -public class JettyTest +public class JettyTest extends BaseJettyTest { - private Lifecycle lifecycle; - private HttpClient client; - private int port = -1; - - public static void setProperties() - { - System.setProperty("druid.server.http.numThreads", "20"); - System.setProperty("druid.server.http.maxIdleTime", "PT1S"); - System.setProperty("druid.global.http.readTimeout", "PT1S"); - } - - @Before - public void setup() throws Exception - { - setProperties(); - Injector injector = Initialization.makeInjectorWithModules( - GuiceInjectors.makeStartupInjector(), ImmutableList.of( - new Module() - { - @Override - public void configure(Binder binder) - { - JsonConfigProvider.bindInstance( - binder, Key.get(DruidNode.class, Self.class), new DruidNode("test", "localhost", null) - ); - binder.bind(JettyServerInitializer.class).to(JettyServerInit.class).in(LazySingleton.class); - Jerseys.addResource(binder, SlowResource.class); - Jerseys.addResource(binder, ExceptionResource.class); - Jerseys.addResource(binder, DefaultResource.class); - LifecycleModule.register(binder, Server.class); - } - } - ) - ); - final DruidNode node = injector.getInstance(Key.get(DruidNode.class, Self.class)); - port = node.getPort(); - - lifecycle = injector.getInstance(Lifecycle.class); - lifecycle.start(); - ClientHolder holder = injector.getInstance(ClientHolder.class); - client = holder.getClient(); - } @Test @Ignore // this test will deadlock if it hits an issue, so ignored by default @@ -193,8 +115,14 @@ public class JettyTest final HttpURLConnection post = (HttpURLConnection) url.openConnection(); post.setRequestProperty("Accept-Encoding", "gzip"); post.setRequestMethod("POST"); - Assert.assertEquals("gzip", post.getContentEncoding()); + + final HttpURLConnection getNoGzip = (HttpURLConnection) url.openConnection(); + Assert.assertNotEquals("gzip", getNoGzip.getContentEncoding()); + + final HttpURLConnection postNoGzip = (HttpURLConnection) url.openConnection(); + postNoGzip.setRequestMethod("POST"); + Assert.assertNotEquals("gzip", postNoGzip.getContentEncoding()); } // Tests that threads are not stuck when partial chunk is not finalized @@ -251,113 +179,4 @@ public class JettyTest latch.await(5, TimeUnit.SECONDS); } - - @After - public void teardown() - { - lifecycle.stop(); - } - - public static class ClientHolder - { - HttpClient client; - - ClientHolder() - { - try { - this.client = HttpClientInit.createClient( - new HttpClientConfig(1, SSLContext.getDefault(), Duration.ZERO), - new Lifecycle() - ); - } - catch (Exception e) { - throw Throwables.propagate(e); - } - } - - public HttpClient getClient() - { - return client; - } - } - - public static class JettyServerInit extends BaseJettyServerInitializer - { - - @Override - public void initialize(Server server, Injector injector) - { - final ServletContextHandler root = new ServletContextHandler(ServletContextHandler.SESSIONS); - root.addServlet(new ServletHolder(new DefaultServlet()), "/*"); - root.addFilter(defaultGzipFilterHolder(), "/*", null); - root.addFilter(GuiceFilter.class, "/*", null); - - final HandlerList handlerList = new HandlerList(); - handlerList.setHandlers(new Handler[]{root}); - server.setHandler(handlerList); - } - } - - @Path("/slow") - public static class SlowResource - { - - public static Random random = new Random(); - - @GET - @Path("/hello") - @Produces(MediaType.APPLICATION_JSON) - public Response hello() - { - try { - TimeUnit.MILLISECONDS.sleep(100 + random.nextInt(2000)); - } - catch (InterruptedException e) { - // - } - return Response.ok("hello").build(); - } - } - - @Path("/default") - public static class DefaultResource - { - @GET - @Produces(MediaType.APPLICATION_JSON) - public Response get() - { - return Response.ok("hello").build(); - } - - @POST - @Produces(MediaType.APPLICATION_JSON) - public Response post() - { - return Response.ok("hello").build(); - } - - } - - @Path("/exception") - public static class ExceptionResource - { - @GET - @Path("/exception") - @Produces(MediaType.APPLICATION_JSON) - public Response exception( - @Context HttpServletResponse resp - ) throws IOException - { - final ServletOutputStream outputStream = resp.getOutputStream(); - outputStream.println("hello"); - outputStream.flush(); - try { - TimeUnit.MILLISECONDS.sleep(200); - } - catch (InterruptedException e) { - // - } - throw new IOException(); - } - } } diff --git a/services/src/main/java/io/druid/cli/RouterJettyServerInitializer.java b/services/src/main/java/io/druid/cli/RouterJettyServerInitializer.java index 2bef30b2f68..4ab986f7328 100644 --- a/services/src/main/java/io/druid/cli/RouterJettyServerInitializer.java +++ b/services/src/main/java/io/druid/cli/RouterJettyServerInitializer.java @@ -20,10 +20,12 @@ package io.druid.cli; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.inject.Inject; import com.google.inject.Injector; +import com.google.inject.Provider; import com.google.inject.servlet.GuiceFilter; import com.metamx.emitter.service.ServiceEmitter; import io.druid.guice.annotations.Json; import io.druid.guice.annotations.Smile; +import io.druid.guice.http.DruidHttpClientConfig; import io.druid.server.AsyncQueryForwardingServlet; import io.druid.server.initialization.BaseJettyServerInitializer; import io.druid.server.log.RequestLogger; @@ -44,7 +46,8 @@ public class RouterJettyServerInitializer extends BaseJettyServerInitializer private final ObjectMapper jsonMapper; private final ObjectMapper smileMapper; private final QueryHostFinder hostFinder; - private final HttpClient httpClient; + private final Provider httpClientProvider; + private final DruidHttpClientConfig httpClientConfig; private final ServiceEmitter emitter; private final RequestLogger requestLogger; @@ -53,7 +56,8 @@ public class RouterJettyServerInitializer extends BaseJettyServerInitializer @Json ObjectMapper jsonMapper, @Smile ObjectMapper smileMapper, QueryHostFinder hostFinder, - @Router HttpClient httpClient, + @Router Provider httpClientProvider, + DruidHttpClientConfig httpClientConfig, ServiceEmitter emitter, RequestLogger requestLogger ) @@ -61,7 +65,8 @@ public class RouterJettyServerInitializer extends BaseJettyServerInitializer this.jsonMapper = jsonMapper; this.smileMapper = smileMapper; this.hostFinder = hostFinder; - this.httpClient = httpClient; + this.httpClientProvider = httpClientProvider; + this.httpClientConfig = httpClientConfig; this.emitter = emitter; this.requestLogger = requestLogger; } @@ -72,18 +77,19 @@ public class RouterJettyServerInitializer extends BaseJettyServerInitializer final ServletContextHandler root = new ServletContextHandler(ServletContextHandler.SESSIONS); root.addServlet(new ServletHolder(new DefaultServlet()), "/*"); - root.addServlet( - new ServletHolder( - new AsyncQueryForwardingServlet( - jsonMapper, - smileMapper, - hostFinder, - httpClient, - emitter, - requestLogger - ) - ), "/druid/v2/*" + + final AsyncQueryForwardingServlet asyncQueryForwardingServlet = new AsyncQueryForwardingServlet( + jsonMapper, + smileMapper, + hostFinder, + httpClientProvider, + httpClientConfig, + emitter, + requestLogger ); + asyncQueryForwardingServlet.setTimeout(httpClientConfig.getReadTimeout().getMillis()); + + root.addServlet(new ServletHolder(asyncQueryForwardingServlet), "/druid/v2/*"); root.addFilter(defaultAsyncGzipFilterHolder(), "/*", null); // Can't use '/*' here because of Guice conflicts with AsyncQueryForwardingServlet path root.addFilter(GuiceFilter.class, "/status/*", null);