Move indexer zk paths to ZkPathsConfig

This commit is contained in:
Gian Merlino 2013-05-03 16:38:23 +03:00
parent af08ea7617
commit b32a728863
8 changed files with 61 additions and 33 deletions

View File

@ -19,6 +19,7 @@
package com.metamx.druid.initialization;
import org.apache.curator.utils.ZKPaths;
import org.skife.config.Config;
public abstract class ZkPathsConfig
@ -56,7 +57,32 @@ public abstract class ZkPathsConfig
return defaultPath("master");
}
private String defaultPath(final String subPath) {
return String.format("%s/%s", getZkBasePath(), subPath);
@Config("druid.zk.paths.indexer.announcementsPath")
public String getIndexerAnnouncementPath()
{
return defaultPath("indexer/announcements");
}
@Config("druid.zk.paths.indexer.tasksPath")
public String getIndexerTaskPath()
{
return defaultPath("indexer/tasks");
}
@Config("druid.zk.paths.indexer.statusPath")
public String getIndexerStatusPath()
{
return defaultPath("indexer/status");
}
@Config("druid.zk.paths.indexer.leaderLatchPath")
public String getIndexerLeaderLatchPath()
{
return defaultPath("indexer/leaderLatchPath");
}
private String defaultPath(final String subPath)
{
return ZKPaths.makePath(getZkBasePath(), subPath);
}
}

View File

@ -19,22 +19,14 @@
package com.metamx.druid.merger.common.config;
import com.metamx.druid.initialization.ZkPathsConfig;
import org.skife.config.Config;
import org.skife.config.Default;
/**
*/
public abstract class IndexerZkConfig
public abstract class IndexerZkConfig extends ZkPathsConfig
{
@Config("druid.zk.paths.indexer.announcementsPath")
public abstract String getAnnouncementPath();
@Config("druid.zk.paths.indexer.tasksPath")
public abstract String getTaskPath();
@Config("druid.zk.paths.indexer.statusPath")
public abstract String getStatusPath();
@Config("druid.zk.maxNumBytes")
@Default("512000")
public abstract long getMaxNumBytes();

View File

@ -430,7 +430,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
private void cleanup(final String workerId, final String taskId)
{
runningTasks.remove(taskId);
final String statusPath = JOINER.join(config.getStatusPath(), workerId, taskId);
final String statusPath = JOINER.join(config.getIndexerStatusPath(), workerId, taskId);
try {
cf.delete().guaranteed().forPath(statusPath);
}
@ -493,7 +493,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
.withMode(CreateMode.EPHEMERAL)
.forPath(
JOINER.join(
config.getTaskPath(),
config.getIndexerTaskPath(),
theWorker.getHost(),
task.getId()
),
@ -522,7 +522,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
private void addWorker(final Worker worker)
{
try {
final String workerStatusPath = JOINER.join(config.getStatusPath(), worker.getHost());
final String workerStatusPath = JOINER.join(config.getIndexerStatusPath(), worker.getHost());
final PathChildrenCache statusCache = new PathChildrenCache(cf, workerStatusPath, true);
final ZkWorker zkWorker = new ZkWorker(
worker,
@ -626,18 +626,18 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
try {
Set<String> tasksToRetry = Sets.newHashSet(
cf.getChildren()
.forPath(JOINER.join(config.getTaskPath(), worker.getHost()))
.forPath(JOINER.join(config.getIndexerTaskPath(), worker.getHost()))
);
tasksToRetry.addAll(
cf.getChildren()
.forPath(JOINER.join(config.getStatusPath(), worker.getHost()))
.forPath(JOINER.join(config.getIndexerStatusPath(), worker.getHost()))
);
log.info("%s has %d tasks to retry", worker.getHost(), tasksToRetry.size());
for (String taskId : tasksToRetry) {
TaskRunnerWorkItem taskRunnerWorkItem = runningTasks.get(taskId);
if (taskRunnerWorkItem != null) {
String taskPath = JOINER.join(config.getTaskPath(), worker.getHost(), taskId);
String taskPath = JOINER.join(config.getIndexerTaskPath(), worker.getHost(), taskId);
if (cf.checkExists().forPath(taskPath) != null) {
cf.delete().guaranteed().forPath(taskPath);
}

View File

@ -77,7 +77,7 @@ public class TaskMasterLifecycle
this.taskActionClientFactory = taskActionClientFactory;
this.leaderSelector = new LeaderSelector(
curator, indexerCoordinatorConfig.getLeaderLatchPath(), new LeaderSelectorListener()
curator, indexerCoordinatorConfig.getIndexerLeaderLatchPath(), new LeaderSelectorListener()
{
@Override
public void takeLeadership(CuratorFramework client) throws Exception

View File

@ -21,6 +21,7 @@ package com.metamx.druid.merger.coordinator.config;
import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableSet;
import com.metamx.druid.initialization.ZkPathsConfig;
import org.skife.config.Config;
import org.skife.config.Default;
import org.skife.config.DefaultNull;
@ -29,16 +30,13 @@ import java.util.Set;
/**
*/
public abstract class IndexerCoordinatorConfig
public abstract class IndexerCoordinatorConfig extends ZkPathsConfig
{
private volatile Set<String> whitelistDatasources = null;
@Config("druid.host")
public abstract String getServerName();
@Config("druid.zk.paths.indexer.leaderLatchPath")
public abstract String getLeaderLatchPath();
@Config("druid.merger.threads")
@Default("1")
public abstract int getNumLocalThreads();

View File

@ -646,7 +646,7 @@ public class IndexerCoordinatorNode extends QueryableNode<IndexerCoordinatorNode
getJsonMapper(),
getConfigFactory().build(RemoteTaskRunnerConfig.class),
curator,
new PathChildrenCache(curator, indexerZkConfig.getAnnouncementPath(), true),
new PathChildrenCache(curator, indexerZkConfig.getIndexerAnnouncementPath(), true),
retryScheduledExec,
new RetryPolicyFactory(
getConfigFactory().buildWithReplacements(

View File

@ -71,9 +71,9 @@ public class WorkerCuratorCoordinator
this.worker = worker;
this.config = config;
this.baseAnnouncementsPath = getPath(Arrays.asList(config.getAnnouncementPath(), worker.getHost()));
this.baseTaskPath = getPath(Arrays.asList(config.getTaskPath(), worker.getHost()));
this.baseStatusPath = getPath(Arrays.asList(config.getStatusPath(), worker.getHost()));
this.baseAnnouncementsPath = getPath(Arrays.asList(config.getIndexerAnnouncementPath(), worker.getHost()));
this.baseTaskPath = getPath(Arrays.asList(config.getIndexerTaskPath(), worker.getHost()));
this.baseStatusPath = getPath(Arrays.asList(config.getIndexerStatusPath(), worker.getHost()));
}
@LifecycleStart

View File

@ -252,23 +252,29 @@ public class RemoteTaskRunnerTest
new IndexerZkConfig()
{
@Override
public String getAnnouncementPath()
public String getIndexerAnnouncementPath()
{
return announcementsPath;
}
@Override
public String getTaskPath()
public String getIndexerTaskPath()
{
return tasksPath;
}
@Override
public String getStatusPath()
public String getIndexerStatusPath()
{
return statusPath;
}
@Override
public String getZkBasePath()
{
throw new UnsupportedOperationException();
}
@Override
public long getMaxNumBytes()
{
@ -375,23 +381,29 @@ public class RemoteTaskRunnerTest
private static class TestRemoteTaskRunnerConfig extends RemoteTaskRunnerConfig
{
@Override
public String getAnnouncementPath()
public String getIndexerAnnouncementPath()
{
return announcementsPath;
}
@Override
public String getTaskPath()
public String getIndexerTaskPath()
{
return tasksPath;
}
@Override
public String getStatusPath()
public String getIndexerStatusPath()
{
return statusPath;
}
@Override
public String getZkBasePath()
{
throw new UnsupportedOperationException();
}
@Override
public Duration getTaskAssignmentTimeoutDuration()
{