TaskToolbox: Replace IndexerCoordinatorConfig with TaskConfig

This commit is contained in:
Gian Merlino 2013-01-31 08:13:44 -08:00
parent fe38ed2547
commit 779c54d8f2
6 changed files with 43 additions and 69 deletions

View File

@ -25,6 +25,7 @@ import com.metamx.druid.loading.S3SegmentGetterConfig;
import com.metamx.druid.loading.S3ZippedSegmentPuller;
import com.metamx.druid.loading.SegmentKiller;
import com.metamx.druid.loading.SegmentPuller;
import com.metamx.druid.merger.common.config.TaskConfig;
import com.metamx.druid.merger.common.task.Task;
import com.metamx.druid.merger.coordinator.config.IndexerCoordinatorConfig;
import com.metamx.druid.loading.SegmentPusher;
@ -40,7 +41,7 @@ import java.util.Map;
*/
public class TaskToolbox
{
private final IndexerCoordinatorConfig config;
private final TaskConfig config;
private final ServiceEmitter emitter;
private final RestS3Service s3Client;
private final SegmentPusher segmentPusher;
@ -48,7 +49,7 @@ public class TaskToolbox
private final ObjectMapper objectMapper;
public TaskToolbox(
IndexerCoordinatorConfig config,
TaskConfig config,
ServiceEmitter emitter,
RestS3Service s3Client,
SegmentPusher segmentPusher,
@ -64,7 +65,7 @@ public class TaskToolbox
this.objectMapper = objectMapper;
}
public IndexerCoordinatorConfig getConfig()
public TaskConfig getConfig()
{
return config;
}

View File

@ -0,0 +1,21 @@
package com.metamx.druid.merger.common.config;
import com.metamx.druid.merger.common.task.Task;
import org.skife.config.Config;
import org.skife.config.Default;
import java.io.File;
public abstract class TaskConfig
{
@Config("druid.merger.taskDir")
public abstract File getBaseTaskDir();
@Config("druid.merger.rowFlushBoundary")
@Default("500000")
public abstract long getRowFlushBoundary();
public File getTaskDir(final Task task) {
return new File(getBaseTaskDir(), task.getId());
}
}

View File

@ -52,9 +52,6 @@ public abstract class IndexerCoordinatorConfig
@Default("local")
public abstract String getStorageImpl();
@Config("druid.merger.taskDir")
public abstract File getBaseTaskDir();
@Config("druid.merger.whitelist.enabled")
@Default("false")
public abstract boolean isWhitelistEnabled();
@ -63,10 +60,6 @@ public abstract class IndexerCoordinatorConfig
@Default("")
public abstract String getWhitelistDatasourcesString();
public File getTaskDir(final Task task) {
return new File(getBaseTaskDir(), task.getId());
}
public Set<String> getWhitelistDatasources()
{
if(whitelistDatasources == null) {
@ -80,10 +73,6 @@ public abstract class IndexerCoordinatorConfig
return whitelistDatasources;
}
@Config("druid.merger.rowFlushBoundary")
@Default("500000")
public abstract long getRowFlushBoundary();
@Config("druid.indexer.strategy")
@Default("noop")
public abstract String getStrategyImpl();

View File

@ -54,6 +54,7 @@ import com.metamx.druid.loading.S3SegmentPusherConfig;
import com.metamx.druid.loading.SegmentPusher;
import com.metamx.druid.merger.common.TaskToolbox;
import com.metamx.druid.merger.common.config.IndexerZkConfig;
import com.metamx.druid.merger.common.config.TaskConfig;
import com.metamx.druid.merger.common.index.StaticS3FirehoseFactory;
import com.metamx.druid.merger.coordinator.DbTaskStorage;
import com.metamx.druid.merger.coordinator.LocalTaskRunner;
@ -134,6 +135,7 @@ public class IndexerCoordinatorNode extends RegisteringNode
private DbConnectorConfig dbConnectorConfig = null;
private DBI dbi = null;
private IndexerCoordinatorConfig config = null;
private TaskConfig taskConfig = null;
private TaskToolbox taskToolbox = null;
private MergerDBCoordinator mergerDBCoordinator = null;
private TaskStorage taskStorage = null;
@ -213,6 +215,7 @@ public class IndexerCoordinatorNode extends RegisteringNode
initializeMonitors();
initializeDB();
initializeIndexerCoordinatorConfig();
initializeTaskConfig();
initializeMergeDBCoordinator();
initializeTaskToolbox();
initializeTaskStorage();
@ -409,6 +412,13 @@ public class IndexerCoordinatorNode extends RegisteringNode
}
}
private void initializeTaskConfig()
{
if (taskConfig == null) {
taskConfig = configFactory.build(TaskConfig.class);
}
}
public void initializeTaskToolbox() throws S3ServiceException
{
if (taskToolbox == null) {
@ -426,7 +436,7 @@ public class IndexerCoordinatorNode extends RegisteringNode
final SegmentKiller segmentKiller = new S3SegmentKiller(
s3Client
);
taskToolbox = new TaskToolbox(config, emitter, s3Client, segmentPusher, segmentKiller, jsonMapper);
taskToolbox = new TaskToolbox(taskConfig, emitter, s3Client, segmentPusher, segmentKiller, jsonMapper);
}
}

View File

@ -41,6 +41,7 @@ import com.metamx.druid.loading.SegmentKiller;
import com.metamx.druid.loading.SegmentPusher;
import com.metamx.druid.merger.common.TaskToolbox;
import com.metamx.druid.merger.common.config.IndexerZkConfig;
import com.metamx.druid.merger.common.config.TaskConfig;
import com.metamx.druid.merger.common.index.StaticS3FirehoseFactory;
import com.metamx.druid.merger.coordinator.config.IndexerCoordinatorConfig;
import com.metamx.druid.merger.worker.TaskMonitor;
@ -98,7 +99,7 @@ public class WorkerNode extends RegisteringNode
private List<Monitor> monitors = null;
private ServiceEmitter emitter = null;
private IndexerCoordinatorConfig coordinatorConfig = null; // TODO needed for task toolbox, but shouldn't be
private TaskConfig taskConfig = null;
private WorkerConfig workerConfig = null;
private TaskToolbox taskToolbox = null;
private CuratorFramework curatorFramework = null;
@ -272,8 +273,8 @@ public class WorkerNode extends RegisteringNode
private void initializeMergerConfig()
{
if (coordinatorConfig == null) {
coordinatorConfig = configFactory.build(IndexerCoordinatorConfig.class);
if (taskConfig == null) {
taskConfig = configFactory.build(TaskConfig.class);
}
if (workerConfig == null) {
@ -298,7 +299,7 @@ public class WorkerNode extends RegisteringNode
final SegmentKiller segmentKiller = new S3SegmentKiller(
s3Client
);
taskToolbox = new TaskToolbox(coordinatorConfig, emitter, s3Client, segmentPusher, segmentKiller, jsonMapper);
taskToolbox = new TaskToolbox(taskConfig, emitter, s3Client, segmentPusher, segmentKiller, jsonMapper);
}
}

View File

@ -11,6 +11,7 @@ import com.metamx.druid.merger.common.TaskCallback;
import com.metamx.druid.merger.common.TaskStatus;
import com.metamx.druid.merger.common.TaskToolbox;
import com.metamx.druid.merger.common.config.IndexerZkConfig;
import com.metamx.druid.merger.common.config.TaskConfig;
import com.metamx.druid.merger.common.task.DefaultMergeTask;
import com.metamx.druid.merger.common.task.Task;
import com.metamx.druid.merger.coordinator.config.IndexerCoordinatorConfig;
@ -277,38 +278,8 @@ public class RemoteTaskRunnerTest
cf,
workerCuratorCoordinator,
new TaskToolbox(
new IndexerCoordinatorConfig()
new TaskConfig()
{
@Override
public String getServerName()
{
return "worker1";
}
@Override
public String getLeaderLatchPath()
{
return null;
}
@Override
public int getNumLocalThreads()
{
return 1;
}
@Override
public String getRunnerImpl()
{
return null;
}
@Override
public String getStorageImpl()
{
return null;
}
@Override
public File getBaseTaskDir()
{
@ -320,30 +291,11 @@ public class RemoteTaskRunnerTest
}
}
@Override
public boolean isWhitelistEnabled()
{
return false;
}
@Override
public String getWhitelistDatasourcesString()
{
return null;
}
@Override
public long getRowFlushBoundary()
{
return 0;
}
@Override
public String getStrategyImpl()
{
return null;
}
}, null, null, null, null, jsonMapper
),
Executors.newSingleThreadExecutor()