separate ingestion and query thread pool

This commit is contained in:
Parag Jain 2015-09-28 16:38:36 -05:00
parent 379ca87e6a
commit 6c498b7d4a
12 changed files with 316 additions and 38 deletions

View File

@ -251,6 +251,7 @@ Middle managers pass their configurations down to their child peons. The middle
|`druid.indexer.runner.javaOpts`|-X Java options to run the peon in its own JVM.|""| |`druid.indexer.runner.javaOpts`|-X Java options to run the peon in its own JVM.|""|
|`druid.indexer.runner.maxZnodeBytes`|The maximum size Znode in bytes that can be created in Zookeeper.|524288| |`druid.indexer.runner.maxZnodeBytes`|The maximum size Znode in bytes that can be created in Zookeeper.|524288|
|`druid.indexer.runner.startPort`|The port that peons begin running on.|8100| |`druid.indexer.runner.startPort`|The port that peons begin running on.|8100|
|`druid.indexer.runner.separateIngestionEndpoint`|Use a separate server, and consequently a separate Jetty thread pool, for ingesting events.|false|
|`druid.worker.ip`|The IP of the worker.|localhost| |`druid.worker.ip`|The IP of the worker.|localhost|
|`druid.worker.version`|Version identifier for the middle manager.|0| |`druid.worker.version`|Version identifier for the middle manager.|0|
|`druid.worker.capacity`|Maximum number of tasks the middle manager can accept.|Number of available processors - 1| |`druid.worker.capacity`|Maximum number of tasks the middle manager can accept.|Number of available processors - 1|
@ -273,6 +274,13 @@ Additional peon configs include:
|`druid.indexer.task.defaultRowFlushBoundary`|Highest row count before persisting to disk. Used for indexing generating tasks.|50000| |`druid.indexer.task.defaultRowFlushBoundary`|Highest row count before persisting to disk. Used for indexing generating tasks.|50000|
|`druid.indexer.task.defaultHadoopCoordinates`|Hadoop version to use with HadoopIndexTasks that do not request a particular version.|org.apache.hadoop:hadoop-client:2.3.0| |`druid.indexer.task.defaultHadoopCoordinates`|Hadoop version to use with HadoopIndexTasks that do not request a particular version.|org.apache.hadoop:hadoop-client:2.3.0|
If `druid.indexer.runner.separateIngestionEndpoint` is set to true, the following configurations are available for the ingestion server on the peon:
|Property|Description|Default|
|--------|-----------|-------|
|`druid.indexer.server.chathandler.http.numThreads`|Number of threads for HTTP requests.|Math.max(10, (Number of available processors * 17) / 16 + 2) + 30|
|`druid.indexer.server.chathandler.http.maxIdleTime`|The Jetty max idle time for a connection.|PT5m|
If the peon is running in remote mode, there must be an overlord up and running. Peons in remote mode can set the following configurations: If the peon is running in remote mode, there must be an overlord up and running. Peons in remote mode can set the following configurations:
|Property|Description|Default| |Property|Description|Default|

View File

@ -37,6 +37,7 @@ import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.MoreExecutors;
import com.google.inject.Inject; import com.google.inject.Inject;
import com.metamx.common.ISE; import com.metamx.common.ISE;
import com.metamx.common.Pair;
import com.metamx.common.lifecycle.LifecycleStop; import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.emitter.EmittingLogger; import com.metamx.emitter.EmittingLogger;
import io.druid.guice.annotations.Self; import io.druid.guice.annotations.Self;
@ -124,7 +125,18 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer
final File attemptDir = new File(taskDir, attemptUUID); final File attemptDir = new File(taskDir, attemptUUID);
final ProcessHolder processHolder; final ProcessHolder processHolder;
final int childPort = portFinder.findUnusedPort(); final int childPort;
final int childChatHandlerPort;
if (config.isSeparateIngestionEndpoint()) {
Pair<Integer, Integer> portPair = portFinder.findTwoConsecutiveUnusedPorts();
childPort = portPair.lhs;
childChatHandlerPort = portPair.rhs;
} else {
childPort = portFinder.findUnusedPort();
childChatHandlerPort = -1;
}
try { try {
final Closer closer = Closer.create(); final Closer closer = Closer.create();
try { try {
@ -233,6 +245,14 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer
command.add(String.format("-Ddruid.host=%s", childHost)); command.add(String.format("-Ddruid.host=%s", childHost));
command.add(String.format("-Ddruid.port=%d", childPort)); command.add(String.format("-Ddruid.port=%d", childPort));
if(config.isSeparateIngestionEndpoint()) {
command.add(String.format("-Ddruid.indexer.task.chathandler.service=%s", "placeholder/serviceName"));
// Actual serviceName will be passed by the EventReceiverFirehose when it registers itself with ChatHandlerProvider
// Thus, "placeholder/serviceName" will be ignored
command.add(String.format("-Ddruid.indexer.task.chathandler.host=%s", childHost));
command.add(String.format("-Ddruid.indexer.task.chathandler.port=%d", childChatHandlerPort));
}
command.add("io.druid.cli.Main"); command.add("io.druid.cli.Main");
command.add("internal"); command.add("internal");
command.add("peon"); command.add("peon");
@ -301,6 +321,9 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer
} }
} }
portFinder.markPortUnused(childPort); portFinder.markPortUnused(childPort);
if(childChatHandlerPort > 0) {
portFinder.markPortUnused(childChatHandlerPort);
}
log.info("Removing temporary directory: %s", attemptDir); log.info("Removing temporary directory: %s", attemptDir);
FileUtils.deleteDirectory(attemptDir); FileUtils.deleteDirectory(attemptDir);
} }

View File

@ -19,6 +19,7 @@ package io.druid.indexing.overlord;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
import com.metamx.common.ISE; import com.metamx.common.ISE;
import com.metamx.common.Pair;
import java.io.IOException; import java.io.IOException;
import java.net.BindException; import java.net.BindException;
@ -74,6 +75,17 @@ public class PortFinder
return port; return port;
} }
/**
 * Reserves two adjacent ports and records both of them as used.
 *
 * A candidate pair is accepted only when neither port is already reserved by this finder and both
 * ports can currently be bound. Callers are expected to release each port individually through
 * {@link #markPortUnused(int)} once they are done with it.
 *
 * @return a pair whose {@code lhs} is the first port and whose {@code rhs} is {@code lhs + 1}
 */
public synchronized Pair<Integer, Integer> findTwoConsecutiveUnusedPorts()
{
  int firstPort = chooseNext(startPort);
  // Only the first port of the pair is obtained from chooseNext, so the neighbouring port has to be
  // checked against the reservations explicitly: a port that was handed out earlier may not be bound
  // by its owner yet, in which case canBind() alone would still report it as free.
  while (usedPorts.contains(firstPort + 1) || !canBind(firstPort) || !canBind(firstPort + 1)) {
    firstPort = chooseNext(firstPort + 1);
  }
  usedPorts.add(firstPort);
  usedPorts.add(firstPort + 1);
  return new Pair<>(firstPort, firstPort + 1);
}
public synchronized void markPortUnused(int port) public synchronized void markPortUnused(int port)
{ {
usedPorts.remove(port); usedPorts.remove(port);

View File

@ -65,6 +65,13 @@ public class ForkingTaskRunnerConfig
"hadoop" "hadoop"
); );
/**
 * Whether event ingestion should be served from its own endpoint instead of sharing the
 * peon's main server. Disabled unless explicitly configured.
 */
@JsonProperty
private boolean separateIngestionEndpoint = false;

/**
 * @return {@code true} when a separate ingestion endpoint has been requested
 */
public boolean isSeparateIngestionEndpoint()
{
  return separateIngestionEndpoint;
}
public String getJavaCommand() public String getJavaCommand()
{ {
return javaCommand; return javaCommand;

View File

@ -0,0 +1,34 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.guice.annotations;
import com.google.inject.BindingAnnotation;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
 * Guice binding annotation that qualifies the objects used for the chat handler endpoint.
 * It carries no members; it only distinguishes these bindings from the unannotated or
 * {@code @Self}-annotated bindings of the same types.
 */
@BindingAnnotation
@Target({ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface RemoteChatHandler
{
}

View File

@ -23,8 +23,8 @@ import com.google.inject.Inject;
import com.metamx.common.ISE; import com.metamx.common.ISE;
import com.metamx.common.logger.Logger; import com.metamx.common.logger.Logger;
import io.druid.curator.discovery.ServiceAnnouncer; import io.druid.curator.discovery.ServiceAnnouncer;
import io.druid.guice.annotations.Self;
import io.druid.server.DruidNode; import io.druid.server.DruidNode;
import io.druid.guice.annotations.RemoteChatHandler;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
@ -43,7 +43,7 @@ public class ServiceAnnouncingChatHandlerProvider implements ChatHandlerProvider
@Inject @Inject
public ServiceAnnouncingChatHandlerProvider( public ServiceAnnouncingChatHandlerProvider(
@Self DruidNode node, @RemoteChatHandler DruidNode node,
ServiceAnnouncer serviceAnnouncer ServiceAnnouncer serviceAnnouncer
) )
{ {

View File

@ -0,0 +1,78 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.server.initialization.jetty;
import com.google.inject.Binder;
import com.google.inject.Inject;
import com.google.inject.Injector;
import com.google.inject.Key;
import com.google.inject.Module;
import com.google.inject.Provides;
import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.common.logger.Logger;
import io.druid.guice.JsonConfigProvider;
import io.druid.guice.LazySingleton;
import io.druid.guice.LifecycleModule;
import io.druid.guice.annotations.RemoteChatHandler;
import io.druid.guice.annotations.Self;
import io.druid.server.DruidNode;
import io.druid.server.initialization.ServerConfig;
import org.eclipse.jetty.server.Server;
import java.util.Properties;
/**
 * Guice module that provides the {@link RemoteChatHandler}-annotated {@link DruidNode},
 * {@link ServerConfig} and Jetty {@link Server}.
 *
 * <p>If the {@code druid.indexer.task.chathandler.port} property is set, ingestion gets its own node
 * description, its own {@link ServerConfig} and its own Jetty server registered with the lifecycle,
 * separate from the query server. Otherwise the annotated node and config are bound to the
 * {@literal @}{@link Self} node and the default server config, so that the same Jetty server is used
 * for querying as well as ingestion.
 */
public class ChatHandlerServerModule implements Module
{
  private static final Logger log = new Logger(ChatHandlerServerModule.class);

  /** Presence of this property switches the module to a dedicated ingestion server. */
  private static final String CHAT_HANDLER_PORT_PROPERTY = "druid.indexer.task.chathandler.port";

  @Inject
  private Properties properties;

  /**
   * Binds the {@link RemoteChatHandler}-annotated node and server config, choosing between the
   * shared and the dedicated setup based on the chat handler port property.
   */
  @Override
  public void configure(Binder binder)
  {
    if (!properties.containsKey(CHAT_HANDLER_PORT_PROPERTY)) {
      // No dedicated port configured: reuse the node and config of the main server.
      binder.bind(DruidNode.class).annotatedWith(RemoteChatHandler.class).to(Key.get(DruidNode.class, Self.class));
      binder.bind(ServerConfig.class).annotatedWith(RemoteChatHandler.class).to(Key.get(ServerConfig.class));
      return;
    }

    log.info("Spawning separate ingestion server at port [%s]", properties.get(CHAT_HANDLER_PORT_PROPERTY));
    JsonConfigProvider.bind(binder, "druid.indexer.task.chathandler", DruidNode.class, RemoteChatHandler.class);
    JsonConfigProvider.bind(binder, "druid.indexer.server.chathandler.http", ServerConfig.class, RemoteChatHandler.class);
    LifecycleModule.register(binder, Server.class, RemoteChatHandler.class);
  }

  /**
   * Builds and initializes the Jetty server described by the {@link RemoteChatHandler}-annotated
   * node and config.
   *
   * @param injector  injector handed to the server initialization
   * @param lifecycle lifecycle that will start and stop the server
   * @param node      node description the server is created for
   * @param config    server configuration used to create the server
   * @return the initialized server
   */
  @Provides
  @LazySingleton
  @RemoteChatHandler
  public Server getServer(Injector injector, Lifecycle lifecycle, @RemoteChatHandler DruidNode node, @RemoteChatHandler ServerConfig config)
  {
    final Server ingestionServer = JettyServerModule.makeJettyServer(node, config);
    JettyServerModule.initializeServer(injector, lifecycle, ingestionServer);
    return ingestionServer;
  }
}

View File

@ -109,38 +109,8 @@ public class JettyServerModule extends JerseyServletModule
@LazySingleton @LazySingleton
public Server getServer(Injector injector, Lifecycle lifecycle, @Self DruidNode node, ServerConfig config) public Server getServer(Injector injector, Lifecycle lifecycle, @Self DruidNode node, ServerConfig config)
{ {
JettyServerInitializer initializer = injector.getInstance(JettyServerInitializer.class);
final Server server = makeJettyServer(node, config); final Server server = makeJettyServer(node, config);
try { initializeServer(injector, lifecycle, server);
initializer.initialize(server, injector);
}
catch (ConfigurationException e) {
throw new ProvisionException(Iterables.getFirst(e.getErrorMessages(), null).getMessage());
}
lifecycle.addHandler(
new Lifecycle.Handler()
{
@Override
public void start() throws Exception
{
server.start();
}
@Override
public void stop()
{
try {
server.stop();
}
catch (Exception e) {
log.warn(e, "Unable to stop Jetty server.");
}
}
}
);
return server; return server;
} }
@ -153,7 +123,7 @@ public class JettyServerModule extends JerseyServletModule
return provider; return provider;
} }
private static Server makeJettyServer(@Self DruidNode node, ServerConfig config) static Server makeJettyServer(DruidNode node, ServerConfig config)
{ {
final QueuedThreadPool threadPool = new QueuedThreadPool(); final QueuedThreadPool threadPool = new QueuedThreadPool();
threadPool.setMinThreads(config.getNumThreads()); threadPool.setMinThreads(config.getNumThreads());
@ -177,4 +147,38 @@ public class JettyServerModule extends JerseyServletModule
return server; return server;
} }
/**
 * Applies the injector's {@link JettyServerInitializer} to the given server and registers a
 * lifecycle handler that starts the server on lifecycle start and stops it on lifecycle stop.
 *
 * @param injector  source of the {@link JettyServerInitializer}, also passed to it
 * @param lifecycle lifecycle the start/stop handler is added to
 * @param server    server to initialize and manage
 * @throws ProvisionException if the initializer fails with a {@link ConfigurationException};
 *                            the original exception is kept as the cause
 */
static void initializeServer(Injector injector, Lifecycle lifecycle, final Server server)
{
  final JettyServerInitializer initializer = injector.getInstance(JettyServerInitializer.class);
  try {
    initializer.initialize(server, injector);
  }
  catch (ConfigurationException e) {
    // Fall back to the exception's own message when no individual error message is available,
    // instead of dereferencing a null first element.
    final String detail = e.getErrorMessages().isEmpty()
                          ? e.getMessage()
                          : Iterables.getFirst(e.getErrorMessages(), null).getMessage();
    throw new ProvisionException(detail, e);
  }

  lifecycle.addHandler(
      new Lifecycle.Handler()
      {
        @Override
        public void start() throws Exception
        {
          server.start();
        }

        @Override
        public void stop()
        {
          try {
            server.stop();
          }
          catch (Exception e) {
            // Stopping is best effort: a failure here is only logged.
            log.warn(e, "Unable to stop Jetty server.");
          }
        }
      }
  );
}
} }

View File

@ -0,0 +1,108 @@
/*
* Druid - a distributed column store.
* Copyright 2012 - 2015 Metamarkets Group Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.druid.curator.discovery;
import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterators;
import com.metamx.common.ISE;
import io.druid.curator.CuratorTestBase;
import org.apache.curator.x.discovery.ServiceDiscovery;
import org.apache.curator.x.discovery.ServiceDiscoveryBuilder;
import org.apache.curator.x.discovery.ServiceInstance;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.util.List;
/**
 * Checks that service names passed through {@link CuratorServiceUtils#makeCanonicalServiceName}
 * can be registered with, and looked up from, Curator service discovery.
 */
public class ServiceAnnouncerTest extends CuratorTestBase
{
  @Before
  public void setUp() throws Exception
  {
    setupServerAndCurator();
  }

  /**
   * Registers several service names containing slashes and expects exactly one instance to be
   * discoverable for each of them under the name with every "/" replaced by ":".
   */
  @Test
  public void testServiceAnnouncement() throws Exception
  {
    curator.start();
    List<String> serviceNames = ImmutableList.of(
        "druid/overlord",
        "druid/coordinator",
        "druid/firehose/tranquility_test-50-0000-0000"
    );
    final ServiceDiscovery<Void> serviceDiscovery = createAndAnnounceServices(serviceNames);
    Assert.assertTrue(
        Iterators.all(
            serviceNames.iterator(),
            new Predicate<String>()
            {
              @Override
              public boolean apply(String input)
              {
                try {
                  return serviceDiscovery.queryForInstances(input.replaceAll("/", ":")).size() == 1;
                }
                catch (Exception e) {
                  // Keep the underlying failure as the cause so a broken lookup can be diagnosed.
                  throw new ISE(
                      e,
                      "Something went wrong while finding instance with name [%s] in Service Discovery",
                      input
                  );
                }
              }
            }
        )
    );
  }

  /**
   * A service name containing a control character is expected to be rejected with an
   * {@link IllegalArgumentException}.
   */
  @Test (expected = IllegalArgumentException.class)
  public void testServiceAnnouncementFail() throws Exception
  {
    curator.start();
    createAndAnnounceServices(ImmutableList.of("placeholder/\u0001"));
  }

  /**
   * Builds a service discovery rooted at "/test" and registers one instance per given name, each on
   * "localhost" with a distinct port counting up from 1000.
   *
   * @param serviceNames names to canonicalize and register
   * @return the service discovery holding the registrations
   */
  private ServiceDiscovery<Void> createAndAnnounceServices(List<String> serviceNames) throws Exception
  {
    int port = 1000;
    ServiceDiscovery<Void> serviceDiscovery =
        ServiceDiscoveryBuilder.builder(Void.class)
                               .basePath("/test")
                               .client(curator)
                               .build();
    for (String serviceName : serviceNames) {
      String serviceNameToUse = CuratorServiceUtils.makeCanonicalServiceName(serviceName);
      ServiceInstance<Void> instance = ServiceInstance.<Void>builder()
                                                      .name(serviceNameToUse)
                                                      .address("localhost")
                                                      .port(port++)
                                                      .build();
      serviceDiscovery.registerService(instance);
    }
    return serviceDiscovery;
  }

  @After
  public void tearDown()
  {
    tearDownServerAndCurator();
  }
}

View File

@ -70,6 +70,7 @@ import io.druid.segment.realtime.firehose.ChatHandlerResource;
import io.druid.segment.realtime.firehose.NoopChatHandlerProvider; import io.druid.segment.realtime.firehose.NoopChatHandlerProvider;
import io.druid.segment.realtime.firehose.ServiceAnnouncingChatHandlerProvider; import io.druid.segment.realtime.firehose.ServiceAnnouncingChatHandlerProvider;
import io.druid.server.QueryResource; import io.druid.server.QueryResource;
import io.druid.server.initialization.jetty.ChatHandlerServerModule;
import io.druid.server.initialization.jetty.JettyServerInitializer; import io.druid.server.initialization.jetty.JettyServerInitializer;
import org.eclipse.jetty.server.Server; import org.eclipse.jetty.server.Server;
@ -198,7 +199,8 @@ public class CliPeon extends GuiceRunnable
} }
}, },
new IndexingServiceFirehoseModule() new IndexingServiceFirehoseModule(),
new ChatHandlerServerModule()
); );
} }

View File

@ -24,6 +24,7 @@ import com.google.inject.name.Names;
import com.metamx.common.logger.Logger; import com.metamx.common.logger.Logger;
import io.airlift.airline.Command; import io.airlift.airline.Command;
import io.druid.guice.RealtimeModule; import io.druid.guice.RealtimeModule;
import io.druid.server.initialization.jetty.ChatHandlerServerModule;
import java.util.List; import java.util.List;
@ -54,7 +55,8 @@ public class CliRealtime extends ServerRunnable
binder.bindConstant().annotatedWith(Names.named("serviceName")).to("druid/realtime"); binder.bindConstant().annotatedWith(Names.named("serviceName")).to("druid/realtime");
binder.bindConstant().annotatedWith(Names.named("servicePort")).to(8084); binder.bindConstant().annotatedWith(Names.named("servicePort")).to(8084);
} }
} },
new ChatHandlerServerModule()
); );
} }
} }

View File

@ -35,7 +35,6 @@ import io.druid.segment.realtime.firehose.NoopChatHandlerProvider;
import io.druid.segment.realtime.firehose.ServiceAnnouncingChatHandlerProvider; import io.druid.segment.realtime.firehose.ServiceAnnouncingChatHandlerProvider;
import io.druid.server.QueryResource; import io.druid.server.QueryResource;
import io.druid.server.initialization.jetty.JettyServerInitializer; import io.druid.server.initialization.jetty.JettyServerInitializer;
import org.eclipse.jetty.server.Server; import org.eclipse.jetty.server.Server;
import java.util.List; import java.util.List;
@ -44,6 +43,7 @@ import java.util.List;
*/ */
public class RealtimeModule implements Module public class RealtimeModule implements Module
{ {
@Override @Override
public void configure(Binder binder) public void configure(Binder binder)
{ {