mirror of https://github.com/apache/druid.git
Merge pull request #651 from metamx/rewrite-async
rewrite router async logic
This commit is contained in:
commit
d21e377a20
19
pom.xml
19
pom.xml
|
@ -18,7 +18,8 @@
|
|||
~ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||
-->
|
||||
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<groupId>io.druid</groupId>
|
||||
<artifactId>druid</artifactId>
|
||||
|
@ -282,9 +283,9 @@
|
|||
<version>1</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.glassfish</groupId>
|
||||
<artifactId>javax.el</artifactId>
|
||||
<version>3.0.0</version>
|
||||
<groupId>org.glassfish</groupId>
|
||||
<artifactId>javax.el</artifactId>
|
||||
<version>3.0.0</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.jamesmurty.utils</groupId>
|
||||
|
@ -336,6 +337,16 @@
|
|||
<artifactId>jetty-servlets</artifactId>
|
||||
<version>9.2.2.v20140723</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.eclipse.jetty</groupId>
|
||||
<artifactId>jetty-client</artifactId>
|
||||
<version>9.2.2.v20140723</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.eclipse.jetty</groupId>
|
||||
<artifactId>jetty-proxy</artifactId>
|
||||
<version>9.2.2.v20140723</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>joda-time</groupId>
|
||||
<artifactId>joda-time</artifactId>
|
||||
|
|
|
@ -19,10 +19,14 @@
|
|||
|
||||
package io.druid.query;
|
||||
|
||||
import com.google.common.base.Joiner;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
public class DataSourceUtil
|
||||
{
|
||||
public static final Joiner COMMA_JOIN = Joiner.on(",");
|
||||
|
||||
public static String getMetricName(DataSource dataSource)
|
||||
{
|
||||
final List<String> names = dataSource.getNames();
|
||||
|
|
|
@ -18,7 +18,8 @@
|
|||
~ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||
-->
|
||||
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<groupId>io.druid</groupId>
|
||||
<artifactId>druid-server</artifactId>
|
||||
|
@ -101,6 +102,14 @@
|
|||
<groupId>org.eclipse.jetty</groupId>
|
||||
<artifactId>jetty-server</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.eclipse.jetty</groupId>
|
||||
<artifactId>jetty-client</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.eclipse.jetty</groupId>
|
||||
<artifactId>jetty-proxy</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.google.code.findbugs</groupId>
|
||||
<artifactId>jsr305</artifactId>
|
||||
|
|
|
@ -1,147 +0,0 @@
|
|||
/*
|
||||
* Druid - a distributed column store.
|
||||
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
|
||||
*
|
||||
* This program is free software; you can redistribute it and/or
|
||||
* modify it under the terms of the GNU General Public License
|
||||
* as published by the Free Software Foundation; either version 2
|
||||
* of the License, or (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with this program; if not, write to the Free Software
|
||||
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||
*/
|
||||
|
||||
package io.druid.client;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.fasterxml.jackson.dataformat.smile.SmileFactory;
|
||||
import com.google.common.base.Throwables;
|
||||
import com.google.common.collect.ImmutableMultimap;
|
||||
import com.google.common.util.concurrent.FutureCallback;
|
||||
import com.google.common.util.concurrent.Futures;
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import com.metamx.common.logger.Logger;
|
||||
import com.metamx.http.client.HttpClient;
|
||||
import com.metamx.http.client.response.HttpResponseHandler;
|
||||
import io.druid.guice.annotations.Client;
|
||||
import io.druid.query.Query;
|
||||
import io.druid.server.QueryResource;
|
||||
import io.druid.server.router.Router;
|
||||
import org.jboss.netty.buffer.ChannelBuffer;
|
||||
import org.jboss.netty.handler.codec.http.HttpHeaders;
|
||||
|
||||
import javax.inject.Inject;
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.net.URL;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class RoutingDruidClient<IntermediateType, FinalType>
|
||||
{
|
||||
private static final Logger log = new Logger(RoutingDruidClient.class);
|
||||
|
||||
private final ObjectMapper objectMapper;
|
||||
private final HttpClient httpClient;
|
||||
|
||||
private final AtomicInteger openConnections;
|
||||
private final boolean isSmile;
|
||||
|
||||
@Inject
|
||||
public RoutingDruidClient(
|
||||
ObjectMapper objectMapper,
|
||||
@Router HttpClient httpClient
|
||||
)
|
||||
{
|
||||
this.objectMapper = objectMapper;
|
||||
this.httpClient = httpClient;
|
||||
|
||||
this.isSmile = this.objectMapper.getFactory() instanceof SmileFactory;
|
||||
this.openConnections = new AtomicInteger();
|
||||
}
|
||||
|
||||
public int getNumOpenConnections()
|
||||
{
|
||||
return openConnections.get();
|
||||
}
|
||||
|
||||
public ListenableFuture<FinalType> postQuery(
|
||||
URI uri,
|
||||
Query query,
|
||||
HttpResponseHandler<IntermediateType, FinalType> responseHandler
|
||||
)
|
||||
{
|
||||
final ListenableFuture<FinalType> future;
|
||||
|
||||
try {
|
||||
log.debug("Querying url[%s]", uri);
|
||||
future = httpClient
|
||||
.post(uri.toURL())
|
||||
.setContent(objectMapper.writeValueAsBytes(query))
|
||||
.setHeader(HttpHeaders.Names.CONTENT_TYPE, isSmile ? QueryResource.APPLICATION_SMILE : QueryResource.APPLICATION_JSON)
|
||||
.go(responseHandler);
|
||||
|
||||
openConnections.getAndIncrement();
|
||||
|
||||
Futures.addCallback(
|
||||
future,
|
||||
new FutureCallback<FinalType>()
|
||||
{
|
||||
@Override
|
||||
public void onSuccess(FinalType result)
|
||||
{
|
||||
openConnections.getAndDecrement();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Throwable t)
|
||||
{
|
||||
openConnections.getAndDecrement();
|
||||
}
|
||||
}
|
||||
);
|
||||
}
|
||||
catch (IOException e) {
|
||||
throw Throwables.propagate(e);
|
||||
}
|
||||
|
||||
return future;
|
||||
}
|
||||
|
||||
public ListenableFuture<FinalType> get(
|
||||
URI uri,
|
||||
HttpResponseHandler<IntermediateType, FinalType> responseHandler
|
||||
)
|
||||
{
|
||||
try {
|
||||
return httpClient
|
||||
.get(uri.toURL())
|
||||
.go(responseHandler);
|
||||
}
|
||||
catch (IOException e) {
|
||||
throw Throwables.propagate(e);
|
||||
}
|
||||
}
|
||||
|
||||
public ListenableFuture<FinalType> delete(
|
||||
URI uri,
|
||||
HttpResponseHandler<IntermediateType, FinalType> responseHandler
|
||||
)
|
||||
{
|
||||
try {
|
||||
return httpClient
|
||||
.delete(uri.toURL())
|
||||
.go(responseHandler);
|
||||
}
|
||||
catch (IOException e) {
|
||||
throw Throwables.propagate(e);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,107 @@
|
|||
/*
|
||||
* Druid - a distributed column store.
|
||||
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
|
||||
*
|
||||
* This program is free software; you can redistribute it and/or
|
||||
* modify it under the terms of the GNU General Public License
|
||||
* as published by the Free Software Foundation; either version 2
|
||||
* of the License, or (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with this program; if not, write to the Free Software
|
||||
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||
*/
|
||||
|
||||
package io.druid.guice.http;
|
||||
|
||||
import com.google.common.base.Supplier;
|
||||
import com.google.inject.Binding;
|
||||
import com.google.inject.Inject;
|
||||
import com.google.inject.Injector;
|
||||
import com.google.inject.Key;
|
||||
import com.google.inject.Provider;
|
||||
import com.google.inject.TypeLiteral;
|
||||
import com.metamx.common.lifecycle.Lifecycle;
|
||||
|
||||
import javax.net.ssl.SSLContext;
|
||||
import java.lang.annotation.Annotation;
|
||||
|
||||
/**
|
||||
*/
|
||||
public abstract class AbstractHttpClientProvider<HttpClientType> implements Provider<HttpClientType>
|
||||
{
|
||||
private final Key<Supplier<DruidHttpClientConfig>> configKey;
|
||||
private final Key<SSLContext> sslContextKey;
|
||||
|
||||
private Provider<Supplier<DruidHttpClientConfig>> configProvider;
|
||||
private Provider<Lifecycle> lifecycleProvider;
|
||||
private Binding<SSLContext> sslContextBinding;
|
||||
|
||||
public AbstractHttpClientProvider()
|
||||
{
|
||||
configKey = Key.get(
|
||||
new TypeLiteral<Supplier<DruidHttpClientConfig>>()
|
||||
{
|
||||
}
|
||||
);
|
||||
sslContextKey = Key.get(SSLContext.class);
|
||||
}
|
||||
|
||||
public AbstractHttpClientProvider(Annotation annotation)
|
||||
{
|
||||
configKey = Key.get(
|
||||
new TypeLiteral<Supplier<DruidHttpClientConfig>>()
|
||||
{
|
||||
}, annotation
|
||||
);
|
||||
sslContextKey = Key.get(SSLContext.class, annotation);
|
||||
}
|
||||
|
||||
public AbstractHttpClientProvider(Class<? extends Annotation> annotation)
|
||||
{
|
||||
configKey = Key.get(
|
||||
new TypeLiteral<Supplier<DruidHttpClientConfig>>()
|
||||
{
|
||||
}, annotation
|
||||
);
|
||||
sslContextKey = Key.get(SSLContext.class, annotation);
|
||||
}
|
||||
|
||||
@Inject
|
||||
public void configure(Injector injector)
|
||||
{
|
||||
configProvider = injector.getProvider(configKey);
|
||||
sslContextBinding = injector.getExistingBinding(sslContextKey);
|
||||
lifecycleProvider = injector.getProvider(Lifecycle.class);
|
||||
}
|
||||
|
||||
public Key<Supplier<DruidHttpClientConfig>> getConfigKey()
|
||||
{
|
||||
return configKey;
|
||||
}
|
||||
|
||||
public Key<SSLContext> getSslContextKey()
|
||||
{
|
||||
return sslContextKey;
|
||||
}
|
||||
|
||||
public Provider<Supplier<DruidHttpClientConfig>> getConfigProvider()
|
||||
{
|
||||
return configProvider;
|
||||
}
|
||||
|
||||
public Provider<Lifecycle> getLifecycleProvider()
|
||||
{
|
||||
return lifecycleProvider;
|
||||
}
|
||||
|
||||
public Binding<SSLContext> getSslContextBinding()
|
||||
{
|
||||
return sslContextBinding;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,49 @@
|
|||
/*
|
||||
* Druid - a distributed column store.
|
||||
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
|
||||
*
|
||||
* This program is free software; you can redistribute it and/or
|
||||
* modify it under the terms of the GNU General Public License
|
||||
* as published by the Free Software Foundation; either version 2
|
||||
* of the License, or (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with this program; if not, write to the Free Software
|
||||
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||
*/
|
||||
|
||||
package io.druid.guice.http;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import org.joda.time.Duration;
|
||||
import org.joda.time.Period;
|
||||
|
||||
import javax.validation.constraints.Min;
|
||||
|
||||
/**
|
||||
*/
|
||||
|
||||
public class DruidHttpClientConfig
|
||||
{
|
||||
@JsonProperty
|
||||
@Min(0)
|
||||
private int numConnections = 5;
|
||||
|
||||
@JsonProperty
|
||||
private Period readTimeout = new Period("PT15M");
|
||||
|
||||
public int getNumConnections()
|
||||
{
|
||||
return numConnections;
|
||||
}
|
||||
|
||||
public Duration getReadTimeout()
|
||||
{
|
||||
return readTimeout == null ? null : readTimeout.toStandardDuration();
|
||||
}
|
||||
}
|
|
@ -17,28 +17,17 @@
|
|||
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||
*/
|
||||
|
||||
package io.druid.guice;
|
||||
package io.druid.guice.http;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import com.google.common.base.Supplier;
|
||||
import com.google.inject.Binder;
|
||||
import com.google.inject.Binding;
|
||||
import com.google.inject.Inject;
|
||||
import com.google.inject.Injector;
|
||||
import com.google.inject.Key;
|
||||
import com.google.inject.Module;
|
||||
import com.google.inject.Provider;
|
||||
import com.google.inject.TypeLiteral;
|
||||
import com.metamx.common.lifecycle.Lifecycle;
|
||||
import com.metamx.http.client.HttpClient;
|
||||
import com.metamx.http.client.HttpClientConfig;
|
||||
import com.metamx.http.client.HttpClientInit;
|
||||
import io.druid.guice.JsonConfigProvider;
|
||||
import io.druid.guice.LazySingleton;
|
||||
import io.druid.guice.annotations.Global;
|
||||
import org.joda.time.Duration;
|
||||
import org.joda.time.Period;
|
||||
|
||||
import javax.net.ssl.SSLContext;
|
||||
import javax.validation.constraints.Min;
|
||||
import java.lang.annotation.Annotation;
|
||||
|
||||
/**
|
||||
|
@ -80,15 +69,13 @@ public class HttpClientModule implements Module
|
|||
.annotatedWith(annotation)
|
||||
.toProvider(new HttpClientProvider(annotation))
|
||||
.in(LazySingleton.class);
|
||||
}
|
||||
else if (annotationClazz != null) {
|
||||
} else if (annotationClazz != null) {
|
||||
JsonConfigProvider.bind(binder, propertyPrefix, DruidHttpClientConfig.class, annotationClazz);
|
||||
binder.bind(HttpClient.class)
|
||||
.annotatedWith(annotationClazz)
|
||||
.toProvider(new HttpClientProvider(annotationClazz))
|
||||
.in(LazySingleton.class);
|
||||
}
|
||||
else {
|
||||
} else {
|
||||
JsonConfigProvider.bind(binder, propertyPrefix, DruidHttpClientConfig.class);
|
||||
binder.bind(HttpClient.class)
|
||||
.toProvider(new HttpClientProvider())
|
||||
|
@ -96,76 +83,37 @@ public class HttpClientModule implements Module
|
|||
}
|
||||
}
|
||||
|
||||
public static class DruidHttpClientConfig
|
||||
public static class HttpClientProvider extends AbstractHttpClientProvider<HttpClient>
|
||||
{
|
||||
@JsonProperty
|
||||
@Min(0)
|
||||
private int numConnections = 5;
|
||||
|
||||
@JsonProperty
|
||||
private Period readTimeout = new Period("PT15M");
|
||||
|
||||
public int getNumConnections()
|
||||
{
|
||||
return numConnections;
|
||||
}
|
||||
|
||||
public Duration getReadTimeout()
|
||||
{
|
||||
return readTimeout == null ? null : readTimeout.toStandardDuration();
|
||||
}
|
||||
}
|
||||
|
||||
public static class HttpClientProvider implements Provider<HttpClient>
|
||||
{
|
||||
private final Key<Supplier<DruidHttpClientConfig>> configKey;
|
||||
private final Key<SSLContext> sslContextKey;
|
||||
|
||||
private Provider<Supplier<DruidHttpClientConfig>> configProvider;
|
||||
private Provider<Lifecycle> lifecycleProvider;
|
||||
private Binding<SSLContext> sslContextBinding;
|
||||
|
||||
public HttpClientProvider()
|
||||
{
|
||||
configKey = Key.get(new TypeLiteral<Supplier<DruidHttpClientConfig>>(){});
|
||||
sslContextKey = Key.get(SSLContext.class);
|
||||
}
|
||||
|
||||
public HttpClientProvider(Annotation annotation)
|
||||
{
|
||||
configKey = Key.get(new TypeLiteral<Supplier<DruidHttpClientConfig>>(){}, annotation);
|
||||
sslContextKey = Key.get(SSLContext.class, annotation);
|
||||
super(annotation);
|
||||
}
|
||||
|
||||
public HttpClientProvider(Class<? extends Annotation> annotation)
|
||||
public HttpClientProvider(Class<? extends Annotation> annotationClazz)
|
||||
{
|
||||
configKey = Key.get(new TypeLiteral<Supplier<DruidHttpClientConfig>>(){}, annotation);
|
||||
sslContextKey = Key.get(SSLContext.class, annotation);
|
||||
}
|
||||
|
||||
@Inject
|
||||
public void configure(Injector injector)
|
||||
{
|
||||
configProvider = injector.getProvider(configKey);
|
||||
sslContextBinding = injector.getExistingBinding(sslContextKey);
|
||||
lifecycleProvider = injector.getProvider(Lifecycle.class);
|
||||
super(annotationClazz);
|
||||
}
|
||||
|
||||
@Override
|
||||
public HttpClient get()
|
||||
{
|
||||
final DruidHttpClientConfig config = configProvider.get().get();
|
||||
final DruidHttpClientConfig config = getConfigProvider().get().get();
|
||||
|
||||
final HttpClientConfig.Builder builder = HttpClientConfig
|
||||
.builder()
|
||||
.withNumConnections(config.getNumConnections())
|
||||
.withReadTimeout(config.getReadTimeout());
|
||||
|
||||
if (sslContextBinding != null) {
|
||||
builder.withSslContext(sslContextBinding.getProvider().get());
|
||||
if (getSslContextBinding() != null) {
|
||||
builder.withSslContext(getSslContextBinding().getProvider().get());
|
||||
}
|
||||
|
||||
return HttpClientInit.createClient(builder.build(), lifecycleProvider.get());
|
||||
return HttpClientInit.createClient(builder.build(), getLifecycleProvider().get());
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,152 @@
|
|||
/*
|
||||
* Druid - a distributed column store.
|
||||
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
|
||||
*
|
||||
* This program is free software; you can redistribute it and/or
|
||||
* modify it under the terms of the GNU General Public License
|
||||
* as published by the Free Software Foundation; either version 2
|
||||
* of the License, or (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with this program; if not, write to the Free Software
|
||||
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||
*/
|
||||
|
||||
package io.druid.guice.http;
|
||||
|
||||
import com.google.api.client.repackaged.com.google.common.base.Throwables;
|
||||
import com.google.inject.Binder;
|
||||
import com.google.inject.Module;
|
||||
import com.metamx.common.lifecycle.Lifecycle;
|
||||
import io.druid.guice.JsonConfigProvider;
|
||||
import io.druid.guice.LazySingleton;
|
||||
import io.druid.guice.annotations.Global;
|
||||
import org.eclipse.jetty.client.HttpClient;
|
||||
import org.eclipse.jetty.util.ssl.SslContextFactory;
|
||||
|
||||
import java.lang.annotation.Annotation;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class JettyHttpClientModule implements Module
|
||||
{
|
||||
public static JettyHttpClientModule global()
|
||||
{
|
||||
return new JettyHttpClientModule("druid.global.http", Global.class);
|
||||
}
|
||||
|
||||
private final String propertyPrefix;
|
||||
private Annotation annotation = null;
|
||||
private Class<? extends Annotation> annotationClazz = null;
|
||||
|
||||
public JettyHttpClientModule(String propertyPrefix)
|
||||
{
|
||||
this.propertyPrefix = propertyPrefix;
|
||||
}
|
||||
|
||||
public JettyHttpClientModule(String propertyPrefix, Class<? extends Annotation> annotation)
|
||||
{
|
||||
this.propertyPrefix = propertyPrefix;
|
||||
this.annotationClazz = annotation;
|
||||
}
|
||||
|
||||
public JettyHttpClientModule(String propertyPrefix, Annotation annotation)
|
||||
{
|
||||
this.propertyPrefix = propertyPrefix;
|
||||
this.annotation = annotation;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void configure(Binder binder)
|
||||
{
|
||||
if (annotation != null) {
|
||||
JsonConfigProvider.bind(binder, propertyPrefix, DruidHttpClientConfig.class, annotation);
|
||||
binder.bind(HttpClient.class)
|
||||
.annotatedWith(annotation)
|
||||
.toProvider(new HttpClientProvider(annotation))
|
||||
.in(LazySingleton.class);
|
||||
} else if (annotationClazz != null) {
|
||||
JsonConfigProvider.bind(binder, propertyPrefix, DruidHttpClientConfig.class, annotationClazz);
|
||||
binder.bind(HttpClient.class)
|
||||
.annotatedWith(annotationClazz)
|
||||
.toProvider(new HttpClientProvider(annotationClazz))
|
||||
.in(LazySingleton.class);
|
||||
} else {
|
||||
JsonConfigProvider.bind(binder, propertyPrefix, DruidHttpClientConfig.class);
|
||||
binder.bind(HttpClient.class)
|
||||
.toProvider(new HttpClientProvider())
|
||||
.in(LazySingleton.class);
|
||||
}
|
||||
}
|
||||
|
||||
public static class HttpClientProvider extends AbstractHttpClientProvider<HttpClient>
|
||||
{
|
||||
public HttpClientProvider()
|
||||
{
|
||||
}
|
||||
|
||||
public HttpClientProvider(Annotation annotation)
|
||||
{
|
||||
super(annotation);
|
||||
}
|
||||
|
||||
public HttpClientProvider(Class<? extends Annotation> annotation)
|
||||
{
|
||||
super(annotation);
|
||||
}
|
||||
|
||||
@Override
|
||||
public HttpClient get()
|
||||
{
|
||||
final DruidHttpClientConfig config = getConfigProvider().get().get();
|
||||
|
||||
final HttpClient httpClient;
|
||||
if (getSslContextBinding() != null) {
|
||||
final SslContextFactory sslContextFactory = new SslContextFactory();
|
||||
sslContextFactory.setSslContext(getSslContextBinding().getProvider().get());
|
||||
httpClient = new HttpClient(sslContextFactory);
|
||||
} else {
|
||||
httpClient = new HttpClient();
|
||||
}
|
||||
|
||||
httpClient.setIdleTimeout(config.getReadTimeout().getMillis());
|
||||
httpClient.setMaxConnectionsPerDestination(config.getNumConnections());
|
||||
|
||||
final Lifecycle lifecycle = getLifecycleProvider().get();
|
||||
|
||||
try {
|
||||
lifecycle.addMaybeStartHandler(
|
||||
new Lifecycle.Handler()
|
||||
{
|
||||
@Override
|
||||
public void start() throws Exception
|
||||
{
|
||||
httpClient.start();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stop()
|
||||
{
|
||||
try {
|
||||
httpClient.stop();
|
||||
}
|
||||
catch (Exception e) {
|
||||
throw Throwables.propagate(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
);
|
||||
}
|
||||
catch (Exception e) {
|
||||
throw Throwables.propagate(e);
|
||||
}
|
||||
|
||||
return httpClient;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -40,7 +40,7 @@ import io.druid.guice.DruidProcessingModule;
|
|||
import io.druid.guice.DruidSecondaryModule;
|
||||
import io.druid.guice.ExtensionsConfig;
|
||||
import io.druid.guice.FirehoseModule;
|
||||
import io.druid.guice.HttpClientModule;
|
||||
import io.druid.guice.http.HttpClientModule;
|
||||
import io.druid.guice.IndexingServiceDiscoveryModule;
|
||||
import io.druid.guice.JacksonConfigManagerModule;
|
||||
import io.druid.guice.LifecycleModule;
|
||||
|
|
|
@ -20,297 +20,46 @@
|
|||
package io.druid.server;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.api.client.repackaged.com.google.common.base.Throwables;
|
||||
import com.google.common.base.Function;
|
||||
import com.google.common.base.Joiner;
|
||||
import com.google.common.base.Predicate;
|
||||
import com.google.common.base.Predicates;
|
||||
import com.google.common.collect.FluentIterable;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.util.concurrent.FutureCallback;
|
||||
import com.google.common.util.concurrent.Futures;
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import com.metamx.emitter.EmittingLogger;
|
||||
import com.metamx.emitter.service.ServiceEmitter;
|
||||
import com.metamx.emitter.service.ServiceMetricEvent;
|
||||
import com.metamx.http.client.response.ClientResponse;
|
||||
import com.metamx.http.client.response.HttpResponseHandler;
|
||||
import io.druid.client.RoutingDruidClient;
|
||||
import io.druid.guice.annotations.Json;
|
||||
import io.druid.guice.annotations.Smile;
|
||||
import io.druid.query.DataSourceUtil;
|
||||
import io.druid.query.Query;
|
||||
import io.druid.server.log.RequestLogger;
|
||||
import io.druid.server.router.QueryHostFinder;
|
||||
import org.jboss.netty.buffer.ChannelBuffer;
|
||||
import org.jboss.netty.handler.codec.http.HttpChunk;
|
||||
import org.jboss.netty.handler.codec.http.HttpHeaders;
|
||||
import org.jboss.netty.handler.codec.http.HttpResponse;
|
||||
import io.druid.server.router.Router;
|
||||
import org.eclipse.jetty.client.HttpClient;
|
||||
import org.eclipse.jetty.client.api.Request;
|
||||
import org.eclipse.jetty.client.api.Response;
|
||||
import org.eclipse.jetty.client.api.Result;
|
||||
import org.eclipse.jetty.client.util.BytesContentProvider;
|
||||
import org.eclipse.jetty.http.HttpHeader;
|
||||
import org.eclipse.jetty.http.HttpMethod;
|
||||
import org.eclipse.jetty.http.HttpVersion;
|
||||
import org.eclipse.jetty.proxy.AsyncProxyServlet;
|
||||
import org.joda.time.DateTime;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import javax.servlet.AsyncContext;
|
||||
import javax.servlet.ServletException;
|
||||
import javax.servlet.ServletOutputStream;
|
||||
import javax.servlet.annotation.WebServlet;
|
||||
import javax.servlet.http.HttpServlet;
|
||||
import javax.servlet.http.HttpServletRequest;
|
||||
import javax.servlet.http.HttpServletResponse;
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.util.Map;
|
||||
import java.util.UUID;
|
||||
import java.util.Enumeration;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/**
|
||||
* This class does async query processing and should be merged with QueryResource at some point
|
||||
*/
|
||||
@WebServlet(asyncSupported = true)
|
||||
public class AsyncQueryForwardingServlet extends HttpServlet
|
||||
public class AsyncQueryForwardingServlet extends AsyncProxyServlet
|
||||
{
|
||||
private static final EmittingLogger log = new EmittingLogger(AsyncQueryForwardingServlet.class);
|
||||
private static final Joiner COMMA_JOIN = Joiner.on(",");
|
||||
|
||||
private final ObjectMapper jsonMapper;
|
||||
private final ObjectMapper smileMapper;
|
||||
private final QueryHostFinder hostFinder;
|
||||
private final RoutingDruidClient routingDruidClient;
|
||||
private final ServiceEmitter emitter;
|
||||
private final RequestLogger requestLogger;
|
||||
|
||||
public AsyncQueryForwardingServlet(
|
||||
@Json ObjectMapper jsonMapper,
|
||||
@Smile ObjectMapper smileMapper,
|
||||
QueryHostFinder hostFinder,
|
||||
RoutingDruidClient routingDruidClient,
|
||||
ServiceEmitter emitter,
|
||||
RequestLogger requestLogger
|
||||
)
|
||||
{
|
||||
this.jsonMapper = jsonMapper;
|
||||
this.smileMapper = smileMapper;
|
||||
this.hostFinder = hostFinder;
|
||||
this.routingDruidClient = routingDruidClient;
|
||||
this.emitter = emitter;
|
||||
this.requestLogger = requestLogger;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doGet(HttpServletRequest req, HttpServletResponse res)
|
||||
throws ServletException, IOException
|
||||
{
|
||||
final AsyncContext asyncContext = req.startAsync();
|
||||
asyncContext.setTimeout(0);
|
||||
|
||||
final HttpResponseHandler<ServletOutputStream, ServletOutputStream> responseHandler =
|
||||
new PassthroughHttpResponseHandler(res);
|
||||
|
||||
final URI uri = rewriteURI(hostFinder.getDefaultHost(), req);
|
||||
asyncComplete(
|
||||
res,
|
||||
asyncContext,
|
||||
jsonMapper,
|
||||
routingDruidClient.get(uri, responseHandler)
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doDelete(HttpServletRequest req, HttpServletResponse res) throws ServletException, IOException
|
||||
{
|
||||
final AsyncContext asyncContext = req.startAsync();
|
||||
asyncContext.setTimeout(0);
|
||||
|
||||
final HttpResponseHandler<ServletOutputStream, ServletOutputStream> responseHandler =
|
||||
new PassthroughHttpResponseHandler(res);
|
||||
|
||||
final String host = hostFinder.getDefaultHost();
|
||||
|
||||
asyncComplete(
|
||||
res,
|
||||
asyncContext,
|
||||
jsonMapper,
|
||||
routingDruidClient.delete(rewriteURI(host, (HttpServletRequest) asyncContext.getRequest()), responseHandler)
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doPost(final HttpServletRequest req, final HttpServletResponse res) throws ServletException, IOException
|
||||
{
|
||||
final long start = System.currentTimeMillis();
|
||||
final boolean isSmile = QueryResource.APPLICATION_SMILE.equals(req.getContentType());
|
||||
final ObjectMapper objectMapper = isSmile ? smileMapper : jsonMapper;
|
||||
|
||||
try {
|
||||
final Query inputQuery = objectMapper.readValue(req.getInputStream(), Query.class);
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug("Got query [%s]", inputQuery);
|
||||
}
|
||||
|
||||
final Query query;
|
||||
if (inputQuery.getId() == null) {
|
||||
query = inputQuery.withId(UUID.randomUUID().toString());
|
||||
} else {
|
||||
query = inputQuery;
|
||||
}
|
||||
|
||||
URI rewrittenURI = rewriteURI(hostFinder.getHost(query), req);
|
||||
|
||||
final AsyncContext asyncContext = req.startAsync();
|
||||
// let proxy http client timeout
|
||||
asyncContext.setTimeout(0);
|
||||
|
||||
ListenableFuture future = routingDruidClient.postQuery(
|
||||
rewrittenURI,
|
||||
query,
|
||||
new PassthroughHttpResponseHandler(res)
|
||||
);
|
||||
|
||||
Futures.addCallback(
|
||||
future,
|
||||
new FutureCallback()
|
||||
{
|
||||
@Override
|
||||
public void onSuccess(@Nullable Object o)
|
||||
{
|
||||
final long requestTime = System.currentTimeMillis() - start;
|
||||
emitter.emit(
|
||||
new ServiceMetricEvent.Builder()
|
||||
.setUser2(DataSourceUtil.getMetricName(query.getDataSource()))
|
||||
.setUser3(String.valueOf(query.getContextPriority(0)))
|
||||
.setUser4(query.getType())
|
||||
.setUser5(COMMA_JOIN.join(query.getIntervals()))
|
||||
.setUser6(String.valueOf(query.hasFilters()))
|
||||
.setUser7(req.getRemoteAddr())
|
||||
.setUser8(query.getId())
|
||||
.setUser9(query.getDuration().toPeriod().toStandardMinutes().toString())
|
||||
.build("request/time", requestTime)
|
||||
);
|
||||
|
||||
try {
|
||||
requestLogger.log(
|
||||
new RequestLogLine(
|
||||
new DateTime(),
|
||||
req.getRemoteAddr(),
|
||||
query,
|
||||
new QueryStats(
|
||||
ImmutableMap.<String, Object>of(
|
||||
"request/time",
|
||||
requestTime,
|
||||
"success",
|
||||
true
|
||||
)
|
||||
)
|
||||
)
|
||||
);
|
||||
}
|
||||
catch (Exception e) {
|
||||
log.error(e, "Unable to log query [%s]!", query);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Throwable throwable)
|
||||
{
|
||||
try {
|
||||
final String errorMessage = throwable.getMessage();
|
||||
requestLogger.log(
|
||||
new RequestLogLine(
|
||||
new DateTime(),
|
||||
req.getRemoteAddr(),
|
||||
query,
|
||||
new QueryStats(
|
||||
ImmutableMap.<String, Object>of(
|
||||
"success",
|
||||
false,
|
||||
"exception",
|
||||
errorMessage == null ? "no message" : errorMessage)
|
||||
)
|
||||
)
|
||||
);
|
||||
}
|
||||
catch (IOException logError) {
|
||||
log.error(logError, "Unable to log query [%s]!", query);
|
||||
}
|
||||
|
||||
log.makeAlert(throwable, "Exception handling request [%s]", query.getId())
|
||||
.addData("query", query)
|
||||
.addData("peer", req.getRemoteAddr())
|
||||
.emit();
|
||||
}
|
||||
}
|
||||
);
|
||||
|
||||
asyncComplete(
|
||||
res,
|
||||
asyncContext,
|
||||
objectMapper,
|
||||
future
|
||||
);
|
||||
} catch(IOException e) {
|
||||
log.warn(e, "Exception parsing query");
|
||||
final String errorMessage = e.getMessage() == null ? "no error message" : e.getMessage();
|
||||
requestLogger.log(
|
||||
new RequestLogLine(
|
||||
new DateTime(),
|
||||
req.getRemoteAddr(),
|
||||
null,
|
||||
new QueryStats(ImmutableMap.<String, Object>of("success", false, "exception", errorMessage))
|
||||
)
|
||||
);
|
||||
res.setStatus(HttpServletResponse.SC_BAD_REQUEST);
|
||||
objectMapper.writeValue(
|
||||
res.getOutputStream(),
|
||||
ImmutableMap.of("error", errorMessage)
|
||||
);
|
||||
} catch(Exception e) {
|
||||
handleException(res, objectMapper, e);
|
||||
}
|
||||
}
|
||||
|
||||
private static void asyncComplete(
|
||||
final HttpServletResponse res,
|
||||
final AsyncContext asyncContext,
|
||||
final ObjectMapper objectMapper,
|
||||
ListenableFuture future
|
||||
)
|
||||
{
|
||||
Futures.addCallback(
|
||||
future,
|
||||
new FutureCallback<Object>()
|
||||
{
|
||||
@Override
|
||||
public void onSuccess(@Nullable Object o)
|
||||
{
|
||||
asyncContext.complete();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Throwable throwable)
|
||||
{
|
||||
log.error(throwable, "Error processing query response");
|
||||
try {
|
||||
handleException(res, objectMapper, throwable);
|
||||
} catch(Exception err) {
|
||||
log.error(err, "Unable to handle exception response");
|
||||
}
|
||||
asyncContext.complete();
|
||||
}
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
private URI rewriteURI(final String host, final HttpServletRequest req)
|
||||
{
|
||||
final StringBuilder uri = new StringBuilder("http://");
|
||||
uri.append(host);
|
||||
uri.append(req.getRequestURI());
|
||||
final String queryString = req.getQueryString();
|
||||
if (queryString != null) {
|
||||
uri.append("?").append(queryString);
|
||||
}
|
||||
return URI.create(uri.toString());
|
||||
}
|
||||
|
||||
private static void handleException(HttpServletResponse response, ObjectMapper objectMapper, Throwable exception) throws IOException
|
||||
private static void handleException(HttpServletResponse response, ObjectMapper objectMapper, Exception exception)
|
||||
throws IOException
|
||||
{
|
||||
if (!response.isCommitted()) {
|
||||
final String errorMessage = exception.getMessage() == null ? "null exception" : exception.getMessage();
|
||||
|
@ -325,89 +74,256 @@ public class AsyncQueryForwardingServlet extends HttpServlet
|
|||
response.flushBuffer();
|
||||
}
|
||||
|
||||
private static class PassthroughHttpResponseHandler implements HttpResponseHandler<ServletOutputStream, ServletOutputStream>
|
||||
private final ObjectMapper jsonMapper;
|
||||
private final ObjectMapper smileMapper;
|
||||
private final QueryHostFinder hostFinder;
|
||||
private final HttpClient httpClient;
|
||||
private final ServiceEmitter emitter;
|
||||
private final RequestLogger requestLogger;
|
||||
|
||||
public AsyncQueryForwardingServlet(
|
||||
@Json ObjectMapper jsonMapper,
|
||||
@Smile ObjectMapper smileMapper,
|
||||
QueryHostFinder hostFinder,
|
||||
@Router HttpClient httpClient,
|
||||
ServiceEmitter emitter,
|
||||
RequestLogger requestLogger
|
||||
)
|
||||
{
|
||||
private final HttpServletResponse response;
|
||||
this.jsonMapper = jsonMapper;
|
||||
this.smileMapper = smileMapper;
|
||||
this.hostFinder = hostFinder;
|
||||
this.httpClient = httpClient;
|
||||
this.emitter = emitter;
|
||||
this.requestLogger = requestLogger;
|
||||
}
|
||||
|
||||
public PassthroughHttpResponseHandler(HttpServletResponse response) throws IOException
|
||||
{
|
||||
this.response = response;
|
||||
}
|
||||
@Override
|
||||
protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
|
||||
{
|
||||
final boolean isSmile = QueryResource.APPLICATION_SMILE.equals(request.getContentType());
|
||||
final ObjectMapper objectMapper = isSmile ? smileMapper : jsonMapper;
|
||||
|
||||
protected void copyStatusHeaders(HttpResponse clientResponse)
|
||||
{
|
||||
response.setStatus(clientResponse.getStatus().getCode());
|
||||
response.setContentType(clientResponse.headers().get(HttpHeaders.Names.CONTENT_TYPE));
|
||||
|
||||
FluentIterable.from(clientResponse.headers().entries())
|
||||
.filter(new Predicate<Map.Entry<String, String>>()
|
||||
{
|
||||
@Override
|
||||
public boolean apply(@Nullable Map.Entry<String, String> input)
|
||||
{
|
||||
return input.getKey().startsWith("X-Druid");
|
||||
}
|
||||
}
|
||||
)
|
||||
.transform(
|
||||
new Function<Map.Entry<String, String>, Object>()
|
||||
{
|
||||
@Nullable
|
||||
@Override
|
||||
public Object apply(@Nullable Map.Entry<String, String> input)
|
||||
{
|
||||
response.setHeader(input.getKey(), input.getValue());
|
||||
return null;
|
||||
}
|
||||
}
|
||||
)
|
||||
.allMatch(Predicates.alwaysTrue());
|
||||
}
|
||||
|
||||
@Override
|
||||
public ClientResponse<ServletOutputStream> handleResponse(HttpResponse clientResponse)
|
||||
{
|
||||
copyStatusHeaders(clientResponse);
|
||||
String host = hostFinder.getDefaultHost();
|
||||
Query inputQuery = null;
|
||||
boolean hasContent = request.getContentLength() > 0 || request.getContentType() != null;
|
||||
boolean isQuery = request.getMethod().equals(HttpMethod.POST.asString());
|
||||
long startTime = System.currentTimeMillis();
|
||||
|
||||
// queries only exist for POST
|
||||
if (isQuery) {
|
||||
try {
|
||||
final ServletOutputStream outputStream = response.getOutputStream();
|
||||
ChannelBuffer buf = clientResponse.getContent();
|
||||
buf.readBytes(outputStream, buf.readableBytes());
|
||||
return ClientResponse.unfinished(outputStream);
|
||||
inputQuery = objectMapper.readValue(request.getInputStream(), Query.class);
|
||||
if (inputQuery != null) {
|
||||
host = hostFinder.getHost(inputQuery);
|
||||
}
|
||||
}
|
||||
catch (IOException e) {
|
||||
log.warn(e, "Exception parsing query");
|
||||
final String errorMessage = e.getMessage() == null ? "no error message" : e.getMessage();
|
||||
requestLogger.log(
|
||||
new RequestLogLine(
|
||||
new DateTime(),
|
||||
request.getRemoteAddr(),
|
||||
null,
|
||||
new QueryStats(ImmutableMap.<String, Object>of("success", false, "exception", errorMessage))
|
||||
)
|
||||
);
|
||||
response.setStatus(HttpServletResponse.SC_BAD_REQUEST);
|
||||
response.setContentType(QueryResource.APPLICATION_JSON);
|
||||
objectMapper.writeValue(
|
||||
response.getOutputStream(),
|
||||
ImmutableMap.of("error", errorMessage)
|
||||
);
|
||||
|
||||
return;
|
||||
}
|
||||
catch (Exception e) {
|
||||
throw Throwables.propagate(e);
|
||||
handleException(response, objectMapper, e);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public ClientResponse<ServletOutputStream> handleChunk(
|
||||
ClientResponse<ServletOutputStream> clientResponse, HttpChunk chunk
|
||||
URI rewrittenURI = rewriteURI(host, request);
|
||||
if (rewrittenURI == null) {
|
||||
onRewriteFailed(request, response);
|
||||
return;
|
||||
}
|
||||
|
||||
final Request proxyRequest = getHttpClient().newRequest(rewrittenURI)
|
||||
.method(request.getMethod())
|
||||
.version(HttpVersion.fromString(request.getProtocol()));
|
||||
|
||||
// Copy headers
|
||||
for (Enumeration<String> headerNames = request.getHeaderNames(); headerNames.hasMoreElements(); ) {
|
||||
String headerName = headerNames.nextElement();
|
||||
|
||||
if (HttpHeader.TRANSFER_ENCODING.is(headerName)) {
|
||||
hasContent = true;
|
||||
}
|
||||
|
||||
for (Enumeration<String> headerValues = request.getHeaders(headerName); headerValues.hasMoreElements(); ) {
|
||||
String headerValue = headerValues.nextElement();
|
||||
if (headerValue != null) {
|
||||
proxyRequest.header(headerName, headerValue);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Add proxy headers
|
||||
addViaHeader(proxyRequest);
|
||||
|
||||
addXForwardedHeaders(proxyRequest, request);
|
||||
|
||||
final AsyncContext asyncContext = request.startAsync();
|
||||
// We do not timeout the continuation, but the proxy request
|
||||
asyncContext.setTimeout(0);
|
||||
proxyRequest.timeout(
|
||||
getTimeout(), TimeUnit.MILLISECONDS
|
||||
);
|
||||
|
||||
if (hasContent) {
|
||||
if (inputQuery != null) {
|
||||
proxyRequest.content(new BytesContentProvider(jsonMapper.writeValueAsBytes(inputQuery)));
|
||||
} else {
|
||||
proxyRequest.content(proxyRequestContent(proxyRequest, request));
|
||||
}
|
||||
}
|
||||
|
||||
customizeProxyRequest(proxyRequest, request);
|
||||
|
||||
if (isQuery) {
|
||||
proxyRequest.send(newMetricsEmittingProxyResponseListener(request, response, inputQuery, startTime));
|
||||
} else {
|
||||
proxyRequest.send(newProxyResponseListener(request, response));
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected HttpClient createHttpClient() throws ServletException
|
||||
{
|
||||
return httpClient;
|
||||
}
|
||||
|
||||
private URI rewriteURI(final String host, final HttpServletRequest req)
|
||||
{
|
||||
final StringBuilder uri = new StringBuilder("http://");
|
||||
|
||||
uri.append(host);
|
||||
uri.append(req.getRequestURI());
|
||||
final String queryString = req.getQueryString();
|
||||
if (queryString != null) {
|
||||
uri.append("?").append(queryString);
|
||||
}
|
||||
return URI.create(uri.toString());
|
||||
}
|
||||
|
||||
private Response.Listener newMetricsEmittingProxyResponseListener(
|
||||
HttpServletRequest request,
|
||||
HttpServletResponse response,
|
||||
Query query,
|
||||
long start
|
||||
)
|
||||
{
|
||||
return new MetricsEmittingProxyResponseListener(request, response, query, start);
|
||||
}
|
||||
|
||||
|
||||
private class MetricsEmittingProxyResponseListener extends ProxyResponseListener
|
||||
{
|
||||
private final HttpServletRequest req;
|
||||
private final HttpServletResponse res;
|
||||
private final Query query;
|
||||
private final long start;
|
||||
|
||||
public MetricsEmittingProxyResponseListener(
|
||||
HttpServletRequest request,
|
||||
HttpServletResponse response,
|
||||
Query query,
|
||||
long start
|
||||
)
|
||||
{
|
||||
super(request, response);
|
||||
|
||||
this.req = request;
|
||||
this.res = response;
|
||||
this.query = query;
|
||||
this.start = start;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onComplete(Result result)
|
||||
{
|
||||
final long requestTime = System.currentTimeMillis() - start;
|
||||
emitter.emit(
|
||||
new ServiceMetricEvent.Builder()
|
||||
.setUser2(DataSourceUtil.getMetricName(query.getDataSource()))
|
||||
.setUser3(String.valueOf(query.getContextPriority(0)))
|
||||
.setUser4(query.getType())
|
||||
.setUser5(DataSourceUtil.COMMA_JOIN.join(query.getIntervals()))
|
||||
.setUser6(String.valueOf(query.hasFilters()))
|
||||
.setUser7(req.getRemoteAddr())
|
||||
.setUser8(query.getId())
|
||||
.setUser9(query.getDuration().toPeriod().toStandardMinutes().toString())
|
||||
.build("request/time", requestTime)
|
||||
);
|
||||
|
||||
try {
|
||||
ChannelBuffer buf = chunk.getContent();
|
||||
buf.readBytes(clientResponse.getObj(), buf.readableBytes());
|
||||
requestLogger.log(
|
||||
new RequestLogLine(
|
||||
new DateTime(),
|
||||
req.getRemoteAddr(),
|
||||
query,
|
||||
new QueryStats(
|
||||
ImmutableMap.<String, Object>of(
|
||||
"request/time",
|
||||
requestTime,
|
||||
"success",
|
||||
true
|
||||
)
|
||||
)
|
||||
)
|
||||
);
|
||||
}
|
||||
catch (Exception e) {
|
||||
throw Throwables.propagate(e);
|
||||
log.error(e, "Unable to log query [%s]!", query);
|
||||
}
|
||||
return clientResponse;
|
||||
|
||||
super.onComplete(result);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ClientResponse<ServletOutputStream> done(ClientResponse<ServletOutputStream> clientResponse)
|
||||
public void onFailure(Response response, Throwable failure)
|
||||
{
|
||||
return ClientResponse.finished(clientResponse.getObj());
|
||||
}
|
||||
try {
|
||||
final String errorMessage = failure.getMessage();
|
||||
requestLogger.log(
|
||||
new RequestLogLine(
|
||||
new DateTime(),
|
||||
req.getRemoteAddr(),
|
||||
query,
|
||||
new QueryStats(
|
||||
ImmutableMap.<String, Object>of(
|
||||
"success",
|
||||
false,
|
||||
"exception",
|
||||
errorMessage == null ? "no message" : errorMessage
|
||||
)
|
||||
)
|
||||
)
|
||||
);
|
||||
}
|
||||
catch (IOException logError) {
|
||||
log.error(logError, "Unable to log query [%s]!", query);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void exceptionCaught(
|
||||
ClientResponse<ServletOutputStream> clientResponse,
|
||||
Throwable e
|
||||
)
|
||||
{
|
||||
// exceptions are handled on future callback
|
||||
log.makeAlert(failure, "Exception handling request")
|
||||
.addData("exception", failure.toString())
|
||||
.addData("query", query)
|
||||
.addData("peer", req.getRemoteAddr())
|
||||
.emit();
|
||||
|
||||
super.onFailure(response, failure);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -148,16 +148,6 @@ public class QueryResource
|
|||
);
|
||||
}
|
||||
|
||||
if ((boolean) query.getContextValue("b", false)) {
|
||||
System.out.println("***NEW QUERY***");
|
||||
while (true) {
|
||||
System.out.println("SLEEPING");
|
||||
Thread.sleep(10000);
|
||||
}
|
||||
} else if ((boolean) query.getContextValue("a", false)) {
|
||||
return Response.ok("hi").build();
|
||||
}
|
||||
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug("Got query [%s]", query);
|
||||
}
|
||||
|
|
|
@ -26,17 +26,15 @@ import com.google.inject.Provides;
|
|||
import com.google.inject.TypeLiteral;
|
||||
import com.metamx.common.logger.Logger;
|
||||
import io.airlift.command.Command;
|
||||
import io.druid.client.RoutingDruidClient;
|
||||
import io.druid.curator.discovery.DiscoveryModule;
|
||||
import io.druid.curator.discovery.ServerDiscoveryFactory;
|
||||
import io.druid.curator.discovery.ServerDiscoverySelector;
|
||||
import io.druid.guice.HttpClientModule;
|
||||
import io.druid.guice.JsonConfigProvider;
|
||||
import io.druid.guice.LazySingleton;
|
||||
import io.druid.guice.LifecycleModule;
|
||||
import io.druid.guice.ManageLifecycle;
|
||||
import io.druid.guice.annotations.Client;
|
||||
import io.druid.guice.annotations.Self;
|
||||
import io.druid.guice.http.JettyHttpClientModule;
|
||||
import io.druid.server.initialization.JettyServerInitializer;
|
||||
import io.druid.server.router.CoordinatorRuleManager;
|
||||
import io.druid.server.router.QueryHostFinder;
|
||||
|
@ -68,7 +66,7 @@ public class CliRouter extends ServerRunnable
|
|||
protected List<Object> getModules()
|
||||
{
|
||||
return ImmutableList.<Object>of(
|
||||
new HttpClientModule("druid.router.http", Router.class),
|
||||
new JettyHttpClientModule("druid.router.http", Router.class),
|
||||
new Module()
|
||||
{
|
||||
@Override
|
||||
|
@ -81,12 +79,10 @@ public class CliRouter extends ServerRunnable
|
|||
|
||||
binder.bind(TieredBrokerHostSelector.class).in(ManageLifecycle.class);
|
||||
binder.bind(QueryHostFinder.class).in(LazySingleton.class);
|
||||
binder.bind(RoutingDruidClient.class).in(LazySingleton.class);
|
||||
binder.bind(new TypeLiteral<List<TieredBrokerSelectorStrategy>>(){})
|
||||
.toProvider(TieredBrokerSelectorStrategiesProvider.class)
|
||||
.in(LazySingleton.class);
|
||||
|
||||
|
||||
binder.bind(JettyServerInitializer.class).to(RouterJettyServerInitializer.class).in(LazySingleton.class);
|
||||
|
||||
LifecycleModule.register(binder, Server.class);
|
||||
|
|
|
@ -24,14 +24,14 @@ import com.google.inject.Inject;
|
|||
import com.google.inject.Injector;
|
||||
import com.google.inject.servlet.GuiceFilter;
|
||||
import com.metamx.emitter.service.ServiceEmitter;
|
||||
import io.druid.client.RoutingDruidClient;
|
||||
import io.druid.guice.annotations.Json;
|
||||
import io.druid.guice.annotations.Smile;
|
||||
import io.druid.server.AsyncQueryForwardingServlet;
|
||||
import io.druid.server.initialization.JettyServerInitializer;
|
||||
import io.druid.server.initialization.ServerConfig;
|
||||
import io.druid.server.log.RequestLogger;
|
||||
import io.druid.server.router.QueryHostFinder;
|
||||
import io.druid.server.router.Router;
|
||||
import org.eclipse.jetty.client.HttpClient;
|
||||
import org.eclipse.jetty.server.Handler;
|
||||
import org.eclipse.jetty.server.Server;
|
||||
import org.eclipse.jetty.server.handler.DefaultHandler;
|
||||
|
@ -45,30 +45,27 @@ import org.eclipse.jetty.servlets.AsyncGzipFilter;
|
|||
*/
|
||||
public class RouterJettyServerInitializer implements JettyServerInitializer
|
||||
{
|
||||
private final ServerConfig config;
|
||||
private final ObjectMapper jsonMapper;
|
||||
private final ObjectMapper smileMapper;
|
||||
private final QueryHostFinder hostFinder;
|
||||
private final RoutingDruidClient routingDruidClient;
|
||||
private final HttpClient httpClient;
|
||||
private final ServiceEmitter emitter;
|
||||
private final RequestLogger requestLogger;
|
||||
|
||||
@Inject
|
||||
public RouterJettyServerInitializer(
|
||||
ServerConfig config,
|
||||
@Json ObjectMapper jsonMapper,
|
||||
@Smile ObjectMapper smileMapper,
|
||||
QueryHostFinder hostFinder,
|
||||
RoutingDruidClient routingDruidClient,
|
||||
@Router HttpClient httpClient,
|
||||
ServiceEmitter emitter,
|
||||
RequestLogger requestLogger
|
||||
)
|
||||
{
|
||||
this.config = config;
|
||||
this.jsonMapper = jsonMapper;
|
||||
this.smileMapper = smileMapper;
|
||||
this.hostFinder = hostFinder;
|
||||
this.routingDruidClient = routingDruidClient;
|
||||
this.httpClient = httpClient;
|
||||
this.emitter = emitter;
|
||||
this.requestLogger = requestLogger;
|
||||
}
|
||||
|
@ -83,7 +80,7 @@ public class RouterJettyServerInitializer implements JettyServerInitializer
|
|||
jsonMapper,
|
||||
smileMapper,
|
||||
hostFinder,
|
||||
routingDruidClient,
|
||||
httpClient,
|
||||
emitter,
|
||||
requestLogger
|
||||
)
|
||||
|
|
Loading…
Reference in New Issue