diff --git a/merger/src/main/java/com/metamx/druid/merger/common/TaskToolbox.java b/merger/src/main/java/com/metamx/druid/merger/common/TaskToolbox.java index 26336b45c25..e01dcb0c020 100644 --- a/merger/src/main/java/com/metamx/druid/merger/common/TaskToolbox.java +++ b/merger/src/main/java/com/metamx/druid/merger/common/TaskToolbox.java @@ -25,6 +25,7 @@ import com.metamx.druid.loading.S3SegmentGetterConfig; import com.metamx.druid.loading.S3ZippedSegmentPuller; import com.metamx.druid.loading.SegmentKiller; import com.metamx.druid.loading.SegmentPuller; +import com.metamx.druid.merger.common.config.TaskConfig; import com.metamx.druid.merger.common.task.Task; import com.metamx.druid.merger.coordinator.config.IndexerCoordinatorConfig; import com.metamx.druid.loading.SegmentPusher; @@ -40,7 +41,7 @@ import java.util.Map; */ public class TaskToolbox { - private final IndexerCoordinatorConfig config; + private final TaskConfig config; private final ServiceEmitter emitter; private final RestS3Service s3Client; private final SegmentPusher segmentPusher; @@ -48,7 +49,7 @@ public class TaskToolbox private final ObjectMapper objectMapper; public TaskToolbox( - IndexerCoordinatorConfig config, + TaskConfig config, ServiceEmitter emitter, RestS3Service s3Client, SegmentPusher segmentPusher, @@ -64,7 +65,7 @@ public class TaskToolbox this.objectMapper = objectMapper; } - public IndexerCoordinatorConfig getConfig() + public TaskConfig getConfig() { return config; } diff --git a/merger/src/main/java/com/metamx/druid/merger/common/config/TaskConfig.java b/merger/src/main/java/com/metamx/druid/merger/common/config/TaskConfig.java new file mode 100644 index 00000000000..c66009cd8ac --- /dev/null +++ b/merger/src/main/java/com/metamx/druid/merger/common/config/TaskConfig.java @@ -0,0 +1,21 @@ +package com.metamx.druid.merger.common.config; + +import com.metamx.druid.merger.common.task.Task; +import org.skife.config.Config; +import org.skife.config.Default; + +import java.io.File; + +public abstract class TaskConfig +{ + @Config("druid.merger.taskDir") + public abstract File getBaseTaskDir(); + + @Config("druid.merger.rowFlushBoundary") + @Default("500000") + public abstract long getRowFlushBoundary(); + + public File getTaskDir(final Task task) { + return new File(getBaseTaskDir(), task.getId()); + } +} diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/config/IndexerCoordinatorConfig.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/config/IndexerCoordinatorConfig.java index 6d6218c3bff..b727f805bb2 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/config/IndexerCoordinatorConfig.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/config/IndexerCoordinatorConfig.java @@ -52,9 +52,6 @@ public abstract class IndexerCoordinatorConfig @Default("local") public abstract String getStorageImpl(); - @Config("druid.merger.taskDir") - public abstract File getBaseTaskDir(); - @Config("druid.merger.whitelist.enabled") @Default("false") public abstract boolean isWhitelistEnabled(); @@ -63,10 +60,6 @@ public abstract class IndexerCoordinatorConfig @Default("") public abstract String getWhitelistDatasourcesString(); - public File getTaskDir(final Task task) { - return new File(getBaseTaskDir(), task.getId()); - } - public Set getWhitelistDatasources() { if(whitelistDatasources == null) { @@ -80,10 +73,6 @@ public abstract class IndexerCoordinatorConfig return whitelistDatasources; } - @Config("druid.merger.rowFlushBoundary") - @Default("500000") - public abstract long getRowFlushBoundary(); - @Config("druid.indexer.strategy") @Default("noop") public abstract String getStrategyImpl(); diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorNode.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorNode.java index 3d03d7e10b8..3d084d60712 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorNode.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorNode.java @@ -54,6 +54,7 @@ import com.metamx.druid.loading.S3SegmentPusherConfig; import com.metamx.druid.loading.SegmentPusher; import com.metamx.druid.merger.common.TaskToolbox; import com.metamx.druid.merger.common.config.IndexerZkConfig; +import com.metamx.druid.merger.common.config.TaskConfig; import com.metamx.druid.merger.common.index.StaticS3FirehoseFactory; import com.metamx.druid.merger.coordinator.DbTaskStorage; import com.metamx.druid.merger.coordinator.LocalTaskRunner; @@ -134,6 +135,7 @@ public class IndexerCoordinatorNode extends RegisteringNode private DbConnectorConfig dbConnectorConfig = null; private DBI dbi = null; private IndexerCoordinatorConfig config = null; + private TaskConfig taskConfig = null; private TaskToolbox taskToolbox = null; private MergerDBCoordinator mergerDBCoordinator = null; private TaskStorage taskStorage = null; @@ -213,6 +215,7 @@ public class IndexerCoordinatorNode extends RegisteringNode initializeMonitors(); initializeDB(); initializeIndexerCoordinatorConfig(); + initializeTaskConfig(); initializeMergeDBCoordinator(); initializeTaskToolbox(); initializeTaskStorage(); @@ -409,6 +412,13 @@ public class IndexerCoordinatorNode extends RegisteringNode } } + private void initializeTaskConfig() + { + if (taskConfig == null) { + taskConfig = configFactory.build(TaskConfig.class); + } + } + public void initializeTaskToolbox() throws S3ServiceException { if (taskToolbox == null) { @@ -426,7 +436,7 @@ public class IndexerCoordinatorNode extends RegisteringNode final SegmentKiller segmentKiller = new S3SegmentKiller( s3Client ); - taskToolbox = new TaskToolbox(config, emitter, s3Client, segmentPusher, segmentKiller, jsonMapper); + taskToolbox = new TaskToolbox(taskConfig, emitter, s3Client, segmentPusher, segmentKiller, jsonMapper); } } diff --git a/merger/src/main/java/com/metamx/druid/merger/worker/http/WorkerNode.java b/merger/src/main/java/com/metamx/druid/merger/worker/http/WorkerNode.java index 2ee0a1ecaff..14c8e73ac1f 100644 --- a/merger/src/main/java/com/metamx/druid/merger/worker/http/WorkerNode.java +++ b/merger/src/main/java/com/metamx/druid/merger/worker/http/WorkerNode.java @@ -41,6 +41,7 @@ import com.metamx.druid.loading.SegmentKiller; import com.metamx.druid.loading.SegmentPusher; import com.metamx.druid.merger.common.TaskToolbox; import com.metamx.druid.merger.common.config.IndexerZkConfig; +import com.metamx.druid.merger.common.config.TaskConfig; import com.metamx.druid.merger.common.index.StaticS3FirehoseFactory; import com.metamx.druid.merger.coordinator.config.IndexerCoordinatorConfig; import com.metamx.druid.merger.worker.TaskMonitor; @@ -98,7 +99,7 @@ public class WorkerNode extends RegisteringNode private List monitors = null; private ServiceEmitter emitter = null; - private IndexerCoordinatorConfig coordinatorConfig = null; // TODO needed for task toolbox, but shouldn't be + private TaskConfig taskConfig = null; private WorkerConfig workerConfig = null; private TaskToolbox taskToolbox = null; private CuratorFramework curatorFramework = null; @@ -272,8 +273,8 @@ public class WorkerNode extends RegisteringNode private void initializeMergerConfig() { - if (coordinatorConfig == null) { - coordinatorConfig = configFactory.build(IndexerCoordinatorConfig.class); + if (taskConfig == null) { + taskConfig = configFactory.build(TaskConfig.class); } if (workerConfig == null) { @@ -298,7 +299,7 @@ public class WorkerNode extends RegisteringNode final SegmentKiller segmentKiller = new S3SegmentKiller( s3Client ); - taskToolbox = new TaskToolbox(coordinatorConfig, emitter, s3Client, segmentPusher, segmentKiller, jsonMapper); + taskToolbox = new TaskToolbox(taskConfig, emitter, s3Client, segmentPusher, segmentKiller, jsonMapper); } } diff --git a/merger/src/test/java/com/metamx/druid/merger/coordinator/RemoteTaskRunnerTest.java b/merger/src/test/java/com/metamx/druid/merger/coordinator/RemoteTaskRunnerTest.java index b8620eb42df..ec53b0257b6 100644 --- a/merger/src/test/java/com/metamx/druid/merger/coordinator/RemoteTaskRunnerTest.java +++ b/merger/src/test/java/com/metamx/druid/merger/coordinator/RemoteTaskRunnerTest.java @@ -11,6 +11,7 @@ import com.metamx.druid.merger.common.TaskCallback; import com.metamx.druid.merger.common.TaskStatus; import com.metamx.druid.merger.common.TaskToolbox; import com.metamx.druid.merger.common.config.IndexerZkConfig; +import com.metamx.druid.merger.common.config.TaskConfig; import com.metamx.druid.merger.common.task.DefaultMergeTask; import com.metamx.druid.merger.common.task.Task; import com.metamx.druid.merger.coordinator.config.IndexerCoordinatorConfig; @@ -277,38 +278,8 @@ public class RemoteTaskRunnerTest cf, workerCuratorCoordinator, new TaskToolbox( - new IndexerCoordinatorConfig() + new TaskConfig() { - @Override - public String getServerName() - { - return "worker1"; - } - - @Override - public String getLeaderLatchPath() - { - return null; - } - - @Override - public int getNumLocalThreads() - { - return 1; - } - - @Override - public String getRunnerImpl() - { - return null; - } - - @Override - public String getStorageImpl() - { - return null; - } - @Override public File getBaseTaskDir() { @@ -320,30 +291,11 @@ public class RemoteTaskRunnerTest } } - @Override - public boolean isWhitelistEnabled() - { - return false; - } - - @Override - public String getWhitelistDatasourcesString() - { - return null; - } - @Override public long getRowFlushBoundary() { return 0; } - - - @Override - public String getStrategyImpl() - { - return null; - } }, null, null, null, null, jsonMapper ), Executors.newSingleThreadExecutor()