Merge pull request #1929 from pjain1/jetty_threads

separate ingestion and query thread pool
This commit is contained in:
Charles Allen 2015-11-17 12:14:25 -08:00
commit dbe201aeed
12 changed files with 316 additions and 37 deletions

View File

@ -257,6 +257,7 @@ Middle managers pass their configurations down to their child peons. The middle
|`druid.indexer.runner.javaOpts`|-X Java options to run the peon in its own JVM.|""|
|`druid.indexer.runner.maxZnodeBytes`|The maximum size Znode in bytes that can be created in Zookeeper.|524288|
|`druid.indexer.runner.startPort`|The port that peons begin running on.|8100|
|`druid.indexer.runner.separateIngestionEndpoint`|Use separate server and consequently separate jetty thread pool for ingesting events|false|
|`druid.worker.ip`|The IP of the worker.|localhost|
|`druid.worker.version`|Version identifier for the middle manager.|0|
|`druid.worker.capacity`|Maximum number of tasks the middle manager can accept.|Number of available processors - 1|
@ -279,6 +280,13 @@ Additional peon configs include:
|`druid.indexer.task.defaultRowFlushBoundary`|Highest row count before persisting to disk. Used for indexing generating tasks.|50000|
|`druid.indexer.task.defaultHadoopCoordinates`|Hadoop version to use with HadoopIndexTasks that do not request a particular version.|org.apache.hadoop:hadoop-client:2.3.0|
If `druid.indexer.runner.separateIngestionEndpoint` is set to true then following configurations are available for the ingestion server at peon:
|Property|Description|Default|
|--------|-----------|-------|
|`druid.indexer.server.chathandler.http.numThreads`|Number of threads for HTTP requests.|Math.max(10, (Number of available processors * 17) / 16 + 2) + 30|
|`druid.indexer.server.chathandler.http.maxIdleTime`|The Jetty max idle time for a connection.|PT5m|
If the peon is running in remote mode, there must be an overlord up and running. Peons in remote mode can set the following configurations:
|Property|Description|Default|

View File

@ -37,6 +37,7 @@ import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.inject.Inject;
import com.metamx.common.ISE;
import com.metamx.common.Pair;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.emitter.EmittingLogger;
import io.druid.guice.annotations.Self;
@ -124,7 +125,18 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer
final File attemptDir = new File(taskDir, attemptUUID);
final ProcessHolder processHolder;
final int childPort = portFinder.findUnusedPort();
final int childPort;
final int childChatHandlerPort;
if (config.isSeparateIngestionEndpoint()) {
Pair<Integer, Integer> portPair = portFinder.findTwoConsecutiveUnusedPorts();
childPort = portPair.lhs;
childChatHandlerPort = portPair.rhs;
} else {
childPort = portFinder.findUnusedPort();
childChatHandlerPort = -1;
}
try {
final Closer closer = Closer.create();
try {
@ -233,6 +245,14 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer
command.add(String.format("-Ddruid.host=%s", childHost));
command.add(String.format("-Ddruid.port=%d", childPort));
if(config.isSeparateIngestionEndpoint()) {
command.add(String.format("-Ddruid.indexer.task.chathandler.service=%s", "placeholder/serviceName"));
// Actual serviceName will be passed by the EventReceiverFirehose when it registers itself with ChatHandlerProvider
// Thus, "placeholder/serviceName" will be ignored
command.add(String.format("-Ddruid.indexer.task.chathandler.host=%s", childHost));
command.add(String.format("-Ddruid.indexer.task.chathandler.port=%d", childChatHandlerPort));
}
command.add("io.druid.cli.Main");
command.add("internal");
command.add("peon");
@ -301,6 +321,9 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer
}
}
portFinder.markPortUnused(childPort);
if(childChatHandlerPort > 0) {
portFinder.markPortUnused(childChatHandlerPort);
}
log.info("Removing temporary directory: %s", attemptDir);
FileUtils.deleteDirectory(attemptDir);
}

View File

@ -19,6 +19,7 @@ package io.druid.indexing.overlord;
import com.google.common.collect.Sets;
import com.metamx.common.ISE;
import com.metamx.common.Pair;
import java.io.IOException;
import java.net.BindException;
@ -74,6 +75,17 @@ public class PortFinder
return port;
}
public synchronized Pair<Integer, Integer> findTwoConsecutiveUnusedPorts()
{
int firstPort = chooseNext(startPort);
while (!canBind(firstPort) || !canBind(firstPort + 1)) {
firstPort = chooseNext(firstPort + 1);
}
usedPorts.add(firstPort);
usedPorts.add(firstPort + 1);
return new Pair<>(firstPort, firstPort + 1);
}
public synchronized void markPortUnused(int port)
{
usedPorts.remove(port);

View File

@ -65,6 +65,13 @@ public class ForkingTaskRunnerConfig
"hadoop"
);
@JsonProperty
private boolean separateIngestionEndpoint = false;
public boolean isSeparateIngestionEndpoint() {
return separateIngestionEndpoint;
}
public String getJavaCommand()
{
return javaCommand;

View File

@ -0,0 +1,34 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.guice.annotations;
import com.google.inject.BindingAnnotation;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
@BindingAnnotation
@Target({ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface RemoteChatHandler
{
}

View File

@ -23,8 +23,8 @@ import com.google.inject.Inject;
import com.metamx.common.ISE;
import com.metamx.common.logger.Logger;
import io.druid.curator.discovery.ServiceAnnouncer;
import io.druid.guice.annotations.Self;
import io.druid.server.DruidNode;
import io.druid.guice.annotations.RemoteChatHandler;
import java.util.concurrent.ConcurrentMap;
@ -43,7 +43,7 @@ public class ServiceAnnouncingChatHandlerProvider implements ChatHandlerProvider
@Inject
public ServiceAnnouncingChatHandlerProvider(
@Self DruidNode node,
@RemoteChatHandler DruidNode node,
ServiceAnnouncer serviceAnnouncer
)
{

View File

@ -0,0 +1,78 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.server.initialization.jetty;
import com.google.inject.Binder;
import com.google.inject.Inject;
import com.google.inject.Injector;
import com.google.inject.Key;
import com.google.inject.Module;
import com.google.inject.Provides;
import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.common.logger.Logger;
import io.druid.guice.JsonConfigProvider;
import io.druid.guice.LazySingleton;
import io.druid.guice.LifecycleModule;
import io.druid.guice.annotations.RemoteChatHandler;
import io.druid.guice.annotations.Self;
import io.druid.server.DruidNode;
import io.druid.server.initialization.ServerConfig;
import org.eclipse.jetty.server.Server;
import java.util.Properties;
/**
*/
public class ChatHandlerServerModule implements Module
{
private static final Logger log = new Logger(ChatHandlerServerModule.class);
@Inject
private Properties properties;
@Override
public void configure(Binder binder)
{
/** If "druid.indexer.task.chathandler.port" property is set then we assume that a
* separate Jetty Server with it's own {@link ServerConfig} is required for ingestion apart from the query server
* otherwise we bind {@link DruidNode} annotated with {@link RemoteChatHandler} to {@literal @}{@link Self} {@link DruidNode}
* so that same Jetty Server is used for querying as well as ingestion
*/
if (properties.containsKey("druid.indexer.task.chathandler.port")) {
log.info("Spawning separate ingestion server at port [%s]", properties.get("druid.indexer.task.chathandler.port"));
JsonConfigProvider.bind(binder, "druid.indexer.task.chathandler", DruidNode.class, RemoteChatHandler.class);
JsonConfigProvider.bind(binder, "druid.indexer.server.chathandler.http", ServerConfig.class, RemoteChatHandler.class);
LifecycleModule.register(binder, Server.class, RemoteChatHandler.class);
} else {
binder.bind(DruidNode.class).annotatedWith(RemoteChatHandler.class).to(Key.get(DruidNode.class, Self.class));
binder.bind(ServerConfig.class).annotatedWith(RemoteChatHandler.class).to(Key.get(ServerConfig.class));
}
}
@Provides
@LazySingleton
@RemoteChatHandler
public Server getServer(Injector injector, Lifecycle lifecycle, @RemoteChatHandler DruidNode node, @RemoteChatHandler ServerConfig config)
{
final Server server = JettyServerModule.makeJettyServer(node, config);
JettyServerModule.initializeServer(injector, lifecycle, server);
return server;
}
}

View File

@ -108,38 +108,8 @@ public class JettyServerModule extends JerseyServletModule
@LazySingleton
public Server getServer(Injector injector, Lifecycle lifecycle, @Self DruidNode node, ServerConfig config)
{
JettyServerInitializer initializer = injector.getInstance(JettyServerInitializer.class);
final Server server = makeJettyServer(node, config);
try {
initializer.initialize(server, injector);
}
catch (ConfigurationException e) {
throw new ProvisionException(Iterables.getFirst(e.getErrorMessages(), null).getMessage());
}
lifecycle.addHandler(
new Lifecycle.Handler()
{
@Override
public void start() throws Exception
{
server.start();
}
@Override
public void stop()
{
try {
server.stop();
}
catch (Exception e) {
log.warn(e, "Unable to stop Jetty server.");
}
}
}
);
initializeServer(injector, lifecycle, server);
return server;
}
@ -152,7 +122,7 @@ public class JettyServerModule extends JerseyServletModule
return provider;
}
private static Server makeJettyServer(@Self DruidNode node, ServerConfig config)
static Server makeJettyServer(DruidNode node, ServerConfig config)
{
final QueuedThreadPool threadPool = new QueuedThreadPool();
threadPool.setMinThreads(config.getNumThreads());
@ -176,4 +146,38 @@ public class JettyServerModule extends JerseyServletModule
return server;
}
static void initializeServer(Injector injector, Lifecycle lifecycle, final Server server)
{
JettyServerInitializer initializer = injector.getInstance(JettyServerInitializer.class);
try {
initializer.initialize(server, injector);
}
catch (ConfigurationException e) {
throw new ProvisionException(Iterables.getFirst(e.getErrorMessages(), null).getMessage());
}
lifecycle.addHandler(
new Lifecycle.Handler()
{
@Override
public void start() throws Exception
{
server.start();
}
@Override
public void stop()
{
try {
server.stop();
}
catch (Exception e) {
log.warn(e, "Unable to stop Jetty server.");
}
}
}
);
}
}

View File

@ -0,0 +1,108 @@
/*
* Druid - a distributed column store.
* Copyright 2012 - 2015 Metamarkets Group Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.druid.curator.discovery;
import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterators;
import com.metamx.common.ISE;
import io.druid.curator.CuratorTestBase;
import org.apache.curator.x.discovery.ServiceDiscovery;
import org.apache.curator.x.discovery.ServiceDiscoveryBuilder;
import org.apache.curator.x.discovery.ServiceInstance;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.util.List;
public class ServiceAnnouncerTest extends CuratorTestBase
{
@Before
public void setUp() throws Exception
{
setupServerAndCurator();
}
@Test
public void testServiceAnnouncement() throws Exception
{
curator.start();
List<String> serviceNames = ImmutableList.of(
"druid/overlord",
"druid/coordinator",
"druid/firehose/tranquility_test-50-0000-0000"
);
final ServiceDiscovery serviceDiscovery = createAndAnnounceServices(serviceNames);
Assert.assertTrue(
Iterators.all(
serviceNames.iterator(),
new Predicate<String>()
{
@Override
public boolean apply(String input)
{
try {
return serviceDiscovery.queryForInstances(input.replaceAll("/", ":")).size() == 1;
}
catch (Exception e) {
throw new ISE(
"Something went wrong while finding instance with name [%s] in Service Discovery",
input
);
}
}
}
)
);
}
@Test (expected = IllegalArgumentException.class)
public void testServiceAnnouncementFail() throws Exception
{
curator.start();
createAndAnnounceServices(ImmutableList.of("placeholder/\u0001"));
}
private ServiceDiscovery createAndAnnounceServices(List<String> serviceNames) throws Exception
{
int port = 1000;
ServiceDiscovery<Void> serviceDiscovery =
ServiceDiscoveryBuilder.builder(Void.class)
.basePath("/test")
.client(curator)
.build();
for (String serviceName: serviceNames) {
String serviceNameToUse = CuratorServiceUtils.makeCanonicalServiceName(serviceName);
ServiceInstance instance = ServiceInstance.<Void>builder()
.name(serviceNameToUse)
.address("localhost")
.port(port++)
.build();
serviceDiscovery.registerService(instance);
}
return serviceDiscovery;
}
@After
public void tearDown()
{
tearDownServerAndCurator();
}
}

View File

@ -70,6 +70,7 @@ import io.druid.segment.realtime.firehose.ChatHandlerResource;
import io.druid.segment.realtime.firehose.NoopChatHandlerProvider;
import io.druid.segment.realtime.firehose.ServiceAnnouncingChatHandlerProvider;
import io.druid.server.QueryResource;
import io.druid.server.initialization.jetty.ChatHandlerServerModule;
import io.druid.server.initialization.jetty.JettyServerInitializer;
import org.eclipse.jetty.server.Server;
@ -198,7 +199,8 @@ public class CliPeon extends GuiceRunnable
}
},
new IndexingServiceFirehoseModule()
new IndexingServiceFirehoseModule(),
new ChatHandlerServerModule()
);
}

View File

@ -24,6 +24,7 @@ import com.google.inject.name.Names;
import com.metamx.common.logger.Logger;
import io.airlift.airline.Command;
import io.druid.guice.RealtimeModule;
import io.druid.server.initialization.jetty.ChatHandlerServerModule;
import java.util.List;
@ -54,7 +55,8 @@ public class CliRealtime extends ServerRunnable
binder.bindConstant().annotatedWith(Names.named("serviceName")).to("druid/realtime");
binder.bindConstant().annotatedWith(Names.named("servicePort")).to(8084);
}
}
},
new ChatHandlerServerModule()
);
}
}

View File

@ -43,6 +43,7 @@ import java.util.List;
*/
public class RealtimeModule implements Module
{
@Override
public void configure(Binder binder)
{