mirror of
https://github.com/apache/druid.git
synced 2025-02-17 07:25:02 +00:00
Cleanup router servlet + tests for gzip proxying
- Uses method overrides instead of modified Jetty code, now that ProxyServlet provides enough method hooks for proper overrides. This means we may also benefit from any Jetty ProxyServlet fixes - Adds test for async proxy servlet to make sure gzip encoding is properly proxied. - Router now proxies POST requests for requests that are not Druid queries, by only treating /druid/v2/* endpoints as queries.
This commit is contained in:
parent
c21086b6a8
commit
17743b94f3
@ -124,11 +124,6 @@ public class JettyHttpClientModule implements Module
|
|||||||
@Override
|
@Override
|
||||||
public void start() throws Exception
|
public void start() throws Exception
|
||||||
{
|
{
|
||||||
httpClient.start();
|
|
||||||
|
|
||||||
// forwards raw bytes, don't decode gzip
|
|
||||||
// decoders are populated on start, so this has to be done after start() is called
|
|
||||||
httpClient.getContentDecoderFactories().clear();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -17,37 +17,38 @@
|
|||||||
|
|
||||||
package io.druid.server;
|
package io.druid.server;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes;
|
import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes;
|
||||||
|
import com.google.api.client.repackaged.com.google.common.base.Throwables;
|
||||||
import com.google.common.collect.ImmutableMap;
|
import com.google.common.collect.ImmutableMap;
|
||||||
|
import com.google.inject.Provider;
|
||||||
import com.metamx.emitter.EmittingLogger;
|
import com.metamx.emitter.EmittingLogger;
|
||||||
import com.metamx.emitter.service.ServiceEmitter;
|
import com.metamx.emitter.service.ServiceEmitter;
|
||||||
import io.druid.guice.annotations.Json;
|
import io.druid.guice.annotations.Json;
|
||||||
import io.druid.guice.annotations.Smile;
|
import io.druid.guice.annotations.Smile;
|
||||||
|
import io.druid.guice.http.DruidHttpClientConfig;
|
||||||
import io.druid.query.Query;
|
import io.druid.query.Query;
|
||||||
import io.druid.query.QueryMetricUtil;
|
import io.druid.query.QueryMetricUtil;
|
||||||
import io.druid.server.log.RequestLogger;
|
import io.druid.server.log.RequestLogger;
|
||||||
import io.druid.server.router.QueryHostFinder;
|
import io.druid.server.router.QueryHostFinder;
|
||||||
import io.druid.server.router.Router;
|
import io.druid.server.router.Router;
|
||||||
import org.eclipse.jetty.client.HttpClient;
|
import org.eclipse.jetty.client.HttpClient;
|
||||||
|
import org.eclipse.jetty.client.api.ContentProvider;
|
||||||
import org.eclipse.jetty.client.api.Request;
|
import org.eclipse.jetty.client.api.Request;
|
||||||
import org.eclipse.jetty.client.api.Response;
|
import org.eclipse.jetty.client.api.Response;
|
||||||
import org.eclipse.jetty.client.api.Result;
|
import org.eclipse.jetty.client.api.Result;
|
||||||
import org.eclipse.jetty.client.util.BytesContentProvider;
|
import org.eclipse.jetty.client.util.BytesContentProvider;
|
||||||
import org.eclipse.jetty.http.HttpHeader;
|
|
||||||
import org.eclipse.jetty.http.HttpMethod;
|
import org.eclipse.jetty.http.HttpMethod;
|
||||||
import org.eclipse.jetty.http.HttpVersion;
|
|
||||||
import org.eclipse.jetty.proxy.AsyncProxyServlet;
|
import org.eclipse.jetty.proxy.AsyncProxyServlet;
|
||||||
import org.joda.time.DateTime;
|
import org.joda.time.DateTime;
|
||||||
|
|
||||||
import javax.servlet.AsyncContext;
|
|
||||||
import javax.servlet.ServletException;
|
import javax.servlet.ServletException;
|
||||||
import javax.servlet.http.HttpServletRequest;
|
import javax.servlet.http.HttpServletRequest;
|
||||||
import javax.servlet.http.HttpServletResponse;
|
import javax.servlet.http.HttpServletResponse;
|
||||||
import javax.ws.rs.core.MediaType;
|
import javax.ws.rs.core.MediaType;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
import java.util.Enumeration;
|
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
@ -60,6 +61,10 @@ public class AsyncQueryForwardingServlet extends AsyncProxyServlet
|
|||||||
@Deprecated // use SmileMediaTypes.APPLICATION_JACKSON_SMILE
|
@Deprecated // use SmileMediaTypes.APPLICATION_JACKSON_SMILE
|
||||||
private static final String APPLICATION_SMILE = "application/smile";
|
private static final String APPLICATION_SMILE = "application/smile";
|
||||||
|
|
||||||
|
private static final String HOST_ATTRIBUTE = "io.druid.proxy.to.host";
|
||||||
|
private static final String QUERY_ATTRIBUTE = "io.druid.proxy.query";
|
||||||
|
private static final String OBJECTMAPPER_ATTRIBUTE = "io.druid.proxy.objectMapper";
|
||||||
|
|
||||||
private static void handleException(HttpServletResponse response, ObjectMapper objectMapper, Exception exception)
|
private static void handleException(HttpServletResponse response, ObjectMapper objectMapper, Exception exception)
|
||||||
throws IOException
|
throws IOException
|
||||||
{
|
{
|
||||||
@ -79,7 +84,8 @@ public class AsyncQueryForwardingServlet extends AsyncProxyServlet
|
|||||||
private final ObjectMapper jsonMapper;
|
private final ObjectMapper jsonMapper;
|
||||||
private final ObjectMapper smileMapper;
|
private final ObjectMapper smileMapper;
|
||||||
private final QueryHostFinder hostFinder;
|
private final QueryHostFinder hostFinder;
|
||||||
private final HttpClient httpClient;
|
private final Provider<HttpClient> httpClientProvider;
|
||||||
|
private final DruidHttpClientConfig httpClientConfig;
|
||||||
private final ServiceEmitter emitter;
|
private final ServiceEmitter emitter;
|
||||||
private final RequestLogger requestLogger;
|
private final RequestLogger requestLogger;
|
||||||
|
|
||||||
@ -87,7 +93,8 @@ public class AsyncQueryForwardingServlet extends AsyncProxyServlet
|
|||||||
@Json ObjectMapper jsonMapper,
|
@Json ObjectMapper jsonMapper,
|
||||||
@Smile ObjectMapper smileMapper,
|
@Smile ObjectMapper smileMapper,
|
||||||
QueryHostFinder hostFinder,
|
QueryHostFinder hostFinder,
|
||||||
@Router HttpClient httpClient,
|
@Router Provider<HttpClient> httpClientProvider,
|
||||||
|
DruidHttpClientConfig httpClientConfig,
|
||||||
ServiceEmitter emitter,
|
ServiceEmitter emitter,
|
||||||
RequestLogger requestLogger
|
RequestLogger requestLogger
|
||||||
)
|
)
|
||||||
@ -95,7 +102,8 @@ public class AsyncQueryForwardingServlet extends AsyncProxyServlet
|
|||||||
this.jsonMapper = jsonMapper;
|
this.jsonMapper = jsonMapper;
|
||||||
this.smileMapper = smileMapper;
|
this.smileMapper = smileMapper;
|
||||||
this.hostFinder = hostFinder;
|
this.hostFinder = hostFinder;
|
||||||
this.httpClient = httpClient;
|
this.httpClientProvider = httpClientProvider;
|
||||||
|
this.httpClientConfig = httpClientConfig;
|
||||||
this.emitter = emitter;
|
this.emitter = emitter;
|
||||||
this.requestLogger = requestLogger;
|
this.requestLogger = requestLogger;
|
||||||
}
|
}
|
||||||
@ -105,23 +113,25 @@ public class AsyncQueryForwardingServlet extends AsyncProxyServlet
|
|||||||
{
|
{
|
||||||
final boolean isSmile = SmileMediaTypes.APPLICATION_JACKSON_SMILE.equals(request.getContentType()) || APPLICATION_SMILE.equals(request.getContentType());
|
final boolean isSmile = SmileMediaTypes.APPLICATION_JACKSON_SMILE.equals(request.getContentType()) || APPLICATION_SMILE.equals(request.getContentType());
|
||||||
final ObjectMapper objectMapper = isSmile ? smileMapper : jsonMapper;
|
final ObjectMapper objectMapper = isSmile ? smileMapper : jsonMapper;
|
||||||
|
request.setAttribute(OBJECTMAPPER_ATTRIBUTE, objectMapper);
|
||||||
|
|
||||||
String host = hostFinder.getDefaultHost();
|
String host = hostFinder.getDefaultHost();
|
||||||
Query inputQuery = null;
|
request.setAttribute(HOST_ATTRIBUTE, host);
|
||||||
boolean hasContent = request.getContentLength() > 0 || request.getContentType() != null;
|
|
||||||
boolean isQuery = request.getMethod().equals(HttpMethod.POST.asString());
|
boolean isQuery = request.getMethod().equals(HttpMethod.POST.asString()) &&
|
||||||
long startTime = System.currentTimeMillis();
|
request.getRequestURI().startsWith("/druid/v2");
|
||||||
|
|
||||||
// queries only exist for POST
|
// queries only exist for POST
|
||||||
if (isQuery) {
|
if (isQuery) {
|
||||||
try {
|
try {
|
||||||
inputQuery = objectMapper.readValue(request.getInputStream(), Query.class);
|
Query inputQuery = objectMapper.readValue(request.getInputStream(), Query.class);
|
||||||
if (inputQuery != null) {
|
if (inputQuery != null) {
|
||||||
host = hostFinder.getHost(inputQuery);
|
request.setAttribute(HOST_ATTRIBUTE, hostFinder.getHost(inputQuery));
|
||||||
if (inputQuery.getId() == null) {
|
if (inputQuery.getId() == null) {
|
||||||
inputQuery = inputQuery.withId(UUID.randomUUID().toString());
|
inputQuery = inputQuery.withId(UUID.randomUUID().toString());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
request.setAttribute(QUERY_ATTRIBUTE, inputQuery);
|
||||||
}
|
}
|
||||||
catch (IOException e) {
|
catch (IOException e) {
|
||||||
log.warn(e, "Exception parsing query");
|
log.warn(e, "Exception parsing query");
|
||||||
@ -149,78 +159,67 @@ public class AsyncQueryForwardingServlet extends AsyncProxyServlet
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
URI rewrittenURI = rewriteURI(host, request);
|
super.service(request, response);
|
||||||
if (rewrittenURI == null) {
|
}
|
||||||
onRewriteFailed(request, response);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
final Request proxyRequest = getHttpClient().newRequest(rewrittenURI)
|
@Override
|
||||||
.method(request.getMethod())
|
protected void customizeProxyRequest(Request proxyRequest, HttpServletRequest request)
|
||||||
.version(HttpVersion.fromString(request.getProtocol()));
|
{
|
||||||
|
proxyRequest.timeout(httpClientConfig.getReadTimeout().getMillis(), TimeUnit.MILLISECONDS);
|
||||||
|
proxyRequest.idleTimeout(httpClientConfig.getReadTimeout().getMillis(), TimeUnit.MILLISECONDS);
|
||||||
|
|
||||||
// Copy headers
|
final Query query = (Query) request.getAttribute(QUERY_ATTRIBUTE);
|
||||||
for (Enumeration<String> headerNames = request.getHeaderNames(); headerNames.hasMoreElements(); ) {
|
if (query != null) {
|
||||||
String headerName = headerNames.nextElement();
|
final ObjectMapper objectMapper = (ObjectMapper) request.getAttribute(OBJECTMAPPER_ATTRIBUTE);
|
||||||
|
try {
|
||||||
if (HttpHeader.TRANSFER_ENCODING.is(headerName)) {
|
proxyRequest.content(new BytesContentProvider(objectMapper.writeValueAsBytes(query)));
|
||||||
hasContent = true;
|
} catch(JsonProcessingException e) {
|
||||||
}
|
Throwables.propagate(e);
|
||||||
|
|
||||||
for (Enumeration<String> headerValues = request.getHeaders(headerName); headerValues.hasMoreElements(); ) {
|
|
||||||
String headerValue = headerValues.nextElement();
|
|
||||||
if (headerValue != null) {
|
|
||||||
proxyRequest.header(headerName, headerValue);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Add proxy headers
|
@Override
|
||||||
addViaHeader(proxyRequest);
|
protected Response.Listener newProxyResponseListener(
|
||||||
|
HttpServletRequest request, HttpServletResponse response
|
||||||
addXForwardedHeaders(proxyRequest, request);
|
)
|
||||||
|
{
|
||||||
final AsyncContext asyncContext = request.startAsync();
|
final Query query = (Query) request.getAttribute(QUERY_ATTRIBUTE);
|
||||||
// We do not timeout the continuation, but the proxy request
|
if (query != null) {
|
||||||
asyncContext.setTimeout(0);
|
return newMetricsEmittingProxyResponseListener(request, response, query, System.currentTimeMillis());
|
||||||
proxyRequest.timeout(
|
|
||||||
getTimeout(), TimeUnit.MILLISECONDS
|
|
||||||
);
|
|
||||||
|
|
||||||
if (hasContent) {
|
|
||||||
if (inputQuery != null) {
|
|
||||||
proxyRequest.content(new BytesContentProvider(jsonMapper.writeValueAsBytes(inputQuery)));
|
|
||||||
} else {
|
|
||||||
proxyRequest.content(proxyRequestContent(proxyRequest, request));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
customizeProxyRequest(proxyRequest, request);
|
|
||||||
|
|
||||||
if (isQuery) {
|
|
||||||
proxyRequest.send(newMetricsEmittingProxyResponseListener(request, response, inputQuery, startTime));
|
|
||||||
} else {
|
} else {
|
||||||
proxyRequest.send(newProxyResponseListener(request, response));
|
return super.newProxyResponseListener(request, response);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected URI rewriteURI(HttpServletRequest request)
|
||||||
|
{
|
||||||
|
final String host = (String) request.getAttribute(HOST_ATTRIBUTE);
|
||||||
|
final StringBuilder uri = new StringBuilder("http://");
|
||||||
|
|
||||||
|
uri.append(host);
|
||||||
|
uri.append(request.getRequestURI());
|
||||||
|
final String queryString = request.getQueryString();
|
||||||
|
if (queryString != null) {
|
||||||
|
uri.append("?").append(queryString);
|
||||||
|
}
|
||||||
|
return URI.create(uri.toString());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected HttpClient newHttpClient()
|
||||||
|
{
|
||||||
|
return httpClientProvider.get();
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected HttpClient createHttpClient() throws ServletException
|
protected HttpClient createHttpClient() throws ServletException
|
||||||
{
|
{
|
||||||
return httpClient;
|
HttpClient client = super.createHttpClient();
|
||||||
}
|
// override timeout set in ProxyServlet.createHttpClient
|
||||||
|
setTimeout(httpClientConfig.getReadTimeout().getMillis());
|
||||||
private URI rewriteURI(final String host, final HttpServletRequest req)
|
return client;
|
||||||
{
|
|
||||||
final StringBuilder uri = new StringBuilder("http://");
|
|
||||||
|
|
||||||
uri.append(host);
|
|
||||||
uri.append(req.getRequestURI());
|
|
||||||
final String queryString = req.getQueryString();
|
|
||||||
if (queryString != null) {
|
|
||||||
uri.append("?").append(queryString);
|
|
||||||
}
|
|
||||||
return URI.create(uri.toString());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private Response.Listener newMetricsEmittingProxyResponseListener(
|
private Response.Listener newMetricsEmittingProxyResponseListener(
|
||||||
|
@ -0,0 +1,195 @@
|
|||||||
|
/*
|
||||||
|
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. Metamarkets licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package io.druid.server;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
import com.google.common.collect.ImmutableList;
|
||||||
|
import com.google.inject.Binder;
|
||||||
|
import com.google.inject.Inject;
|
||||||
|
import com.google.inject.Injector;
|
||||||
|
import com.google.inject.Key;
|
||||||
|
import com.google.inject.Module;
|
||||||
|
import com.google.inject.servlet.GuiceFilter;
|
||||||
|
import com.metamx.common.lifecycle.Lifecycle;
|
||||||
|
import io.druid.guice.GuiceInjectors;
|
||||||
|
import io.druid.guice.Jerseys;
|
||||||
|
import io.druid.guice.JsonConfigProvider;
|
||||||
|
import io.druid.guice.LazySingleton;
|
||||||
|
import io.druid.guice.LifecycleModule;
|
||||||
|
import io.druid.guice.annotations.Self;
|
||||||
|
import io.druid.guice.annotations.Smile;
|
||||||
|
import io.druid.guice.http.DruidHttpClientConfig;
|
||||||
|
import io.druid.initialization.Initialization;
|
||||||
|
import io.druid.query.Query;
|
||||||
|
import io.druid.server.initialization.BaseJettyServerInitializer;
|
||||||
|
import io.druid.server.initialization.JettyServerInitializer;
|
||||||
|
import io.druid.server.initialization.BaseJettyTest;
|
||||||
|
import io.druid.server.log.RequestLogger;
|
||||||
|
import io.druid.server.metrics.NoopServiceEmitter;
|
||||||
|
import io.druid.server.router.QueryHostFinder;
|
||||||
|
import org.eclipse.jetty.server.Handler;
|
||||||
|
import org.eclipse.jetty.server.Server;
|
||||||
|
import org.eclipse.jetty.server.handler.HandlerList;
|
||||||
|
import org.eclipse.jetty.servlet.DefaultServlet;
|
||||||
|
import org.eclipse.jetty.servlet.ServletContextHandler;
|
||||||
|
import org.eclipse.jetty.servlet.ServletHolder;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import javax.servlet.http.HttpServletRequest;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.net.HttpURLConnection;
|
||||||
|
import java.net.URI;
|
||||||
|
import java.net.URL;
|
||||||
|
|
||||||
|
public class AsyncQueryForwardingServletTest extends BaseJettyTest
|
||||||
|
{
|
||||||
|
@Before
|
||||||
|
public void setup() throws Exception
|
||||||
|
{
|
||||||
|
setProperties();
|
||||||
|
Injector injector = setupInjector();
|
||||||
|
final DruidNode node = injector.getInstance(Key.get(DruidNode.class, Self.class));
|
||||||
|
port = node.getPort();
|
||||||
|
|
||||||
|
lifecycle = injector.getInstance(Lifecycle.class);
|
||||||
|
lifecycle.start();
|
||||||
|
ClientHolder holder = injector.getInstance(ClientHolder.class);
|
||||||
|
client = holder.getClient();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected Injector setupInjector()
|
||||||
|
{
|
||||||
|
return Initialization.makeInjectorWithModules(
|
||||||
|
GuiceInjectors.makeStartupInjector(), ImmutableList.<Module>of(
|
||||||
|
new Module()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public void configure(Binder binder)
|
||||||
|
{
|
||||||
|
JsonConfigProvider.bindInstance(
|
||||||
|
binder, Key.get(DruidNode.class, Self.class), new DruidNode("test", "localhost", null)
|
||||||
|
);
|
||||||
|
binder.bind(JettyServerInitializer.class).to(ProxyJettyServerInit.class).in(LazySingleton.class);
|
||||||
|
Jerseys.addResource(binder, SlowResource.class);
|
||||||
|
Jerseys.addResource(binder, ExceptionResource.class);
|
||||||
|
Jerseys.addResource(binder, DefaultResource.class);
|
||||||
|
LifecycleModule.register(binder, Server.class);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testProxyGzipCompression() throws Exception
|
||||||
|
{
|
||||||
|
final URL url = new URL("http://localhost:" + port + "/proxy/default");
|
||||||
|
|
||||||
|
final HttpURLConnection get = (HttpURLConnection) url.openConnection();
|
||||||
|
get.setRequestProperty("Accept-Encoding", "gzip");
|
||||||
|
Assert.assertEquals("gzip", get.getContentEncoding());
|
||||||
|
|
||||||
|
final HttpURLConnection post = (HttpURLConnection) url.openConnection();
|
||||||
|
post.setRequestProperty("Accept-Encoding", "gzip");
|
||||||
|
post.setRequestMethod("POST");
|
||||||
|
Assert.assertEquals("gzip", post.getContentEncoding());
|
||||||
|
|
||||||
|
final HttpURLConnection getNoGzip = (HttpURLConnection) url.openConnection();
|
||||||
|
Assert.assertNotEquals("gzip", getNoGzip.getContentEncoding());
|
||||||
|
|
||||||
|
final HttpURLConnection postNoGzip = (HttpURLConnection) url.openConnection();
|
||||||
|
postNoGzip.setRequestMethod("POST");
|
||||||
|
Assert.assertNotEquals("gzip", postNoGzip.getContentEncoding());
|
||||||
|
}
|
||||||
|
|
||||||
|
public static class ProxyJettyServerInit extends BaseJettyServerInitializer
|
||||||
|
{
|
||||||
|
|
||||||
|
private final DruidNode node;
|
||||||
|
|
||||||
|
@Inject
|
||||||
|
public ProxyJettyServerInit(@Self DruidNode node)
|
||||||
|
{
|
||||||
|
this.node = node;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void initialize(Server server, Injector injector)
|
||||||
|
{
|
||||||
|
final ServletContextHandler root = new ServletContextHandler(ServletContextHandler.SESSIONS);
|
||||||
|
root.addServlet(new ServletHolder(new DefaultServlet()), "/*");
|
||||||
|
|
||||||
|
final QueryHostFinder hostFinder = new QueryHostFinder(null)
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public String getHost(Query query)
|
||||||
|
{
|
||||||
|
return "localhost:" + node.getPort();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getDefaultHost()
|
||||||
|
{
|
||||||
|
return "localhost:" + node.getPort();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
root.addServlet(
|
||||||
|
new ServletHolder(
|
||||||
|
new AsyncQueryForwardingServlet(
|
||||||
|
injector.getInstance(ObjectMapper.class),
|
||||||
|
injector.getInstance(Key.get(ObjectMapper.class, Smile.class)),
|
||||||
|
hostFinder,
|
||||||
|
injector.getProvider(org.eclipse.jetty.client.HttpClient.class),
|
||||||
|
injector.getInstance(DruidHttpClientConfig.class),
|
||||||
|
new NoopServiceEmitter(),
|
||||||
|
new RequestLogger()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public void log(RequestLogLine requestLogLine) throws IOException
|
||||||
|
{
|
||||||
|
// noop
|
||||||
|
}
|
||||||
|
}
|
||||||
|
) {
|
||||||
|
@Override
|
||||||
|
protected URI rewriteURI(HttpServletRequest request)
|
||||||
|
{
|
||||||
|
URI uri = super.rewriteURI(request);
|
||||||
|
return URI.create(uri.toString().replace("/proxy", ""));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
), "/proxy/*"
|
||||||
|
);
|
||||||
|
|
||||||
|
root.addFilter(defaultAsyncGzipFilterHolder(), "/*", null);
|
||||||
|
root.addFilter(GuiceFilter.class, "/slow/*", null);
|
||||||
|
root.addFilter(GuiceFilter.class, "/default/*", null);
|
||||||
|
root.addFilter(GuiceFilter.class, "/exception/*", null);
|
||||||
|
|
||||||
|
final HandlerList handlerList = new HandlerList();
|
||||||
|
handlerList.setHandlers(new Handler[]{root});
|
||||||
|
server.setHandler(handlerList);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,222 @@
|
|||||||
|
/*
|
||||||
|
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. Metamarkets licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package io.druid.server.initialization;
|
||||||
|
|
||||||
|
import com.google.common.base.Throwables;
|
||||||
|
import com.google.common.collect.ImmutableList;
|
||||||
|
import com.google.inject.Binder;
|
||||||
|
import com.google.inject.Injector;
|
||||||
|
import com.google.inject.Key;
|
||||||
|
import com.google.inject.Module;
|
||||||
|
import com.google.inject.servlet.GuiceFilter;
|
||||||
|
import com.metamx.common.lifecycle.Lifecycle;
|
||||||
|
import com.metamx.http.client.HttpClient;
|
||||||
|
import com.metamx.http.client.HttpClientConfig;
|
||||||
|
import com.metamx.http.client.HttpClientInit;
|
||||||
|
import io.druid.guice.GuiceInjectors;
|
||||||
|
import io.druid.guice.Jerseys;
|
||||||
|
import io.druid.guice.JsonConfigProvider;
|
||||||
|
import io.druid.guice.LazySingleton;
|
||||||
|
import io.druid.guice.LifecycleModule;
|
||||||
|
import io.druid.guice.annotations.Self;
|
||||||
|
import io.druid.initialization.Initialization;
|
||||||
|
import io.druid.server.DruidNode;
|
||||||
|
import org.eclipse.jetty.server.Handler;
|
||||||
|
import org.eclipse.jetty.server.Server;
|
||||||
|
import org.eclipse.jetty.server.handler.HandlerList;
|
||||||
|
import org.eclipse.jetty.servlet.DefaultServlet;
|
||||||
|
import org.eclipse.jetty.servlet.ServletContextHandler;
|
||||||
|
import org.eclipse.jetty.servlet.ServletHolder;
|
||||||
|
import org.joda.time.Duration;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.Before;
|
||||||
|
|
||||||
|
import javax.net.ssl.SSLContext;
|
||||||
|
import javax.servlet.ServletOutputStream;
|
||||||
|
import javax.servlet.http.HttpServletResponse;
|
||||||
|
import javax.ws.rs.GET;
|
||||||
|
import javax.ws.rs.POST;
|
||||||
|
import javax.ws.rs.Path;
|
||||||
|
import javax.ws.rs.Produces;
|
||||||
|
import javax.ws.rs.core.Context;
|
||||||
|
import javax.ws.rs.core.MediaType;
|
||||||
|
import javax.ws.rs.core.Response;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.Random;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
public class BaseJettyTest
|
||||||
|
{
|
||||||
|
protected Lifecycle lifecycle;
|
||||||
|
protected HttpClient client;
|
||||||
|
protected int port = -1;
|
||||||
|
|
||||||
|
public static void setProperties()
|
||||||
|
{
|
||||||
|
System.setProperty("druid.server.http.numThreads", "20");
|
||||||
|
System.setProperty("druid.server.http.maxIdleTime", "PT1S");
|
||||||
|
System.setProperty("druid.global.http.readTimeout", "PT1S");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setup() throws Exception
|
||||||
|
{
|
||||||
|
setProperties();
|
||||||
|
Injector injector = setupInjector();
|
||||||
|
final DruidNode node = injector.getInstance(Key.get(DruidNode.class, Self.class));
|
||||||
|
port = node.getPort();
|
||||||
|
lifecycle = injector.getInstance(Lifecycle.class);
|
||||||
|
lifecycle.start();
|
||||||
|
ClientHolder holder = injector.getInstance(ClientHolder.class);
|
||||||
|
client = holder.getClient();
|
||||||
|
}
|
||||||
|
|
||||||
|
protected Injector setupInjector()
|
||||||
|
{
|
||||||
|
return Initialization.makeInjectorWithModules(
|
||||||
|
GuiceInjectors.makeStartupInjector(), ImmutableList.<Module>of(
|
||||||
|
new Module()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public void configure(Binder binder)
|
||||||
|
{
|
||||||
|
JsonConfigProvider.bindInstance(
|
||||||
|
binder, Key.get(DruidNode.class, Self.class), new DruidNode("test", "localhost", null)
|
||||||
|
);
|
||||||
|
binder.bind(JettyServerInitializer.class).to(JettyServerInit.class).in(LazySingleton.class);
|
||||||
|
Jerseys.addResource(binder, SlowResource.class);
|
||||||
|
Jerseys.addResource(binder, ExceptionResource.class);
|
||||||
|
Jerseys.addResource(binder, DefaultResource.class);
|
||||||
|
LifecycleModule.register(binder, Server.class);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void teardown()
|
||||||
|
{
|
||||||
|
lifecycle.stop();
|
||||||
|
}
|
||||||
|
|
||||||
|
public static class ClientHolder
|
||||||
|
{
|
||||||
|
HttpClient client;
|
||||||
|
|
||||||
|
ClientHolder()
|
||||||
|
{
|
||||||
|
try {
|
||||||
|
this.client = HttpClientInit.createClient(
|
||||||
|
new HttpClientConfig(1, SSLContext.getDefault(), Duration.ZERO),
|
||||||
|
new Lifecycle()
|
||||||
|
);
|
||||||
|
}
|
||||||
|
catch (Exception e) {
|
||||||
|
throw Throwables.propagate(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public HttpClient getClient()
|
||||||
|
{
|
||||||
|
return client;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static class JettyServerInit extends BaseJettyServerInitializer
|
||||||
|
{
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void initialize(Server server, Injector injector)
|
||||||
|
{
|
||||||
|
final ServletContextHandler root = new ServletContextHandler(ServletContextHandler.SESSIONS);
|
||||||
|
root.addServlet(new ServletHolder(new DefaultServlet()), "/*");
|
||||||
|
root.addFilter(defaultGzipFilterHolder(), "/*", null);
|
||||||
|
root.addFilter(GuiceFilter.class, "/*", null);
|
||||||
|
|
||||||
|
final HandlerList handlerList = new HandlerList();
|
||||||
|
handlerList.setHandlers(new Handler[]{root});
|
||||||
|
server.setHandler(handlerList);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Path("/slow")
|
||||||
|
public static class SlowResource
|
||||||
|
{
|
||||||
|
|
||||||
|
public static Random random = new Random();
|
||||||
|
|
||||||
|
@GET
|
||||||
|
@Path("/hello")
|
||||||
|
@Produces(MediaType.APPLICATION_JSON)
|
||||||
|
public Response hello()
|
||||||
|
{
|
||||||
|
try {
|
||||||
|
TimeUnit.MILLISECONDS.sleep(100 + random.nextInt(2000));
|
||||||
|
}
|
||||||
|
catch (InterruptedException e) {
|
||||||
|
//
|
||||||
|
}
|
||||||
|
return Response.ok("hello").build();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Path("/default")
|
||||||
|
public static class DefaultResource
|
||||||
|
{
|
||||||
|
@GET
|
||||||
|
@Produces(MediaType.APPLICATION_JSON)
|
||||||
|
public Response get()
|
||||||
|
{
|
||||||
|
return Response.ok("hello").build();
|
||||||
|
}
|
||||||
|
|
||||||
|
@POST
|
||||||
|
@Produces(MediaType.APPLICATION_JSON)
|
||||||
|
public Response post()
|
||||||
|
{
|
||||||
|
return Response.ok("hello").build();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Path("/exception")
|
||||||
|
public static class ExceptionResource
|
||||||
|
{
|
||||||
|
@GET
|
||||||
|
@Path("/exception")
|
||||||
|
@Produces(MediaType.APPLICATION_JSON)
|
||||||
|
public Response exception(
|
||||||
|
@Context HttpServletResponse resp
|
||||||
|
) throws IOException
|
||||||
|
{
|
||||||
|
final ServletOutputStream outputStream = resp.getOutputStream();
|
||||||
|
outputStream.println("hello");
|
||||||
|
outputStream.flush();
|
||||||
|
try {
|
||||||
|
TimeUnit.MILLISECONDS.sleep(200);
|
||||||
|
}
|
||||||
|
catch (InterruptedException e) {
|
||||||
|
//
|
||||||
|
}
|
||||||
|
throw new IOException();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -1,128 +1,50 @@
|
|||||||
/*
|
/*
|
||||||
* Druid - a distributed column store.
|
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
||||||
* Copyright 2012 - 2015 Metamarkets Group Inc.
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. Metamarkets licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
*
|
*
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
* you may not use this file except in compliance with the License.
|
|
||||||
* You may obtain a copy of the License at
|
|
||||||
*
|
*
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
* Unless required by applicable law or agreed to in writing,
|
||||||
*
|
* software distributed under the License is distributed on an
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
* KIND, either express or implied. See the License for the
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
* specific language governing permissions and limitations
|
||||||
* See the License for the specific language governing permissions and
|
* under the License.
|
||||||
* limitations under the License.
|
|
||||||
*/
|
*/
|
||||||
|
|
||||||
package io.druid.server.initialization;
|
package io.druid.server.initialization;
|
||||||
|
|
||||||
import com.google.common.base.Throwables;
|
import com.google.common.base.Throwables;
|
||||||
import com.google.common.collect.ImmutableList;
|
|
||||||
import com.google.common.util.concurrent.ListenableFuture;
|
import com.google.common.util.concurrent.ListenableFuture;
|
||||||
import com.google.inject.Binder;
|
|
||||||
import com.google.inject.Injector;
|
|
||||||
import com.google.inject.Key;
|
|
||||||
import com.google.inject.Module;
|
|
||||||
import com.google.inject.servlet.GuiceFilter;
|
|
||||||
import com.metamx.common.lifecycle.Lifecycle;
|
|
||||||
import com.metamx.http.client.HttpClient;
|
|
||||||
import com.metamx.http.client.HttpClientConfig;
|
|
||||||
import com.metamx.http.client.HttpClientInit;
|
|
||||||
import com.metamx.http.client.Request;
|
import com.metamx.http.client.Request;
|
||||||
import com.metamx.http.client.response.InputStreamResponseHandler;
|
import com.metamx.http.client.response.InputStreamResponseHandler;
|
||||||
import com.metamx.http.client.response.StatusResponseHandler;
|
import com.metamx.http.client.response.StatusResponseHandler;
|
||||||
import com.metamx.http.client.response.StatusResponseHolder;
|
import com.metamx.http.client.response.StatusResponseHolder;
|
||||||
import io.druid.guice.GuiceInjectors;
|
|
||||||
import io.druid.guice.Jerseys;
|
|
||||||
import io.druid.guice.JsonConfigProvider;
|
|
||||||
import io.druid.guice.LazySingleton;
|
|
||||||
import io.druid.guice.LifecycleModule;
|
|
||||||
import io.druid.guice.annotations.Self;
|
|
||||||
import io.druid.initialization.Initialization;
|
|
||||||
import io.druid.server.DruidNode;
|
|
||||||
import org.apache.commons.io.IOUtils;
|
import org.apache.commons.io.IOUtils;
|
||||||
import org.eclipse.jetty.server.Handler;
|
|
||||||
import org.eclipse.jetty.server.Server;
|
|
||||||
import org.eclipse.jetty.server.handler.HandlerList;
|
|
||||||
import org.eclipse.jetty.servlet.DefaultServlet;
|
|
||||||
import org.eclipse.jetty.servlet.ServletContextHandler;
|
|
||||||
import org.eclipse.jetty.servlet.ServletHolder;
|
|
||||||
import org.jboss.netty.handler.codec.http.HttpMethod;
|
import org.jboss.netty.handler.codec.http.HttpMethod;
|
||||||
import org.joda.time.Duration;
|
|
||||||
import org.junit.After;
|
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Before;
|
|
||||||
import org.junit.Ignore;
|
import org.junit.Ignore;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
import javax.net.ssl.SSLContext;
|
|
||||||
import javax.servlet.ServletOutputStream;
|
|
||||||
import javax.servlet.http.HttpServletResponse;
|
|
||||||
import javax.ws.rs.GET;
|
|
||||||
import javax.ws.rs.POST;
|
|
||||||
import javax.ws.rs.Path;
|
|
||||||
import javax.ws.rs.Produces;
|
|
||||||
import javax.ws.rs.core.Context;
|
|
||||||
import javax.ws.rs.core.MediaType;
|
|
||||||
import javax.ws.rs.core.Response;
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.InputStream;
|
import java.io.InputStream;
|
||||||
import java.io.StringWriter;
|
import java.io.StringWriter;
|
||||||
import java.net.HttpURLConnection;
|
import java.net.HttpURLConnection;
|
||||||
import java.net.URL;
|
import java.net.URL;
|
||||||
import java.nio.charset.Charset;
|
import java.nio.charset.Charset;
|
||||||
import java.util.Random;
|
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
import java.util.concurrent.Executor;
|
import java.util.concurrent.Executor;
|
||||||
import java.util.concurrent.Executors;
|
import java.util.concurrent.Executors;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
|
||||||
public class JettyTest
|
public class JettyTest extends BaseJettyTest
|
||||||
{
|
{
|
||||||
private Lifecycle lifecycle;
|
|
||||||
private HttpClient client;
|
|
||||||
private int port = -1;
|
|
||||||
|
|
||||||
public static void setProperties()
|
|
||||||
{
|
|
||||||
System.setProperty("druid.server.http.numThreads", "20");
|
|
||||||
System.setProperty("druid.server.http.maxIdleTime", "PT1S");
|
|
||||||
System.setProperty("druid.global.http.readTimeout", "PT1S");
|
|
||||||
}
|
|
||||||
|
|
||||||
@Before
|
|
||||||
public void setup() throws Exception
|
|
||||||
{
|
|
||||||
setProperties();
|
|
||||||
Injector injector = Initialization.makeInjectorWithModules(
|
|
||||||
GuiceInjectors.makeStartupInjector(), ImmutableList.<Module>of(
|
|
||||||
new Module()
|
|
||||||
{
|
|
||||||
@Override
|
|
||||||
public void configure(Binder binder)
|
|
||||||
{
|
|
||||||
JsonConfigProvider.bindInstance(
|
|
||||||
binder, Key.get(DruidNode.class, Self.class), new DruidNode("test", "localhost", null)
|
|
||||||
);
|
|
||||||
binder.bind(JettyServerInitializer.class).to(JettyServerInit.class).in(LazySingleton.class);
|
|
||||||
Jerseys.addResource(binder, SlowResource.class);
|
|
||||||
Jerseys.addResource(binder, ExceptionResource.class);
|
|
||||||
Jerseys.addResource(binder, DefaultResource.class);
|
|
||||||
LifecycleModule.register(binder, Server.class);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
)
|
|
||||||
);
|
|
||||||
final DruidNode node = injector.getInstance(Key.get(DruidNode.class, Self.class));
|
|
||||||
port = node.getPort();
|
|
||||||
|
|
||||||
lifecycle = injector.getInstance(Lifecycle.class);
|
|
||||||
lifecycle.start();
|
|
||||||
ClientHolder holder = injector.getInstance(ClientHolder.class);
|
|
||||||
client = holder.getClient();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@Ignore // this test will deadlock if it hits an issue, so ignored by default
|
@Ignore // this test will deadlock if it hits an issue, so ignored by default
|
||||||
@ -193,8 +115,14 @@ public class JettyTest
|
|||||||
final HttpURLConnection post = (HttpURLConnection) url.openConnection();
|
final HttpURLConnection post = (HttpURLConnection) url.openConnection();
|
||||||
post.setRequestProperty("Accept-Encoding", "gzip");
|
post.setRequestProperty("Accept-Encoding", "gzip");
|
||||||
post.setRequestMethod("POST");
|
post.setRequestMethod("POST");
|
||||||
|
|
||||||
Assert.assertEquals("gzip", post.getContentEncoding());
|
Assert.assertEquals("gzip", post.getContentEncoding());
|
||||||
|
|
||||||
|
final HttpURLConnection getNoGzip = (HttpURLConnection) url.openConnection();
|
||||||
|
Assert.assertNotEquals("gzip", getNoGzip.getContentEncoding());
|
||||||
|
|
||||||
|
final HttpURLConnection postNoGzip = (HttpURLConnection) url.openConnection();
|
||||||
|
postNoGzip.setRequestMethod("POST");
|
||||||
|
Assert.assertNotEquals("gzip", postNoGzip.getContentEncoding());
|
||||||
}
|
}
|
||||||
|
|
||||||
// Tests that threads are not stuck when partial chunk is not finalized
|
// Tests that threads are not stuck when partial chunk is not finalized
|
||||||
@ -251,113 +179,4 @@ public class JettyTest
|
|||||||
|
|
||||||
latch.await(5, TimeUnit.SECONDS);
|
latch.await(5, TimeUnit.SECONDS);
|
||||||
}
|
}
|
||||||
|
|
||||||
@After
|
|
||||||
public void teardown()
|
|
||||||
{
|
|
||||||
lifecycle.stop();
|
|
||||||
}
|
|
||||||
|
|
||||||
public static class ClientHolder
|
|
||||||
{
|
|
||||||
HttpClient client;
|
|
||||||
|
|
||||||
ClientHolder()
|
|
||||||
{
|
|
||||||
try {
|
|
||||||
this.client = HttpClientInit.createClient(
|
|
||||||
new HttpClientConfig(1, SSLContext.getDefault(), Duration.ZERO),
|
|
||||||
new Lifecycle()
|
|
||||||
);
|
|
||||||
}
|
|
||||||
catch (Exception e) {
|
|
||||||
throw Throwables.propagate(e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public HttpClient getClient()
|
|
||||||
{
|
|
||||||
return client;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public static class JettyServerInit extends BaseJettyServerInitializer
|
|
||||||
{
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void initialize(Server server, Injector injector)
|
|
||||||
{
|
|
||||||
final ServletContextHandler root = new ServletContextHandler(ServletContextHandler.SESSIONS);
|
|
||||||
root.addServlet(new ServletHolder(new DefaultServlet()), "/*");
|
|
||||||
root.addFilter(defaultGzipFilterHolder(), "/*", null);
|
|
||||||
root.addFilter(GuiceFilter.class, "/*", null);
|
|
||||||
|
|
||||||
final HandlerList handlerList = new HandlerList();
|
|
||||||
handlerList.setHandlers(new Handler[]{root});
|
|
||||||
server.setHandler(handlerList);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Path("/slow")
|
|
||||||
public static class SlowResource
|
|
||||||
{
|
|
||||||
|
|
||||||
public static Random random = new Random();
|
|
||||||
|
|
||||||
@GET
|
|
||||||
@Path("/hello")
|
|
||||||
@Produces(MediaType.APPLICATION_JSON)
|
|
||||||
public Response hello()
|
|
||||||
{
|
|
||||||
try {
|
|
||||||
TimeUnit.MILLISECONDS.sleep(100 + random.nextInt(2000));
|
|
||||||
}
|
|
||||||
catch (InterruptedException e) {
|
|
||||||
//
|
|
||||||
}
|
|
||||||
return Response.ok("hello").build();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Path("/default")
|
|
||||||
public static class DefaultResource
|
|
||||||
{
|
|
||||||
@GET
|
|
||||||
@Produces(MediaType.APPLICATION_JSON)
|
|
||||||
public Response get()
|
|
||||||
{
|
|
||||||
return Response.ok("hello").build();
|
|
||||||
}
|
|
||||||
|
|
||||||
@POST
|
|
||||||
@Produces(MediaType.APPLICATION_JSON)
|
|
||||||
public Response post()
|
|
||||||
{
|
|
||||||
return Response.ok("hello").build();
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
@Path("/exception")
|
|
||||||
public static class ExceptionResource
|
|
||||||
{
|
|
||||||
@GET
|
|
||||||
@Path("/exception")
|
|
||||||
@Produces(MediaType.APPLICATION_JSON)
|
|
||||||
public Response exception(
|
|
||||||
@Context HttpServletResponse resp
|
|
||||||
) throws IOException
|
|
||||||
{
|
|
||||||
final ServletOutputStream outputStream = resp.getOutputStream();
|
|
||||||
outputStream.println("hello");
|
|
||||||
outputStream.flush();
|
|
||||||
try {
|
|
||||||
TimeUnit.MILLISECONDS.sleep(200);
|
|
||||||
}
|
|
||||||
catch (InterruptedException e) {
|
|
||||||
//
|
|
||||||
}
|
|
||||||
throw new IOException();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
@ -20,10 +20,12 @@ package io.druid.cli;
|
|||||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
import com.google.inject.Inject;
|
import com.google.inject.Inject;
|
||||||
import com.google.inject.Injector;
|
import com.google.inject.Injector;
|
||||||
|
import com.google.inject.Provider;
|
||||||
import com.google.inject.servlet.GuiceFilter;
|
import com.google.inject.servlet.GuiceFilter;
|
||||||
import com.metamx.emitter.service.ServiceEmitter;
|
import com.metamx.emitter.service.ServiceEmitter;
|
||||||
import io.druid.guice.annotations.Json;
|
import io.druid.guice.annotations.Json;
|
||||||
import io.druid.guice.annotations.Smile;
|
import io.druid.guice.annotations.Smile;
|
||||||
|
import io.druid.guice.http.DruidHttpClientConfig;
|
||||||
import io.druid.server.AsyncQueryForwardingServlet;
|
import io.druid.server.AsyncQueryForwardingServlet;
|
||||||
import io.druid.server.initialization.BaseJettyServerInitializer;
|
import io.druid.server.initialization.BaseJettyServerInitializer;
|
||||||
import io.druid.server.log.RequestLogger;
|
import io.druid.server.log.RequestLogger;
|
||||||
@ -44,7 +46,8 @@ public class RouterJettyServerInitializer extends BaseJettyServerInitializer
|
|||||||
private final ObjectMapper jsonMapper;
|
private final ObjectMapper jsonMapper;
|
||||||
private final ObjectMapper smileMapper;
|
private final ObjectMapper smileMapper;
|
||||||
private final QueryHostFinder hostFinder;
|
private final QueryHostFinder hostFinder;
|
||||||
private final HttpClient httpClient;
|
private final Provider<HttpClient> httpClientProvider;
|
||||||
|
private final DruidHttpClientConfig httpClientConfig;
|
||||||
private final ServiceEmitter emitter;
|
private final ServiceEmitter emitter;
|
||||||
private final RequestLogger requestLogger;
|
private final RequestLogger requestLogger;
|
||||||
|
|
||||||
@ -53,7 +56,8 @@ public class RouterJettyServerInitializer extends BaseJettyServerInitializer
|
|||||||
@Json ObjectMapper jsonMapper,
|
@Json ObjectMapper jsonMapper,
|
||||||
@Smile ObjectMapper smileMapper,
|
@Smile ObjectMapper smileMapper,
|
||||||
QueryHostFinder hostFinder,
|
QueryHostFinder hostFinder,
|
||||||
@Router HttpClient httpClient,
|
@Router Provider<HttpClient> httpClientProvider,
|
||||||
|
DruidHttpClientConfig httpClientConfig,
|
||||||
ServiceEmitter emitter,
|
ServiceEmitter emitter,
|
||||||
RequestLogger requestLogger
|
RequestLogger requestLogger
|
||||||
)
|
)
|
||||||
@ -61,7 +65,8 @@ public class RouterJettyServerInitializer extends BaseJettyServerInitializer
|
|||||||
this.jsonMapper = jsonMapper;
|
this.jsonMapper = jsonMapper;
|
||||||
this.smileMapper = smileMapper;
|
this.smileMapper = smileMapper;
|
||||||
this.hostFinder = hostFinder;
|
this.hostFinder = hostFinder;
|
||||||
this.httpClient = httpClient;
|
this.httpClientProvider = httpClientProvider;
|
||||||
|
this.httpClientConfig = httpClientConfig;
|
||||||
this.emitter = emitter;
|
this.emitter = emitter;
|
||||||
this.requestLogger = requestLogger;
|
this.requestLogger = requestLogger;
|
||||||
}
|
}
|
||||||
@ -72,18 +77,19 @@ public class RouterJettyServerInitializer extends BaseJettyServerInitializer
|
|||||||
final ServletContextHandler root = new ServletContextHandler(ServletContextHandler.SESSIONS);
|
final ServletContextHandler root = new ServletContextHandler(ServletContextHandler.SESSIONS);
|
||||||
|
|
||||||
root.addServlet(new ServletHolder(new DefaultServlet()), "/*");
|
root.addServlet(new ServletHolder(new DefaultServlet()), "/*");
|
||||||
root.addServlet(
|
|
||||||
new ServletHolder(
|
final AsyncQueryForwardingServlet asyncQueryForwardingServlet = new AsyncQueryForwardingServlet(
|
||||||
new AsyncQueryForwardingServlet(
|
jsonMapper,
|
||||||
jsonMapper,
|
smileMapper,
|
||||||
smileMapper,
|
hostFinder,
|
||||||
hostFinder,
|
httpClientProvider,
|
||||||
httpClient,
|
httpClientConfig,
|
||||||
emitter,
|
emitter,
|
||||||
requestLogger
|
requestLogger
|
||||||
)
|
|
||||||
), "/druid/v2/*"
|
|
||||||
);
|
);
|
||||||
|
asyncQueryForwardingServlet.setTimeout(httpClientConfig.getReadTimeout().getMillis());
|
||||||
|
|
||||||
|
root.addServlet(new ServletHolder(asyncQueryForwardingServlet), "/druid/v2/*");
|
||||||
root.addFilter(defaultAsyncGzipFilterHolder(), "/*", null);
|
root.addFilter(defaultAsyncGzipFilterHolder(), "/*", null);
|
||||||
// Can't use '/*' here because of Guice conflicts with AsyncQueryForwardingServlet path
|
// Can't use '/*' here because of Guice conflicts with AsyncQueryForwardingServlet path
|
||||||
root.addFilter(GuiceFilter.class, "/status/*", null);
|
root.addFilter(GuiceFilter.class, "/status/*", null);
|
||||||
|
Loading…
x
Reference in New Issue
Block a user