mirror of https://github.com/apache/druid.git
Add druid.indexer.server.maxChatRequests for QoS; deprecate separate ports.
- Add druid.indexer.server.maxChatRequests, which sets up a QoSFilter on the main Jetty server. - Deprecate druid.indexer.runner.separateIngestionEndpoint - Deprecate druid.indexer.server.chathandler.*
This commit is contained in:
parent
1c4cfd5829
commit
23c993c9e7
|
@ -257,7 +257,7 @@ Middle managers pass their configurations down to their child peons. The middle
|
||||||
|`druid.indexer.runner.javaOpts`|-X Java options to run the peon in its own JVM. Can be either a string or a json string list. Quotable parameters or parameters with spaces are encouraged to use json string lists|""|
|
|`druid.indexer.runner.javaOpts`|-X Java options to run the peon in its own JVM. Can be either a string or a json string list. Quotable parameters or parameters with spaces are encouraged to use json string lists|""|
|
||||||
|`druid.indexer.runner.maxZnodeBytes`|The maximum size Znode in bytes that can be created in Zookeeper.|524288|
|
|`druid.indexer.runner.maxZnodeBytes`|The maximum size Znode in bytes that can be created in Zookeeper.|524288|
|
||||||
|`druid.indexer.runner.startPort`|The port that peons begin running on.|8100|
|
|`druid.indexer.runner.startPort`|The port that peons begin running on.|8100|
|
||||||
|`druid.indexer.runner.separateIngestionEndpoint`|Use separate server and consequently separate jetty thread pool for ingesting events|false|
|
|`druid.indexer.runner.separateIngestionEndpoint`|*Deprecated.* Use separate server and consequently separate jetty thread pool for ingesting events|false|
|
||||||
|`druid.worker.ip`|The IP of the worker.|localhost|
|
|`druid.worker.ip`|The IP of the worker.|localhost|
|
||||||
|`druid.worker.version`|Version identifier for the middle manager.|0|
|
|`druid.worker.version`|Version identifier for the middle manager.|0|
|
||||||
|`druid.worker.capacity`|Maximum number of tasks the middle manager can accept.|Number of available processors - 1|
|
|`druid.worker.capacity`|Maximum number of tasks the middle manager can accept.|Number of available processors - 1|
|
||||||
|
@ -276,19 +276,21 @@ Additional peon configs include:
|
||||||
|`druid.peon.mode`|Choices are "local" and "remote". Setting this to local means you intend to run the peon as a standalone node (Not recommended).|remote|
|
|`druid.peon.mode`|Choices are "local" and "remote". Setting this to local means you intend to run the peon as a standalone node (Not recommended).|remote|
|
||||||
|`druid.indexer.task.baseDir`|Base temporary working directory.|`System.getProperty("java.io.tmpdir")`|
|
|`druid.indexer.task.baseDir`|Base temporary working directory.|`System.getProperty("java.io.tmpdir")`|
|
||||||
|`druid.indexer.task.baseTaskDir`|Base temporary working directory for tasks.|`${druid.indexer.task.baseDir}/persistent/tasks`|
|
|`druid.indexer.task.baseTaskDir`|Base temporary working directory for tasks.|`${druid.indexer.task.baseDir}/persistent/tasks`|
|
||||||
|`druid.indexer.task.hadoopWorkingPath`|Temporary working directory for Hadoop tasks.|`/tmp/druid-indexing`|
|
|
||||||
|`druid.indexer.task.defaultRowFlushBoundary`|Highest row count before persisting to disk. Used for indexing generating tasks.|50000|
|
|
||||||
|`druid.indexer.task.defaultHadoopCoordinates`|Hadoop version to use with HadoopIndexTasks that do not request a particular version.|org.apache.hadoop:hadoop-client:2.3.0|
|
|`druid.indexer.task.defaultHadoopCoordinates`|Hadoop version to use with HadoopIndexTasks that do not request a particular version.|org.apache.hadoop:hadoop-client:2.3.0|
|
||||||
|`druid.indexer.task.restoreTasksOnRestart`|If true, middleManagers will attempt to stop tasks gracefully on shutdown and restore them on restart.|false|
|
|`druid.indexer.task.defaultRowFlushBoundary`|Highest row count before persisting to disk. Used for indexing generating tasks.|50000|
|
||||||
|`druid.indexer.task.gracefulShutdownTimeout`|Wait this long on middleManager restart for restorable tasks to gracefully exit.|PT5M|
|
|
||||||
|`druid.indexer.task.directoryLockTimeout`|Wait this long for zombie peons to exit before giving up on their replacements.|PT10M|
|
|`druid.indexer.task.directoryLockTimeout`|Wait this long for zombie peons to exit before giving up on their replacements.|PT10M|
|
||||||
|
|`druid.indexer.task.gracefulShutdownTimeout`|Wait this long on middleManager restart for restorable tasks to gracefully exit.|PT5M|
|
||||||
|
|`druid.indexer.task.hadoopWorkingPath`|Temporary working directory for Hadoop tasks.|`/tmp/druid-indexing`|
|
||||||
|
|`druid.indexer.task.restoreTasksOnRestart`|If true, middleManagers will attempt to stop tasks gracefully on shutdown and restore them on restart.|false|
|
||||||
|
|`druid.indexer.server.maxChatRequests`|Maximum number of concurrent requests served by a task's chat handler. Set to 0 to disable limiting.|0|
|
||||||
|
|
||||||
If `druid.indexer.runner.separateIngestionEndpoint` is set to true then following configurations are available for the ingestion server at peon:
|
If the deprecated `druid.indexer.runner.separateIngestionEndpoint` property is set to true then following configurations
|
||||||
|
are available for the ingestion server at peon:
|
||||||
|
|
||||||
|Property|Description|Default|
|
|Property|Description|Default|
|
||||||
|--------|-----------|-------|
|
|--------|-----------|-------|
|
||||||
|`druid.indexer.server.chathandler.http.numThreads`|Number of threads for HTTP requests.|Math.max(10, (Number of available processors * 17) / 16 + 2) + 30|
|
|`druid.indexer.server.chathandler.http.numThreads`|*Deprecated.* Number of threads for HTTP requests.|Math.max(10, (Number of available processors * 17) / 16 + 2) + 30|
|
||||||
|`druid.indexer.server.chathandler.http.maxIdleTime`|The Jetty max idle time for a connection.|PT5m|
|
|`druid.indexer.server.chathandler.http.maxIdleTime`|*Deprecated.* The Jetty max idle time for a connection.|PT5m|
|
||||||
|
|
||||||
If the peon is running in remote mode, there must be an overlord up and running. Peons in remote mode can set the following configurations:
|
If the peon is running in remote mode, there must be an overlord up and running. Peons in remote mode can set the following configurations:
|
||||||
|
|
||||||
|
|
|
@ -27,11 +27,13 @@ import com.google.inject.Module;
|
||||||
import com.google.inject.Provides;
|
import com.google.inject.Provides;
|
||||||
import com.metamx.common.lifecycle.Lifecycle;
|
import com.metamx.common.lifecycle.Lifecycle;
|
||||||
import com.metamx.common.logger.Logger;
|
import com.metamx.common.logger.Logger;
|
||||||
|
import io.druid.guice.Jerseys;
|
||||||
import io.druid.guice.JsonConfigProvider;
|
import io.druid.guice.JsonConfigProvider;
|
||||||
import io.druid.guice.LazySingleton;
|
import io.druid.guice.LazySingleton;
|
||||||
import io.druid.guice.LifecycleModule;
|
import io.druid.guice.LifecycleModule;
|
||||||
import io.druid.guice.annotations.RemoteChatHandler;
|
import io.druid.guice.annotations.RemoteChatHandler;
|
||||||
import io.druid.guice.annotations.Self;
|
import io.druid.guice.annotations.Self;
|
||||||
|
import io.druid.segment.realtime.firehose.ChatHandlerResource;
|
||||||
import io.druid.server.DruidNode;
|
import io.druid.server.DruidNode;
|
||||||
import io.druid.server.initialization.ServerConfig;
|
import io.druid.server.initialization.ServerConfig;
|
||||||
import org.eclipse.jetty.server.Server;
|
import org.eclipse.jetty.server.Server;
|
||||||
|
@ -43,6 +45,8 @@ import java.util.Properties;
|
||||||
public class ChatHandlerServerModule implements Module
|
public class ChatHandlerServerModule implements Module
|
||||||
{
|
{
|
||||||
private static final Logger log = new Logger(ChatHandlerServerModule.class);
|
private static final Logger log = new Logger(ChatHandlerServerModule.class);
|
||||||
|
private static final String MAX_CHAT_REQUESTS_PROPERTY = "druid.indexer.server.maxChatRequests";
|
||||||
|
private static final String CHAT_PORT_PROPERTY = "druid.indexer.task.chathandler.port";
|
||||||
|
|
||||||
@Inject
|
@Inject
|
||||||
private Properties properties;
|
private Properties properties;
|
||||||
|
@ -50,15 +54,28 @@ public class ChatHandlerServerModule implements Module
|
||||||
@Override
|
@Override
|
||||||
public void configure(Binder binder)
|
public void configure(Binder binder)
|
||||||
{
|
{
|
||||||
/** If "druid.indexer.task.chathandler.port" property is set then we assume that a
|
Jerseys.addResource(binder, ChatHandlerResource.class);
|
||||||
* separate Jetty Server with it's own {@link ServerConfig} is required for ingestion apart from the query server
|
|
||||||
* otherwise we bind {@link DruidNode} annotated with {@link RemoteChatHandler} to {@literal @}{@link Self} {@link DruidNode}
|
if (properties.containsKey(MAX_CHAT_REQUESTS_PROPERTY)) {
|
||||||
* so that same Jetty Server is used for querying as well as ingestion
|
final int maxRequests = Integer.parseInt(MAX_CHAT_REQUESTS_PROPERTY);
|
||||||
|
JettyBindings.addQosFilter(binder, "/druid/worker/v1/chat/*", maxRequests);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* If "druid.indexer.task.chathandler.port" property is set then we assume that a separate Jetty Server with its
|
||||||
|
* own {@link ServerConfig} is required for ingestion apart from the query server otherwise we bind
|
||||||
|
* {@link DruidNode} annotated with {@link RemoteChatHandler} to {@literal @}{@link Self} {@link DruidNode}
|
||||||
|
* so that same Jetty Server is used for querying as well as ingestion.
|
||||||
*/
|
*/
|
||||||
if (properties.containsKey("druid.indexer.task.chathandler.port")) {
|
if (properties.containsKey(CHAT_PORT_PROPERTY)) {
|
||||||
log.info("Spawning separate ingestion server at port [%s]", properties.get("druid.indexer.task.chathandler.port"));
|
log.info("Spawning separate ingestion server at port [%s]", properties.getProperty(CHAT_PORT_PROPERTY));
|
||||||
JsonConfigProvider.bind(binder, "druid.indexer.task.chathandler", DruidNode.class, RemoteChatHandler.class);
|
JsonConfigProvider.bind(binder, "druid.indexer.task.chathandler", DruidNode.class, RemoteChatHandler.class);
|
||||||
JsonConfigProvider.bind(binder, "druid.indexer.server.chathandler.http", ServerConfig.class, RemoteChatHandler.class);
|
JsonConfigProvider.bind(
|
||||||
|
binder,
|
||||||
|
"druid.indexer.server.chathandler.http",
|
||||||
|
ServerConfig.class,
|
||||||
|
RemoteChatHandler.class
|
||||||
|
);
|
||||||
LifecycleModule.register(binder, Server.class, RemoteChatHandler.class);
|
LifecycleModule.register(binder, Server.class, RemoteChatHandler.class);
|
||||||
} else {
|
} else {
|
||||||
binder.bind(DruidNode.class).annotatedWith(RemoteChatHandler.class).to(Key.get(DruidNode.class, Self.class));
|
binder.bind(DruidNode.class).annotatedWith(RemoteChatHandler.class).to(Key.get(DruidNode.class, Self.class));
|
||||||
|
@ -69,7 +86,12 @@ public class ChatHandlerServerModule implements Module
|
||||||
@Provides
|
@Provides
|
||||||
@LazySingleton
|
@LazySingleton
|
||||||
@RemoteChatHandler
|
@RemoteChatHandler
|
||||||
public Server getServer(Injector injector, Lifecycle lifecycle, @RemoteChatHandler DruidNode node, @RemoteChatHandler ServerConfig config)
|
public Server getServer(
|
||||||
|
Injector injector,
|
||||||
|
Lifecycle lifecycle,
|
||||||
|
@RemoteChatHandler DruidNode node,
|
||||||
|
@RemoteChatHandler ServerConfig config
|
||||||
|
)
|
||||||
{
|
{
|
||||||
final Server server = JettyServerModule.makeJettyServer(node, config);
|
final Server server = JettyServerModule.makeJettyServer(node, config);
|
||||||
JettyServerModule.initializeServer(injector, lifecycle, server);
|
JettyServerModule.initializeServer(injector, lifecycle, server);
|
||||||
|
|
|
@ -0,0 +1,94 @@
|
||||||
|
/*
|
||||||
|
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. Metamarkets licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package io.druid.server.initialization.jetty;
|
||||||
|
|
||||||
|
import com.google.common.collect.ImmutableMap;
|
||||||
|
import com.google.inject.Binder;
|
||||||
|
import com.google.inject.multibindings.Multibinder;
|
||||||
|
import com.metamx.common.logger.Logger;
|
||||||
|
import org.eclipse.jetty.servlets.QoSFilter;
|
||||||
|
|
||||||
|
import javax.servlet.DispatcherType;
|
||||||
|
import javax.servlet.Filter;
|
||||||
|
import java.util.EnumSet;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
public class JettyBindings
|
||||||
|
{
|
||||||
|
private static final Logger log = new Logger(JettyBindings.class);
|
||||||
|
|
||||||
|
private JettyBindings()
|
||||||
|
{
|
||||||
|
// No instantiation.
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void addQosFilter(Binder binder, String path, int maxRequests)
|
||||||
|
{
|
||||||
|
if (maxRequests <= 0) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
Multibinder.newSetBinder(binder, ServletFilterHolder.class)
|
||||||
|
.addBinding()
|
||||||
|
.toInstance(new QosFilterHolder(path, maxRequests));
|
||||||
|
}
|
||||||
|
|
||||||
|
private static class QosFilterHolder implements ServletFilterHolder
|
||||||
|
{
|
||||||
|
private final String path;
|
||||||
|
private final int maxRequests;
|
||||||
|
|
||||||
|
public QosFilterHolder(String path, int maxRequests)
|
||||||
|
{
|
||||||
|
this.path = path;
|
||||||
|
this.maxRequests = maxRequests;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Filter getFilter()
|
||||||
|
{
|
||||||
|
return new QoSFilter();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Class<? extends Filter> getFilterClass()
|
||||||
|
{
|
||||||
|
return QoSFilter.class;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Map<String, String> getInitParameters()
|
||||||
|
{
|
||||||
|
return ImmutableMap.of("maxRequests", String.valueOf(maxRequests));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getPath()
|
||||||
|
{
|
||||||
|
return path;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public EnumSet<DispatcherType> getDispatcherType()
|
||||||
|
{
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -20,28 +20,17 @@
|
||||||
package io.druid.server.initialization;
|
package io.druid.server.initialization;
|
||||||
|
|
||||||
import com.google.common.base.Throwables;
|
import com.google.common.base.Throwables;
|
||||||
import com.google.common.collect.ImmutableList;
|
|
||||||
import com.google.inject.Binder;
|
|
||||||
import com.google.inject.Injector;
|
import com.google.inject.Injector;
|
||||||
import com.google.inject.Key;
|
import com.google.inject.Key;
|
||||||
import com.google.inject.Module;
|
|
||||||
import com.google.inject.multibindings.Multibinder;
|
|
||||||
import com.google.inject.servlet.GuiceFilter;
|
import com.google.inject.servlet.GuiceFilter;
|
||||||
import com.metamx.common.lifecycle.Lifecycle;
|
import com.metamx.common.lifecycle.Lifecycle;
|
||||||
import com.metamx.http.client.HttpClient;
|
import com.metamx.http.client.HttpClient;
|
||||||
import com.metamx.http.client.HttpClientConfig;
|
import com.metamx.http.client.HttpClientConfig;
|
||||||
import com.metamx.http.client.HttpClientInit;
|
import com.metamx.http.client.HttpClientInit;
|
||||||
import io.druid.guice.GuiceInjectors;
|
|
||||||
import io.druid.guice.Jerseys;
|
|
||||||
import io.druid.guice.JsonConfigProvider;
|
|
||||||
import io.druid.guice.LazySingleton;
|
|
||||||
import io.druid.guice.LifecycleModule;
|
|
||||||
import io.druid.guice.annotations.Self;
|
import io.druid.guice.annotations.Self;
|
||||||
import io.druid.initialization.Initialization;
|
|
||||||
import io.druid.server.DruidNode;
|
import io.druid.server.DruidNode;
|
||||||
import io.druid.server.initialization.jetty.JettyServerInitUtils;
|
import io.druid.server.initialization.jetty.JettyServerInitUtils;
|
||||||
import io.druid.server.initialization.jetty.JettyServerInitializer;
|
import io.druid.server.initialization.jetty.JettyServerInitializer;
|
||||||
import io.druid.server.initialization.jetty.ServletFilterHolder;
|
|
||||||
import org.eclipse.jetty.server.Handler;
|
import org.eclipse.jetty.server.Handler;
|
||||||
import org.eclipse.jetty.server.Server;
|
import org.eclipse.jetty.server.Server;
|
||||||
import org.eclipse.jetty.server.handler.HandlerList;
|
import org.eclipse.jetty.server.handler.HandlerList;
|
||||||
|
@ -53,7 +42,6 @@ import org.junit.After;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
|
|
||||||
import javax.net.ssl.SSLContext;
|
import javax.net.ssl.SSLContext;
|
||||||
import javax.servlet.DispatcherType;
|
|
||||||
import javax.servlet.Filter;
|
import javax.servlet.Filter;
|
||||||
import javax.servlet.FilterChain;
|
import javax.servlet.FilterChain;
|
||||||
import javax.servlet.FilterConfig;
|
import javax.servlet.FilterConfig;
|
||||||
|
@ -72,15 +60,14 @@ import javax.ws.rs.core.Context;
|
||||||
import javax.ws.rs.core.MediaType;
|
import javax.ws.rs.core.MediaType;
|
||||||
import javax.ws.rs.core.Response;
|
import javax.ws.rs.core.Response;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.EnumSet;
|
|
||||||
import java.util.Map;
|
|
||||||
import java.util.Random;
|
import java.util.Random;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
public class BaseJettyTest
|
public abstract class BaseJettyTest
|
||||||
{
|
{
|
||||||
protected Lifecycle lifecycle;
|
protected Lifecycle lifecycle;
|
||||||
protected HttpClient client;
|
protected HttpClient client;
|
||||||
|
protected Server server;
|
||||||
protected int port = -1;
|
protected int port = -1;
|
||||||
|
|
||||||
public static void setProperties()
|
public static void setProperties()
|
||||||
|
@ -100,69 +87,11 @@ public class BaseJettyTest
|
||||||
lifecycle = injector.getInstance(Lifecycle.class);
|
lifecycle = injector.getInstance(Lifecycle.class);
|
||||||
lifecycle.start();
|
lifecycle.start();
|
||||||
ClientHolder holder = injector.getInstance(ClientHolder.class);
|
ClientHolder holder = injector.getInstance(ClientHolder.class);
|
||||||
|
server = injector.getInstance(Server.class);
|
||||||
client = holder.getClient();
|
client = holder.getClient();
|
||||||
}
|
}
|
||||||
|
|
||||||
protected Injector setupInjector()
|
protected abstract Injector setupInjector();
|
||||||
{
|
|
||||||
return Initialization.makeInjectorWithModules(
|
|
||||||
GuiceInjectors.makeStartupInjector(), ImmutableList.<Module>of(
|
|
||||||
new Module()
|
|
||||||
{
|
|
||||||
@Override
|
|
||||||
public void configure(Binder binder)
|
|
||||||
{
|
|
||||||
JsonConfigProvider.bindInstance(
|
|
||||||
binder, Key.get(DruidNode.class, Self.class), new DruidNode("test", "localhost", null)
|
|
||||||
);
|
|
||||||
binder.bind(JettyServerInitializer.class).to(JettyServerInit.class).in(LazySingleton.class);
|
|
||||||
|
|
||||||
Multibinder<ServletFilterHolder> multibinder = Multibinder.newSetBinder(binder, ServletFilterHolder.class);
|
|
||||||
multibinder.addBinding().toInstance(
|
|
||||||
new ServletFilterHolder()
|
|
||||||
{
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public String getPath()
|
|
||||||
{
|
|
||||||
return "/*";
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public Map<String, String> getInitParameters()
|
|
||||||
{
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public Class<? extends Filter> getFilterClass()
|
|
||||||
{
|
|
||||||
return DummyAuthFilter.class;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public Filter getFilter()
|
|
||||||
{
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public EnumSet<DispatcherType> getDispatcherType()
|
|
||||||
{
|
|
||||||
// TODO Auto-generated method stub
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
Jerseys.addResource(binder, SlowResource.class);
|
|
||||||
Jerseys.addResource(binder, ExceptionResource.class);
|
|
||||||
Jerseys.addResource(binder, DefaultResource.class);
|
|
||||||
LifecycleModule.register(binder, Server.class);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
)
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
@After
|
@After
|
||||||
public void teardown()
|
public void teardown()
|
||||||
|
@ -175,10 +104,15 @@ public class BaseJettyTest
|
||||||
HttpClient client;
|
HttpClient client;
|
||||||
|
|
||||||
ClientHolder()
|
ClientHolder()
|
||||||
|
{
|
||||||
|
this(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
ClientHolder(int maxClientConnections)
|
||||||
{
|
{
|
||||||
try {
|
try {
|
||||||
this.client = HttpClientInit.createClient(
|
this.client = HttpClientInit.createClient(
|
||||||
new HttpClientConfig(1, SSLContext.getDefault(), Duration.ZERO),
|
new HttpClientConfig(maxClientConnections, SSLContext.getDefault(), Duration.ZERO),
|
||||||
new Lifecycle()
|
new Lifecycle()
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
@ -224,7 +158,7 @@ public class BaseJettyTest
|
||||||
public Response hello()
|
public Response hello()
|
||||||
{
|
{
|
||||||
try {
|
try {
|
||||||
TimeUnit.MILLISECONDS.sleep(100 + random.nextInt(2000));
|
TimeUnit.MILLISECONDS.sleep(500 + random.nextInt(1600));
|
||||||
}
|
}
|
||||||
catch (InterruptedException e) {
|
catch (InterruptedException e) {
|
||||||
//
|
//
|
||||||
|
@ -282,7 +216,8 @@ public class BaseJettyTest
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public static class DummyAuthFilter implements Filter {
|
public static class DummyAuthFilter implements Filter
|
||||||
|
{
|
||||||
|
|
||||||
public static final String AUTH_HDR = "secretUser";
|
public static final String AUTH_HDR = "secretUser";
|
||||||
public static final String SECRET_USER = "bob";
|
public static final String SECRET_USER = "bob";
|
||||||
|
@ -294,10 +229,10 @@ public class BaseJettyTest
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void doFilter(ServletRequest req, ServletResponse resp, FilterChain chain) throws IOException,
|
public void doFilter(ServletRequest req, ServletResponse resp, FilterChain chain) throws IOException,
|
||||||
ServletException
|
ServletException
|
||||||
{
|
{
|
||||||
HttpServletRequest request = (HttpServletRequest) req;
|
HttpServletRequest request = (HttpServletRequest) req;
|
||||||
if(request.getHeader(AUTH_HDR) == null || request.getHeader(AUTH_HDR).equals(SECRET_USER)) {
|
if (request.getHeader(AUTH_HDR) == null || request.getHeader(AUTH_HDR).equals(SECRET_USER)) {
|
||||||
chain.doFilter(req, resp);
|
chain.doFilter(req, resp);
|
||||||
} else {
|
} else {
|
||||||
HttpServletResponse response = (HttpServletResponse) resp;
|
HttpServletResponse response = (HttpServletResponse) resp;
|
||||||
|
|
|
@ -0,0 +1,193 @@
|
||||||
|
/*
|
||||||
|
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. Metamarkets licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package io.druid.server.initialization;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
import com.google.api.client.repackaged.com.google.common.base.Throwables;
|
||||||
|
import com.google.common.collect.ImmutableList;
|
||||||
|
import com.google.common.collect.ImmutableMap;
|
||||||
|
import com.google.common.util.concurrent.ListenableFuture;
|
||||||
|
import com.google.inject.Binder;
|
||||||
|
import com.google.inject.Injector;
|
||||||
|
import com.google.inject.Key;
|
||||||
|
import com.google.inject.Module;
|
||||||
|
import com.metamx.http.client.HttpClient;
|
||||||
|
import com.metamx.http.client.Request;
|
||||||
|
import com.metamx.http.client.response.StatusResponseHandler;
|
||||||
|
import com.metamx.http.client.response.StatusResponseHolder;
|
||||||
|
import io.druid.concurrent.Execs;
|
||||||
|
import io.druid.guice.GuiceInjectors;
|
||||||
|
import io.druid.guice.Jerseys;
|
||||||
|
import io.druid.guice.JsonConfigProvider;
|
||||||
|
import io.druid.guice.LazySingleton;
|
||||||
|
import io.druid.guice.LifecycleModule;
|
||||||
|
import io.druid.guice.annotations.Self;
|
||||||
|
import io.druid.initialization.Initialization;
|
||||||
|
import io.druid.server.DruidNode;
|
||||||
|
import io.druid.server.initialization.jetty.JettyBindings;
|
||||||
|
import io.druid.server.initialization.jetty.JettyServerInitializer;
|
||||||
|
import org.eclipse.jetty.server.Server;
|
||||||
|
import org.eclipse.jetty.util.thread.QueuedThreadPool;
|
||||||
|
import org.jboss.netty.handler.codec.http.HttpMethod;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import java.net.URL;
|
||||||
|
import java.nio.charset.Charset;
|
||||||
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
|
||||||
|
public class JettyQosTest extends BaseJettyTest
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
protected Injector setupInjector()
|
||||||
|
{
|
||||||
|
return Initialization.makeInjectorWithModules(
|
||||||
|
GuiceInjectors.makeStartupInjector(),
|
||||||
|
ImmutableList.<Module>of(
|
||||||
|
new Module()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public void configure(Binder binder)
|
||||||
|
{
|
||||||
|
JsonConfigProvider.bindInstance(
|
||||||
|
binder, Key.get(DruidNode.class, Self.class), new DruidNode("test", "localhost", null)
|
||||||
|
);
|
||||||
|
binder.bind(JettyServerInitializer.class).to(JettyServerInit.class).in(LazySingleton.class);
|
||||||
|
Jerseys.addResource(binder, SlowResource.class);
|
||||||
|
Jerseys.addResource(binder, ExceptionResource.class);
|
||||||
|
Jerseys.addResource(binder, DefaultResource.class);
|
||||||
|
JettyBindings.addQosFilter(binder, "/slow/*", 2);
|
||||||
|
final ServerConfig serverConfig = new ObjectMapper().convertValue(
|
||||||
|
ImmutableMap.of("numThreads", "10"),
|
||||||
|
ServerConfig.class
|
||||||
|
);
|
||||||
|
binder.bind(ServerConfig.class).toInstance(serverConfig);
|
||||||
|
LifecycleModule.register(binder, Server.class);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testNumThreads()
|
||||||
|
{
|
||||||
|
// Just make sure the injector stuff for this test is actually working.
|
||||||
|
Assert.assertEquals(
|
||||||
|
10,
|
||||||
|
((QueuedThreadPool) server.getThreadPool()).getMaxThreads()
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 60_000L)
|
||||||
|
public void testQoS() throws Exception
|
||||||
|
{
|
||||||
|
final int fastThreads = 20;
|
||||||
|
final int slowThreads = 15;
|
||||||
|
final int slowRequestsPerThread = 5;
|
||||||
|
final int fastRequestsPerThread = 200;
|
||||||
|
final HttpClient fastClient = new ClientHolder(fastThreads).getClient();
|
||||||
|
final HttpClient slowClient = new ClientHolder(slowThreads).getClient();
|
||||||
|
final ExecutorService fastPool = Execs.multiThreaded(fastThreads, "fast-%d");
|
||||||
|
final ExecutorService slowPool = Execs.multiThreaded(slowThreads, "slow-%d");
|
||||||
|
final CountDownLatch latch = new CountDownLatch(fastThreads * fastRequestsPerThread);
|
||||||
|
final AtomicLong fastCount = new AtomicLong();
|
||||||
|
final AtomicLong slowCount = new AtomicLong();
|
||||||
|
final AtomicLong fastElapsed = new AtomicLong();
|
||||||
|
final AtomicLong slowElapsed = new AtomicLong();
|
||||||
|
|
||||||
|
for (int i = 0; i < slowThreads; i++) {
|
||||||
|
slowPool.submit(new Runnable()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public void run()
|
||||||
|
{
|
||||||
|
for (int i = 0; i < slowRequestsPerThread; i++) {
|
||||||
|
long startTime = System.currentTimeMillis();
|
||||||
|
try {
|
||||||
|
ListenableFuture<StatusResponseHolder> go =
|
||||||
|
slowClient.go(
|
||||||
|
new Request(HttpMethod.GET, new URL("http://localhost:" + port + "/slow/hello")),
|
||||||
|
new StatusResponseHandler(Charset.defaultCharset())
|
||||||
|
);
|
||||||
|
go.get();
|
||||||
|
slowCount.incrementAndGet();
|
||||||
|
slowElapsed.addAndGet(System.currentTimeMillis() - startTime);
|
||||||
|
}
|
||||||
|
catch (InterruptedException e) {
|
||||||
|
// BE COOL
|
||||||
|
}
|
||||||
|
catch (Exception e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
throw Throwables.propagate(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
// wait for jetty server pool to completely fill up
|
||||||
|
while (server.getThreadPool().getIdleThreads() != 0) {
|
||||||
|
Thread.sleep(25);
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int i = 0; i < fastThreads; i++) {
|
||||||
|
fastPool.submit(new Runnable()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public void run()
|
||||||
|
{
|
||||||
|
for (int i = 0; i < fastRequestsPerThread; i++) {
|
||||||
|
long startTime = System.currentTimeMillis();
|
||||||
|
try {
|
||||||
|
ListenableFuture<StatusResponseHolder> go =
|
||||||
|
fastClient.go(
|
||||||
|
new Request(HttpMethod.GET, new URL("http://localhost:" + port + "/default")),
|
||||||
|
new StatusResponseHandler(Charset.defaultCharset())
|
||||||
|
);
|
||||||
|
go.get();
|
||||||
|
fastCount.incrementAndGet();
|
||||||
|
fastElapsed.addAndGet(System.currentTimeMillis() - startTime);
|
||||||
|
latch.countDown();
|
||||||
|
}
|
||||||
|
catch (InterruptedException e) {
|
||||||
|
// BE COOL
|
||||||
|
}
|
||||||
|
catch (Exception e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
throw Throwables.propagate(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
// Wait for all fast requests to be served
|
||||||
|
latch.await();
|
||||||
|
|
||||||
|
slowPool.shutdownNow();
|
||||||
|
fastPool.shutdown();
|
||||||
|
|
||||||
|
// check that fast requests finished quickly
|
||||||
|
Assert.assertTrue(fastElapsed.get() / fastCount.get() < 500);
|
||||||
|
}
|
||||||
|
}
|
|
@ -20,17 +20,36 @@
|
||||||
package io.druid.server.initialization;
|
package io.druid.server.initialization;
|
||||||
|
|
||||||
import com.google.common.base.Throwables;
|
import com.google.common.base.Throwables;
|
||||||
|
import com.google.common.collect.ImmutableList;
|
||||||
import com.google.common.util.concurrent.ListenableFuture;
|
import com.google.common.util.concurrent.ListenableFuture;
|
||||||
|
import com.google.inject.Binder;
|
||||||
|
import com.google.inject.Injector;
|
||||||
|
import com.google.inject.Key;
|
||||||
|
import com.google.inject.Module;
|
||||||
|
import com.google.inject.multibindings.Multibinder;
|
||||||
import com.metamx.http.client.Request;
|
import com.metamx.http.client.Request;
|
||||||
import com.metamx.http.client.response.InputStreamResponseHandler;
|
import com.metamx.http.client.response.InputStreamResponseHandler;
|
||||||
import com.metamx.http.client.response.StatusResponseHandler;
|
import com.metamx.http.client.response.StatusResponseHandler;
|
||||||
import com.metamx.http.client.response.StatusResponseHolder;
|
import com.metamx.http.client.response.StatusResponseHolder;
|
||||||
|
import io.druid.guice.GuiceInjectors;
|
||||||
|
import io.druid.guice.Jerseys;
|
||||||
|
import io.druid.guice.JsonConfigProvider;
|
||||||
|
import io.druid.guice.LazySingleton;
|
||||||
|
import io.druid.guice.LifecycleModule;
|
||||||
|
import io.druid.guice.annotations.Self;
|
||||||
|
import io.druid.initialization.Initialization;
|
||||||
|
import io.druid.server.DruidNode;
|
||||||
|
import io.druid.server.initialization.jetty.JettyServerInitializer;
|
||||||
|
import io.druid.server.initialization.jetty.ServletFilterHolder;
|
||||||
import org.apache.commons.io.IOUtils;
|
import org.apache.commons.io.IOUtils;
|
||||||
|
import org.eclipse.jetty.server.Server;
|
||||||
import org.jboss.netty.handler.codec.http.HttpMethod;
|
import org.jboss.netty.handler.codec.http.HttpMethod;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Ignore;
|
import org.junit.Ignore;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import javax.servlet.DispatcherType;
|
||||||
|
import javax.servlet.Filter;
|
||||||
import javax.servlet.http.HttpServletResponse;
|
import javax.servlet.http.HttpServletResponse;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.InputStream;
|
import java.io.InputStream;
|
||||||
|
@ -38,6 +57,8 @@ import java.io.StringWriter;
|
||||||
import java.net.HttpURLConnection;
|
import java.net.HttpURLConnection;
|
||||||
import java.net.URL;
|
import java.net.URL;
|
||||||
import java.nio.charset.Charset;
|
import java.nio.charset.Charset;
|
||||||
|
import java.util.EnumSet;
|
||||||
|
import java.util.Map;
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
import java.util.concurrent.Executor;
|
import java.util.concurrent.Executor;
|
||||||
import java.util.concurrent.Executors;
|
import java.util.concurrent.Executors;
|
||||||
|
@ -46,6 +67,72 @@ import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
|
||||||
public class JettyTest extends BaseJettyTest
|
public class JettyTest extends BaseJettyTest
|
||||||
{
|
{
|
||||||
|
@Override
|
||||||
|
protected Injector setupInjector()
|
||||||
|
{
|
||||||
|
return Initialization.makeInjectorWithModules(
|
||||||
|
GuiceInjectors.makeStartupInjector(),
|
||||||
|
ImmutableList.<Module>of(
|
||||||
|
new Module()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public void configure(Binder binder)
|
||||||
|
{
|
||||||
|
JsonConfigProvider.bindInstance(
|
||||||
|
binder, Key.get(DruidNode.class, Self.class), new DruidNode("test", "localhost", null)
|
||||||
|
);
|
||||||
|
binder.bind(JettyServerInitializer.class).to(JettyServerInit.class).in(LazySingleton.class);
|
||||||
|
|
||||||
|
Multibinder<ServletFilterHolder> multibinder = Multibinder.newSetBinder(
|
||||||
|
binder,
|
||||||
|
ServletFilterHolder.class
|
||||||
|
);
|
||||||
|
|
||||||
|
multibinder.addBinding().toInstance(
|
||||||
|
new ServletFilterHolder()
|
||||||
|
{
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getPath()
|
||||||
|
{
|
||||||
|
return "/*";
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Map<String, String> getInitParameters()
|
||||||
|
{
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Class<? extends Filter> getFilterClass()
|
||||||
|
{
|
||||||
|
return DummyAuthFilter.class;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Filter getFilter()
|
||||||
|
{
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public EnumSet<DispatcherType> getDispatcherType()
|
||||||
|
{
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
Jerseys.addResource(binder, SlowResource.class);
|
||||||
|
Jerseys.addResource(binder, ExceptionResource.class);
|
||||||
|
Jerseys.addResource(binder, DefaultResource.class);
|
||||||
|
LifecycleModule.register(binder, Server.class);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
)
|
||||||
|
);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@Ignore // this test will deadlock if it hits an issue, so ignored by default
|
@Ignore // this test will deadlock if it hits an issue, so ignored by default
|
||||||
|
|
|
@ -189,7 +189,6 @@ public class CliPeon extends GuiceRunnable
|
||||||
|
|
||||||
binder.bind(JettyServerInitializer.class).to(QueryJettyServerInitializer.class);
|
binder.bind(JettyServerInitializer.class).to(QueryJettyServerInitializer.class);
|
||||||
Jerseys.addResource(binder, QueryResource.class);
|
Jerseys.addResource(binder, QueryResource.class);
|
||||||
Jerseys.addResource(binder, ChatHandlerResource.class);
|
|
||||||
LifecycleModule.register(binder, QueryResource.class);
|
LifecycleModule.register(binder, QueryResource.class);
|
||||||
LifecycleModule.register(binder, LookupReferencesManager.class);
|
LifecycleModule.register(binder, LookupReferencesManager.class);
|
||||||
binder.bind(NodeTypeConfig.class).toInstance(new NodeTypeConfig(nodeType));
|
binder.bind(NodeTypeConfig.class).toInstance(new NodeTypeConfig(nodeType));
|
||||||
|
|
|
@ -105,7 +105,6 @@ public class RealtimeModule implements Module
|
||||||
binder.bind(NodeTypeConfig.class).toInstance(new NodeTypeConfig("realtime"));
|
binder.bind(NodeTypeConfig.class).toInstance(new NodeTypeConfig("realtime"));
|
||||||
binder.bind(JettyServerInitializer.class).to(QueryJettyServerInitializer.class).in(LazySingleton.class);
|
binder.bind(JettyServerInitializer.class).to(QueryJettyServerInitializer.class).in(LazySingleton.class);
|
||||||
Jerseys.addResource(binder, QueryResource.class);
|
Jerseys.addResource(binder, QueryResource.class);
|
||||||
Jerseys.addResource(binder, ChatHandlerResource.class);
|
|
||||||
LifecycleModule.register(binder, QueryResource.class);
|
LifecycleModule.register(binder, QueryResource.class);
|
||||||
LifecycleModule.register(binder, LookupReferencesManager.class);
|
LifecycleModule.register(binder, LookupReferencesManager.class);
|
||||||
LifecycleModule.register(binder, Server.class);
|
LifecycleModule.register(binder, Server.class);
|
||||||
|
|
Loading…
Reference in New Issue