mirror of https://github.com/apache/druid.git
Merge pull request #633 from metamx/remove-config
Remove redundant forking task runner config
This commit is contained in:
commit
7a7386be74
|
@ -48,7 +48,6 @@ Middle managers pass their configurations down to their child peons. The middle
|
||||||
|`druid.worker.capacity`|Maximum number of tasks the middle manager can accept.|Number of available processors - 1|
|
|`druid.worker.capacity`|Maximum number of tasks the middle manager can accept.|Number of available processors - 1|
|
||||||
|`druid.indexer.runner.compressZnodes`|Indicates whether or not the middle managers should compress Znodes.|false|
|
|`druid.indexer.runner.compressZnodes`|Indicates whether or not the middle managers should compress Znodes.|false|
|
||||||
|`druid.indexer.runner.maxZnodeBytes`|The maximum size Znode in bytes that can be created in Zookeeper.|524288|
|
|`druid.indexer.runner.maxZnodeBytes`|The maximum size Znode in bytes that can be created in Zookeeper.|524288|
|
||||||
|`druid.indexer.runner.taskDir`|Temporary intermediate directory used during task execution.|/tmp/persistent|
|
|
||||||
|`druid.indexer.runner.javaCommand`|Command required to execute java.|java|
|
|`druid.indexer.runner.javaCommand`|Command required to execute java.|java|
|
||||||
|`druid.indexer.runner.javaOpts`|-X Java options to run the peon in its own JVM.|""|
|
|`druid.indexer.runner.javaOpts`|-X Java options to run the peon in its own JVM.|""|
|
||||||
|`druid.indexer.runner.classpath`|Java classpath for the peon.|System.getProperty("java.class.path")|
|
|`druid.indexer.runner.classpath`|Java classpath for the peon.|System.getProperty("java.class.path")|
|
||||||
|
|
|
@ -154,8 +154,7 @@ druid.indexer.logs.s3Prefix=prod/logs/v1
|
||||||
|
|
||||||
# Dedicate more resources to peons
|
# Dedicate more resources to peons
|
||||||
druid.indexer.runner.javaOpts=-server -Xmx3g -XX:+UseG1GC -XX:MaxGCPauseMillis=100 -XX:+PrintGCDetails -XX:+PrintGCTimeStamps
|
druid.indexer.runner.javaOpts=-server -Xmx3g -XX:+UseG1GC -XX:MaxGCPauseMillis=100 -XX:+PrintGCDetails -XX:+PrintGCTimeStamps
|
||||||
druid.indexer.runner.taskDir=/mnt/persistent/task/
|
druid.indexer.task.baseTaskDir=/mnt/persistent/task/
|
||||||
druid.indexer.task.taskDir=/mnt/persistent/task/
|
|
||||||
druid.indexer.task.chathandler.type=announce
|
druid.indexer.task.chathandler.type=announce
|
||||||
|
|
||||||
druid.indexer.fork.property.druid.indexer.hadoopWorkingPath=/tmp/druid-indexing
|
druid.indexer.fork.property.druid.indexer.hadoopWorkingPath=/tmp/druid-indexing
|
||||||
|
|
|
@ -42,6 +42,7 @@ import com.metamx.common.lifecycle.LifecycleStop;
|
||||||
import com.metamx.emitter.EmittingLogger;
|
import com.metamx.emitter.EmittingLogger;
|
||||||
import io.druid.guice.annotations.Self;
|
import io.druid.guice.annotations.Self;
|
||||||
import io.druid.indexing.common.TaskStatus;
|
import io.druid.indexing.common.TaskStatus;
|
||||||
|
import io.druid.indexing.common.config.TaskConfig;
|
||||||
import io.druid.indexing.common.task.Task;
|
import io.druid.indexing.common.task.Task;
|
||||||
import io.druid.indexing.overlord.config.ForkingTaskRunnerConfig;
|
import io.druid.indexing.overlord.config.ForkingTaskRunnerConfig;
|
||||||
import io.druid.indexing.worker.config.WorkerConfig;
|
import io.druid.indexing.worker.config.WorkerConfig;
|
||||||
|
@ -74,6 +75,7 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer
|
||||||
private static final Splitter whiteSpaceSplitter = Splitter.on(CharMatcher.WHITESPACE).omitEmptyStrings();
|
private static final Splitter whiteSpaceSplitter = Splitter.on(CharMatcher.WHITESPACE).omitEmptyStrings();
|
||||||
|
|
||||||
private final ForkingTaskRunnerConfig config;
|
private final ForkingTaskRunnerConfig config;
|
||||||
|
private final TaskConfig taskConfig;
|
||||||
private final Properties props;
|
private final Properties props;
|
||||||
private final TaskLogPusher taskLogPusher;
|
private final TaskLogPusher taskLogPusher;
|
||||||
private final DruidNode node;
|
private final DruidNode node;
|
||||||
|
@ -86,6 +88,7 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer
|
||||||
@Inject
|
@Inject
|
||||||
public ForkingTaskRunner(
|
public ForkingTaskRunner(
|
||||||
ForkingTaskRunnerConfig config,
|
ForkingTaskRunnerConfig config,
|
||||||
|
TaskConfig taskConfig,
|
||||||
WorkerConfig workerConfig,
|
WorkerConfig workerConfig,
|
||||||
Properties props,
|
Properties props,
|
||||||
TaskLogPusher taskLogPusher,
|
TaskLogPusher taskLogPusher,
|
||||||
|
@ -94,6 +97,7 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer
|
||||||
)
|
)
|
||||||
{
|
{
|
||||||
this.config = config;
|
this.config = config;
|
||||||
|
this.taskConfig = taskConfig;
|
||||||
this.props = props;
|
this.props = props;
|
||||||
this.taskLogPusher = taskLogPusher;
|
this.taskLogPusher = taskLogPusher;
|
||||||
this.jsonMapper = jsonMapper;
|
this.jsonMapper = jsonMapper;
|
||||||
|
@ -119,7 +123,7 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer
|
||||||
public TaskStatus call()
|
public TaskStatus call()
|
||||||
{
|
{
|
||||||
final String attemptUUID = UUID.randomUUID().toString();
|
final String attemptUUID = UUID.randomUUID().toString();
|
||||||
final File taskDir = new File(config.getTaskDir(), task.getId());
|
final File taskDir = new File(taskConfig.getBaseTaskDir(), task.getId());
|
||||||
final File attemptDir = new File(taskDir, attemptUUID);
|
final File attemptDir = new File(taskDir, attemptUUID);
|
||||||
|
|
||||||
final ProcessHolder processHolder;
|
final ProcessHolder processHolder;
|
||||||
|
|
|
@ -22,6 +22,7 @@ package io.druid.indexing.overlord;
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
import com.google.inject.Inject;
|
import com.google.inject.Inject;
|
||||||
import io.druid.guice.annotations.Self;
|
import io.druid.guice.annotations.Self;
|
||||||
|
import io.druid.indexing.common.config.TaskConfig;
|
||||||
import io.druid.indexing.overlord.config.ForkingTaskRunnerConfig;
|
import io.druid.indexing.overlord.config.ForkingTaskRunnerConfig;
|
||||||
import io.druid.indexing.worker.config.WorkerConfig;
|
import io.druid.indexing.worker.config.WorkerConfig;
|
||||||
import io.druid.server.DruidNode;
|
import io.druid.server.DruidNode;
|
||||||
|
@ -34,6 +35,7 @@ import java.util.Properties;
|
||||||
public class ForkingTaskRunnerFactory implements TaskRunnerFactory
|
public class ForkingTaskRunnerFactory implements TaskRunnerFactory
|
||||||
{
|
{
|
||||||
private final ForkingTaskRunnerConfig config;
|
private final ForkingTaskRunnerConfig config;
|
||||||
|
private final TaskConfig taskConfig;
|
||||||
private final WorkerConfig workerConfig;
|
private final WorkerConfig workerConfig;
|
||||||
private final Properties props;
|
private final Properties props;
|
||||||
private final ObjectMapper jsonMapper;
|
private final ObjectMapper jsonMapper;
|
||||||
|
@ -43,6 +45,7 @@ public class ForkingTaskRunnerFactory implements TaskRunnerFactory
|
||||||
@Inject
|
@Inject
|
||||||
public ForkingTaskRunnerFactory(
|
public ForkingTaskRunnerFactory(
|
||||||
final ForkingTaskRunnerConfig config,
|
final ForkingTaskRunnerConfig config,
|
||||||
|
final TaskConfig taskConfig,
|
||||||
final WorkerConfig workerConfig,
|
final WorkerConfig workerConfig,
|
||||||
final Properties props,
|
final Properties props,
|
||||||
final ObjectMapper jsonMapper,
|
final ObjectMapper jsonMapper,
|
||||||
|
@ -50,6 +53,7 @@ public class ForkingTaskRunnerFactory implements TaskRunnerFactory
|
||||||
@Self DruidNode node
|
@Self DruidNode node
|
||||||
) {
|
) {
|
||||||
this.config = config;
|
this.config = config;
|
||||||
|
this.taskConfig = taskConfig;
|
||||||
this.workerConfig = workerConfig;
|
this.workerConfig = workerConfig;
|
||||||
this.props = props;
|
this.props = props;
|
||||||
this.jsonMapper = jsonMapper;
|
this.jsonMapper = jsonMapper;
|
||||||
|
@ -60,6 +64,6 @@ public class ForkingTaskRunnerFactory implements TaskRunnerFactory
|
||||||
@Override
|
@Override
|
||||||
public TaskRunner build()
|
public TaskRunner build()
|
||||||
{
|
{
|
||||||
return new ForkingTaskRunner(config, workerConfig, props, persistentTaskLogs, jsonMapper, node);
|
return new ForkingTaskRunner(config, taskConfig, workerConfig, props, persistentTaskLogs, jsonMapper, node);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -29,10 +29,6 @@ import java.util.List;
|
||||||
|
|
||||||
public class ForkingTaskRunnerConfig
|
public class ForkingTaskRunnerConfig
|
||||||
{
|
{
|
||||||
@JsonProperty
|
|
||||||
@NotNull
|
|
||||||
private String taskDir = "/tmp/persistent";
|
|
||||||
|
|
||||||
@JsonProperty
|
@JsonProperty
|
||||||
@NotNull
|
@NotNull
|
||||||
private String javaCommand = "java";
|
private String javaCommand = "java";
|
||||||
|
@ -66,11 +62,6 @@ public class ForkingTaskRunnerConfig
|
||||||
"java.io.tmpdir"
|
"java.io.tmpdir"
|
||||||
);
|
);
|
||||||
|
|
||||||
public String getTaskDir()
|
|
||||||
{
|
|
||||||
return taskDir;
|
|
||||||
}
|
|
||||||
|
|
||||||
public String getJavaCommand()
|
public String getJavaCommand()
|
||||||
{
|
{
|
||||||
return javaCommand;
|
return javaCommand;
|
||||||
|
|
Loading…
Reference in New Issue