diff --git a/client/src/main/java/com/metamx/druid/initialization/ZkPathsConfig.java b/client/src/main/java/com/metamx/druid/initialization/ZkPathsConfig.java index 7f7fa19b43e..2f04e61b309 100644 --- a/client/src/main/java/com/metamx/druid/initialization/ZkPathsConfig.java +++ b/client/src/main/java/com/metamx/druid/initialization/ZkPathsConfig.java @@ -19,6 +19,7 @@ package com.metamx.druid.initialization; +import org.apache.curator.utils.ZKPaths; import org.skife.config.Config; public abstract class ZkPathsConfig @@ -56,7 +57,32 @@ public abstract class ZkPathsConfig return defaultPath("master"); } - private String defaultPath(final String subPath) { - return String.format("%s/%s", getZkBasePath(), subPath); + @Config("druid.zk.paths.indexer.announcementsPath") + public String getIndexerAnnouncementPath() + { + return defaultPath("indexer/announcements"); + } + + @Config("druid.zk.paths.indexer.tasksPath") + public String getIndexerTaskPath() + { + return defaultPath("indexer/tasks"); + } + + @Config("druid.zk.paths.indexer.statusPath") + public String getIndexerStatusPath() + { + return defaultPath("indexer/status"); + } + + @Config("druid.zk.paths.indexer.leaderLatchPath") + public String getIndexerLeaderLatchPath() + { + return defaultPath("indexer/leaderLatchPath"); + } + + private String defaultPath(final String subPath) + { + return ZKPaths.makePath(getZkBasePath(), subPath); } } diff --git a/merger/src/main/java/com/metamx/druid/merger/common/config/IndexerZkConfig.java b/merger/src/main/java/com/metamx/druid/merger/common/config/IndexerZkConfig.java index 92cc8393f15..d85f53e4350 100644 --- a/merger/src/main/java/com/metamx/druid/merger/common/config/IndexerZkConfig.java +++ b/merger/src/main/java/com/metamx/druid/merger/common/config/IndexerZkConfig.java @@ -19,22 +19,14 @@ package com.metamx.druid.merger.common.config; +import com.metamx.druid.initialization.ZkPathsConfig; import org.skife.config.Config; import org.skife.config.Default; /** */ -public abstract class IndexerZkConfig +public abstract class IndexerZkConfig extends ZkPathsConfig { - @Config("druid.zk.paths.indexer.announcementsPath") - public abstract String getAnnouncementPath(); - - @Config("druid.zk.paths.indexer.tasksPath") - public abstract String getTaskPath(); - - @Config("druid.zk.paths.indexer.statusPath") - public abstract String getStatusPath(); - @Config("druid.zk.maxNumBytes") @Default("512000") public abstract long getMaxNumBytes(); diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/RemoteTaskRunner.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/RemoteTaskRunner.java index 9388d3ff0fa..f8d71836624 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/RemoteTaskRunner.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/RemoteTaskRunner.java @@ -430,7 +430,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider private void cleanup(final String workerId, final String taskId) { runningTasks.remove(taskId); - final String statusPath = JOINER.join(config.getStatusPath(), workerId, taskId); + final String statusPath = JOINER.join(config.getIndexerStatusPath(), workerId, taskId); try { cf.delete().guaranteed().forPath(statusPath); } @@ -493,7 +493,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider .withMode(CreateMode.EPHEMERAL) .forPath( JOINER.join( - config.getTaskPath(), + config.getIndexerTaskPath(), theWorker.getHost(), task.getId() ), @@ -522,7 +522,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider private void addWorker(final Worker worker) { try { - final String workerStatusPath = JOINER.join(config.getStatusPath(), worker.getHost()); + final String workerStatusPath = JOINER.join(config.getIndexerStatusPath(), worker.getHost()); final PathChildrenCache statusCache = new PathChildrenCache(cf, workerStatusPath, true); final ZkWorker zkWorker = new ZkWorker( worker, @@ -626,18 +626,18 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider try { Set tasksToRetry = Sets.newHashSet( cf.getChildren() - .forPath(JOINER.join(config.getTaskPath(), worker.getHost())) + .forPath(JOINER.join(config.getIndexerTaskPath(), worker.getHost())) ); tasksToRetry.addAll( cf.getChildren() - .forPath(JOINER.join(config.getStatusPath(), worker.getHost())) + .forPath(JOINER.join(config.getIndexerStatusPath(), worker.getHost())) ); log.info("%s has %d tasks to retry", worker.getHost(), tasksToRetry.size()); for (String taskId : tasksToRetry) { TaskRunnerWorkItem taskRunnerWorkItem = runningTasks.get(taskId); if (taskRunnerWorkItem != null) { - String taskPath = JOINER.join(config.getTaskPath(), worker.getHost(), taskId); + String taskPath = JOINER.join(config.getIndexerTaskPath(), worker.getHost(), taskId); if (cf.checkExists().forPath(taskPath) != null) { cf.delete().guaranteed().forPath(taskPath); } diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/TaskMasterLifecycle.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/TaskMasterLifecycle.java index 68ded354100..c00df6bf252 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/TaskMasterLifecycle.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/TaskMasterLifecycle.java @@ -77,7 +77,7 @@ public class TaskMasterLifecycle this.taskActionClientFactory = taskActionClientFactory; this.leaderSelector = new LeaderSelector( - curator, indexerCoordinatorConfig.getLeaderLatchPath(), new LeaderSelectorListener() + curator, indexerCoordinatorConfig.getIndexerLeaderLatchPath(), new LeaderSelectorListener() { @Override public void takeLeadership(CuratorFramework client) throws Exception diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/config/IndexerCoordinatorConfig.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/config/IndexerCoordinatorConfig.java index bdf1b8a480f..88d00b42500 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/config/IndexerCoordinatorConfig.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/config/IndexerCoordinatorConfig.java @@ -21,6 +21,7 @@ package com.metamx.druid.merger.coordinator.config; import com.google.common.base.Splitter; import com.google.common.collect.ImmutableSet; +import com.metamx.druid.initialization.ZkPathsConfig; import org.skife.config.Config; import org.skife.config.Default; import org.skife.config.DefaultNull; @@ -29,16 +30,13 @@ import java.util.Set; /** */ -public abstract class IndexerCoordinatorConfig +public abstract class IndexerCoordinatorConfig extends ZkPathsConfig { private volatile Set whitelistDatasources = null; @Config("druid.host") public abstract String getServerName(); - @Config("druid.zk.paths.indexer.leaderLatchPath") - public abstract String getLeaderLatchPath(); - @Config("druid.merger.threads") @Default("1") public abstract int getNumLocalThreads(); diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorNode.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorNode.java index 931f8779cf4..8a854499cfa 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorNode.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorNode.java @@ -646,7 +646,7 @@ public class IndexerCoordinatorNode extends QueryableNode