mirror of https://github.com/apache/druid.git
Add ability to specify list of task ports and port range (#6263)
* support specify list of task ports * fix typos * address comments * remove druid.indexer.runner.separateIngestionEndpoint config * tweak doc * fix doc * code cleanup * keep some useful comments
This commit is contained in:
parent
d50b69e6d4
commit
87ccee05f7
|
@ -1039,8 +1039,9 @@ Middle managers pass their configurations down to their child peons. The middle
|
|||
|`druid.indexer.runner.javaOpts`|*DEPRECATED* A string of -X Java options to pass to the peon's JVM. Quotable parameters or parameters with spaces are encouraged to use javaOptsArray|""|
|
||||
|`druid.indexer.runner.javaOptsArray`|A json array of strings to be passed in as options to the peon's jvm. This is additive to javaOpts and is recommended for properly handling arguments which contain quotes or spaces like `["-XX:OnOutOfMemoryError=kill -9 %p"]`|`[]`|
|
||||
|`druid.indexer.runner.maxZnodeBytes`|The maximum size Znode in bytes that can be created in Zookeeper.|524288|
|
||||
|`druid.indexer.runner.startPort`|Starting port used for peon processes, should be greater than 1023.|8100|
|
||||
|`druid.indexer.runner.separateIngestionEndpoint`|*Deprecated.* Use separate server and consequently separate jetty thread pool for ingesting events. Not supported with TLS.|false|
|
||||
|`druid.indexer.runner.startPort`|Starting port used for peon processes, should be greater than 1023 and less than 65536.|8100|
|
||||
|`druid.indexer.runner.endPort`|Ending port used for peon processes, should be greater than or equal to `druid.indexer.runner.startPort` and less than 65536.|65535|
|
||||
|`druid.indexer.runner.ports`|A json array of integers to specify ports that used for peon processes. If provided and non-empty, ports for peon processes will be chosen from these ports. And `druid.indexer.runner.startPort/druid.indexer.runner.endPort` will be completely ignored.|`[]`|
|
||||
|`druid.worker.ip`|The IP of the worker.|localhost|
|
||||
|`druid.worker.version`|Version identifier for the middle manager.|0|
|
||||
|`druid.worker.capacity`|Maximum number of tasks the middle manager can accept.|Number of available processors - 1|
|
||||
|
@ -1103,14 +1104,6 @@ Additional peon configs include:
|
|||
|`druid.indexer.task.restoreTasksOnRestart`|If true, middleManagers will attempt to stop tasks gracefully on shutdown and restore them on restart.|false|
|
||||
|`druid.indexer.server.maxChatRequests`|Maximum number of concurrent requests served by a task's chat handler. Set to 0 to disable limiting.|0|
|
||||
|
||||
If the deprecated `druid.indexer.runner.separateIngestionEndpoint` property is set to true then following configurations
|
||||
are available for the ingestion server at peon:
|
||||
|
||||
|Property|Description|Default|
|
||||
|--------|-----------|-------|
|
||||
|`druid.indexer.server.chathandler.http.numThreads`|*Deprecated.* Number of threads for HTTP requests.|Math.max(10, (Number of available processors * 17) / 16 + 2) + 30|
|
||||
|`druid.indexer.server.chathandler.http.maxIdleTime`|*Deprecated.* The Jetty max idle time for a connection.|PT5M|
|
||||
|
||||
If the peon is running in remote mode, there must be an overlord up and running. Peons in remote mode can set the following configurations:
|
||||
|
||||
|Property|Description|Default|
|
||||
|
|
|
@ -128,7 +128,7 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer
|
|||
this.taskLogPusher = taskLogPusher;
|
||||
this.jsonMapper = jsonMapper;
|
||||
this.node = node;
|
||||
this.portFinder = new PortFinder(config.getStartPort());
|
||||
this.portFinder = new PortFinder(config.getStartPort(), config.getEndPort(), config.getPorts());
|
||||
this.exec = MoreExecutors.listeningDecorator(
|
||||
Execs.multiThreaded(workerConfig.getCapacity(), "forking-task-runner-%d")
|
||||
);
|
||||
|
@ -232,16 +232,9 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer
|
|||
final String childHost = node.getHost();
|
||||
int childPort = -1;
|
||||
int tlsChildPort = -1;
|
||||
int childChatHandlerPort = -1;
|
||||
|
||||
if (node.isEnablePlaintextPort()) {
|
||||
if (config.isSeparateIngestionEndpoint()) {
|
||||
Pair<Integer, Integer> portPair = portFinder.findTwoConsecutiveUnusedPorts();
|
||||
childPort = portPair.lhs;
|
||||
childChatHandlerPort = portPair.rhs;
|
||||
} else {
|
||||
childPort = portFinder.findUnusedPort();
|
||||
}
|
||||
childPort = portFinder.findUnusedPort();
|
||||
}
|
||||
|
||||
if (node.isEnableTlsPort()) {
|
||||
|
@ -396,22 +389,6 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer
|
|||
command.add("-XX:ThreadPriorityPolicy=42");
|
||||
*/
|
||||
|
||||
if (config.isSeparateIngestionEndpoint()) {
|
||||
command.add(StringUtils.format(
|
||||
"-Ddruid.indexer.task.chathandler.service=%s",
|
||||
"placeholder/serviceName"
|
||||
));
|
||||
// Actual serviceName will be passed by the EventReceiverFirehose when it registers itself with ChatHandlerProvider
|
||||
// Thus, "placeholder/serviceName" will be ignored
|
||||
command.add(StringUtils.format("-Ddruid.indexer.task.chathandler.host=%s", childHost));
|
||||
command.add(StringUtils.format(
|
||||
"-Ddruid.indexer.task.chathandler.port=%d",
|
||||
childChatHandlerPort
|
||||
));
|
||||
// Note - TLS is not supported with separate ingestion config,
|
||||
// if set then peon task will fail to start
|
||||
}
|
||||
|
||||
command.add("org.apache.druid.cli.Main");
|
||||
command.add("internal");
|
||||
command.add("peon");
|
||||
|
@ -515,9 +492,6 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer
|
|||
if (node.isEnableTlsPort()) {
|
||||
portFinder.markPortUnused(tlsChildPort);
|
||||
}
|
||||
if (childChatHandlerPort > 0) {
|
||||
portFinder.markPortUnused(childChatHandlerPort);
|
||||
}
|
||||
|
||||
try {
|
||||
if (!stopping && taskDir.exists()) {
|
||||
|
|
|
@ -19,27 +19,32 @@
|
|||
|
||||
package org.apache.druid.indexing.overlord;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.collect.Sets;
|
||||
|
||||
import org.apache.druid.java.util.common.ISE;
|
||||
import org.apache.druid.java.util.common.Pair;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.ServerSocket;
|
||||
import java.net.SocketException;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
|
||||
public class PortFinder
|
||||
{
|
||||
private final Set<Integer> usedPorts = Sets.newHashSet();
|
||||
private final int startPort;
|
||||
private final int endPort;
|
||||
private final List<Integer> candidatePorts;
|
||||
|
||||
public PortFinder(int startPort)
|
||||
public PortFinder(int startPort, int endPort, List<Integer> candidatePorts)
|
||||
{
|
||||
this.startPort = startPort;
|
||||
this.endPort = endPort;
|
||||
this.candidatePorts = candidatePorts;
|
||||
}
|
||||
|
||||
private static boolean canBind(int portNum)
|
||||
@VisibleForTesting
|
||||
boolean canBind(int portNum)
|
||||
{
|
||||
try {
|
||||
new ServerSocket(portNum).close();
|
||||
|
@ -55,23 +60,18 @@ public class PortFinder
|
|||
|
||||
public synchronized int findUnusedPort()
|
||||
{
|
||||
int port = chooseNext(startPort);
|
||||
while (!canBind(port)) {
|
||||
port = chooseNext(port + 1);
|
||||
if (candidatePorts != null && !candidatePorts.isEmpty()) {
|
||||
int port = chooseFromCandidates();
|
||||
usedPorts.add(port);
|
||||
return port;
|
||||
} else {
|
||||
int port = chooseNext(startPort);
|
||||
while (!canBind(port)) {
|
||||
port = chooseNext(port + 1);
|
||||
}
|
||||
usedPorts.add(port);
|
||||
return port;
|
||||
}
|
||||
usedPorts.add(port);
|
||||
return port;
|
||||
}
|
||||
|
||||
public synchronized Pair<Integer, Integer> findTwoConsecutiveUnusedPorts()
|
||||
{
|
||||
int firstPort = chooseNext(startPort);
|
||||
while (!canBind(firstPort) || !canBind(firstPort + 1)) {
|
||||
firstPort = chooseNext(firstPort + 1);
|
||||
}
|
||||
usedPorts.add(firstPort);
|
||||
usedPorts.add(firstPort + 1);
|
||||
return new Pair<>(firstPort, firstPort + 1);
|
||||
}
|
||||
|
||||
public synchronized void markPortUnused(int port)
|
||||
|
@ -79,15 +79,25 @@ public class PortFinder
|
|||
usedPorts.remove(port);
|
||||
}
|
||||
|
||||
private int chooseFromCandidates()
|
||||
{
|
||||
for (int port : candidatePorts) {
|
||||
if (!usedPorts.contains(port) && canBind(port)) {
|
||||
return port;
|
||||
}
|
||||
}
|
||||
throw new ISE("All ports are used...");
|
||||
}
|
||||
|
||||
private int chooseNext(int start)
|
||||
{
|
||||
// up to unsigned short max (65535)
|
||||
for (int i = start; i <= 0xFFFF; i++) {
|
||||
// up to endPort (which default value is 65535)
|
||||
for (int i = start; i <= endPort; i++) {
|
||||
if (!usedPorts.contains(i)) {
|
||||
return i;
|
||||
}
|
||||
}
|
||||
throw new ISE("All ports are Used..");
|
||||
throw new ISE("All ports are used...");
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -62,6 +62,19 @@ public class ForkingTaskRunnerConfig
|
|||
@Max(65535)
|
||||
private int startPort = 8100;
|
||||
|
||||
@JsonProperty
|
||||
@Min(1024)
|
||||
@Max(65535)
|
||||
private int endPort = 65535;
|
||||
|
||||
/**
|
||||
* Task ports your services are going to use. If non-empty, ports for one task will be chosen from these ports.
|
||||
* Otherwise, using startPort and endPort to generate usable ports.
|
||||
*/
|
||||
@JsonProperty
|
||||
@NotNull
|
||||
private List<Integer> ports = ImmutableList.of();
|
||||
|
||||
@JsonProperty
|
||||
@NotNull
|
||||
List<String> allowedPrefixes = Lists.newArrayList(
|
||||
|
@ -74,14 +87,6 @@ public class ForkingTaskRunnerConfig
|
|||
"hadoop"
|
||||
);
|
||||
|
||||
@JsonProperty
|
||||
private boolean separateIngestionEndpoint = false;
|
||||
|
||||
public boolean isSeparateIngestionEndpoint()
|
||||
{
|
||||
return separateIngestionEndpoint;
|
||||
}
|
||||
|
||||
public String getJavaCommand()
|
||||
{
|
||||
return javaCommand;
|
||||
|
@ -107,6 +112,16 @@ public class ForkingTaskRunnerConfig
|
|||
return startPort;
|
||||
}
|
||||
|
||||
public int getEndPort()
|
||||
{
|
||||
return endPort;
|
||||
}
|
||||
|
||||
public List<Integer> getPorts()
|
||||
{
|
||||
return ports;
|
||||
}
|
||||
|
||||
public List<String> getAllowedPrefixes()
|
||||
{
|
||||
return allowedPrefixes;
|
||||
|
|
|
@ -19,35 +19,55 @@
|
|||
|
||||
package org.apache.druid.indexing.overlord;
|
||||
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import org.easymock.EasyMock;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.ServerSocket;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
|
||||
public class PortFinderTest
|
||||
{
|
||||
private final PortFinder finder = new PortFinder(1200);
|
||||
private final List<PortFinder> finders = new ArrayList<>();
|
||||
|
||||
@Before
|
||||
public void setUp()
|
||||
{
|
||||
// use startPort and endPort to generate usable ports.
|
||||
PortFinder finder1 = EasyMock.createMockBuilder(PortFinder.class)
|
||||
.withConstructor(1200, 1201, ImmutableList.of())
|
||||
.addMockedMethod("canBind")
|
||||
.createMock();
|
||||
// chose usable ports from candidates
|
||||
PortFinder finder2 = EasyMock.createMockBuilder(PortFinder.class)
|
||||
.withConstructor(1024, 1025, ImmutableList.of(1200, 1201))
|
||||
.addMockedMethod("canBind")
|
||||
.createMock();
|
||||
|
||||
finders.add(finder1);
|
||||
finders.add(finder2);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testUsedPort() throws IOException
|
||||
public void testUsedPort()
|
||||
{
|
||||
final int port1 = finder.findUnusedPort();
|
||||
// verify that the port is free
|
||||
ServerSocket socket1 = new ServerSocket(port1);
|
||||
finder.markPortUnused(port1);
|
||||
final int port2 = finder.findUnusedPort();
|
||||
Assert.assertNotEquals("Used port is not reallocated", port1, port2);
|
||||
// verify that port2 is free
|
||||
ServerSocket socket2 = new ServerSocket(port2);
|
||||
for (PortFinder finder : finders) {
|
||||
EasyMock.expect(finder.canBind(1200)).andReturn(true).andReturn(false);
|
||||
EasyMock.expect(finder.canBind(1201)).andReturn(true);
|
||||
EasyMock.replay(finder);
|
||||
|
||||
socket1.close();
|
||||
// Now port1 should get recycled
|
||||
Assert.assertEquals(port1, finder.findUnusedPort());
|
||||
final int port1 = finder.findUnusedPort();
|
||||
Assert.assertEquals(1200, port1);
|
||||
finder.markPortUnused(port1);
|
||||
|
||||
socket2.close();
|
||||
finder.markPortUnused(port1);
|
||||
finder.markPortUnused(port2);
|
||||
final int port2 = finder.findUnusedPort();
|
||||
Assert.assertEquals(1201, port2);
|
||||
finder.markPortUnused(port2);
|
||||
|
||||
EasyMock.verify(finder);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -145,6 +145,31 @@ public class ForkingTaskRunnerConfigTest
|
|||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPorts() throws JsonProcessingException
|
||||
{
|
||||
final List<Integer> ports = ImmutableList.of(1024, 1025);
|
||||
Assert.assertEquals(
|
||||
ports,
|
||||
buildFromProperties(
|
||||
IndexingServiceModuleHelper.INDEXER_RUNNER_PROPERTY_PREFIX + ".ports",
|
||||
MAPPER.writeValueAsString(ports)
|
||||
).getPorts()
|
||||
);
|
||||
}
|
||||
|
||||
@Test(expected = ProvisionException.class)
|
||||
public void testExceptionalPorts()
|
||||
{
|
||||
buildFromProperties(IndexingServiceModuleHelper.INDEXER_RUNNER_PROPERTY_PREFIX + ".ports", "not an Integer");
|
||||
}
|
||||
|
||||
@Test(expected = ProvisionException.class)
|
||||
public void testExceptionalPorts2()
|
||||
{
|
||||
buildFromProperties(IndexingServiceModuleHelper.INDEXER_RUNNER_PROPERTY_PREFIX + ".ports", "1024"); // not an array
|
||||
}
|
||||
|
||||
@Test(expected = ProvisionException.class)
|
||||
public void testExceptionalJavaOptArray()
|
||||
{
|
||||
|
|
|
@ -26,13 +26,11 @@ import com.google.inject.Module;
|
|||
import com.google.inject.Provides;
|
||||
import com.google.inject.multibindings.Multibinder;
|
||||
import org.apache.druid.guice.Jerseys;
|
||||
import org.apache.druid.guice.JsonConfigProvider;
|
||||
import org.apache.druid.guice.LazySingleton;
|
||||
import org.apache.druid.guice.LifecycleModule;
|
||||
import org.apache.druid.guice.annotations.RemoteChatHandler;
|
||||
import org.apache.druid.guice.annotations.Self;
|
||||
import org.apache.druid.java.util.common.lifecycle.Lifecycle;
|
||||
import org.apache.druid.java.util.common.logger.Logger;
|
||||
import org.apache.druid.segment.realtime.firehose.ChatHandlerResource;
|
||||
import org.apache.druid.server.DruidNode;
|
||||
import org.apache.druid.server.initialization.ServerConfig;
|
||||
|
@ -47,10 +45,7 @@ import java.util.Properties;
|
|||
*/
|
||||
public class ChatHandlerServerModule implements Module
|
||||
{
|
||||
private static final Logger log = new Logger(ChatHandlerServerModule.class);
|
||||
private static final String MAX_CHAT_REQUESTS_PROPERTY = "druid.indexer.server.maxChatRequests";
|
||||
private static final String CHAT_PORT_PROPERTY = "druid.indexer.task.chathandler.port";
|
||||
|
||||
private final Properties properties;
|
||||
|
||||
public ChatHandlerServerModule(Properties properties)
|
||||
|
@ -72,32 +67,12 @@ public class ChatHandlerServerModule implements Module
|
|||
Multibinder.newSetBinder(binder, ServletFilterHolder.class).addBinding().to(TaskIdResponseHeaderFilterHolder.class);
|
||||
|
||||
/**
|
||||
* If "druid.indexer.task.chathandler.port" property is set then we assume that a separate Jetty Server with its
|
||||
* own {@link ServerConfig} is required for ingestion apart from the query server otherwise we bind
|
||||
* {@link DruidNode} annotated with {@link RemoteChatHandler} to {@literal @}{@link Self} {@link DruidNode}
|
||||
* We bind {@link DruidNode} annotated with {@link RemoteChatHandler} to {@literal @}{@link Self} {@link DruidNode}
|
||||
* so that same Jetty Server is used for querying as well as ingestion.
|
||||
*/
|
||||
if (properties.containsKey(CHAT_PORT_PROPERTY)) {
|
||||
log.info("Spawning separate ingestion server at port [%s]", properties.getProperty(CHAT_PORT_PROPERTY));
|
||||
JsonConfigProvider.bind(binder, "druid.indexer.task.chathandler", DruidNode.class, RemoteChatHandler.class);
|
||||
JsonConfigProvider.bind(
|
||||
binder,
|
||||
"druid.indexer.server.chathandler.http",
|
||||
ServerConfig.class,
|
||||
RemoteChatHandler.class
|
||||
);
|
||||
JsonConfigProvider.bind(
|
||||
binder,
|
||||
"druid.indexer.server.chathandler.https",
|
||||
TLSServerConfig.class,
|
||||
RemoteChatHandler.class
|
||||
);
|
||||
LifecycleModule.register(binder, Server.class, RemoteChatHandler.class);
|
||||
} else {
|
||||
binder.bind(DruidNode.class).annotatedWith(RemoteChatHandler.class).to(Key.get(DruidNode.class, Self.class));
|
||||
binder.bind(ServerConfig.class).annotatedWith(RemoteChatHandler.class).to(Key.get(ServerConfig.class));
|
||||
binder.bind(TLSServerConfig.class).annotatedWith(RemoteChatHandler.class).to(Key.get(TLSServerConfig.class));
|
||||
}
|
||||
binder.bind(DruidNode.class).annotatedWith(RemoteChatHandler.class).to(Key.get(DruidNode.class, Self.class));
|
||||
binder.bind(ServerConfig.class).annotatedWith(RemoteChatHandler.class).to(Key.get(ServerConfig.class));
|
||||
binder.bind(TLSServerConfig.class).annotatedWith(RemoteChatHandler.class).to(Key.get(TLSServerConfig.class));
|
||||
}
|
||||
|
||||
@Provides
|
||||
|
|
Loading…
Reference in New Issue