fix worker status path announcement with indexer zk config

This commit is contained in:
fjy 2015-01-30 12:26:08 -08:00
parent b78ef22829
commit bc1405bee0
5 changed files with 109 additions and 17 deletions

View File

@ -52,7 +52,6 @@ import io.druid.indexing.overlord.setup.WorkerSelectStrategy;
import io.druid.indexing.worker.TaskAnnouncement;
import io.druid.indexing.worker.Worker;
import io.druid.server.initialization.IndexerZkConfig;
import io.druid.server.initialization.ZkPathsConfig;
import io.druid.tasklogs.TaskLogStreamer;
import org.apache.commons.lang.mutable.MutableInt;
import org.apache.curator.framework.CuratorFramework;
@ -497,7 +496,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
} else {
final String workerId = worker.getHost();
log.info("Cleaning up task[%s] on worker[%s]", taskId, workerId);
final String statusPath = JOINER.join(indexerZkConfig.getStatus(), workerId, taskId);
final String statusPath = JOINER.join(indexerZkConfig.getStatusPath(), workerId, taskId);
try {
cf.delete().guaranteed().forPath(statusPath);
}
@ -643,7 +642,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
log.info("Worker[%s] reportin' for duty!", worker.getHost());
try {
final String workerStatusPath = JOINER.join(indexerZkConfig.getStatus(), worker.getHost());
final String workerStatusPath = JOINER.join(indexerZkConfig.getStatusPath(), worker.getHost());
final PathChildrenCache statusCache = pathChildrenCacheFactory.make(cf, workerStatusPath);
final SettableFuture<ZkWorker> retVal = SettableFuture.create();
final ZkWorker zkWorker = new ZkWorker(

View File

@ -78,7 +78,7 @@ public class WorkerCuratorCoordinator
this.baseAnnouncementsPath = getPath(Arrays.asList(indexerZkConfig.getAnnouncementsPath(), worker.getHost()));
this.baseTaskPath = getPath(Arrays.asList(indexerZkConfig.getTasksPath(), worker.getHost()));
this.baseStatusPath = getPath(Arrays.asList(indexerZkConfig.getStatus(), worker.getHost()));
this.baseStatusPath = getPath(Arrays.asList(indexerZkConfig.getStatusPath(), worker.getHost()));
}
@LifecycleStart

View File

@ -35,7 +35,7 @@ public class IndexerZkConfig
@JsonProperty("base") String base,
@JsonProperty("announcementsPath") String announcementsPath,
@JsonProperty("tasksPath") String tasksPath,
@JsonProperty("status") String status,
@JsonProperty("statusPath") String statusPath,
@JsonProperty("leaderLatchPath") String leaderLatchPath
)
{
@ -43,7 +43,7 @@ public class IndexerZkConfig
this.base = base;
this.announcementsPath = announcementsPath;
this.tasksPath = tasksPath;
this.status = status;
this.statusPath = statusPath;
this.leaderLatchPath = leaderLatchPath;
}
@ -60,7 +60,7 @@ public class IndexerZkConfig
private final String tasksPath;
@JsonProperty
private final String status;
private final String statusPath;
@JsonProperty
private final String leaderLatchPath;
@ -85,9 +85,9 @@ public class IndexerZkConfig
return tasksPath == null ? defaultIndexerPath("tasks") : tasksPath;
}
public String getStatus()
public String getStatusPath()
{
return status == null ? defaultIndexerPath("status") : status;
return statusPath == null ? defaultIndexerPath("status") : statusPath;
}
public String getLeaderLatchPath()
@ -99,4 +99,52 @@ public class IndexerZkConfig
{
return zkPathsConfig;
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
IndexerZkConfig that = (IndexerZkConfig) o;
if (announcementsPath != null
? !announcementsPath.equals(that.announcementsPath)
: that.announcementsPath != null) {
return false;
}
if (base != null ? !base.equals(that.base) : that.base != null) {
return false;
}
if (leaderLatchPath != null ? !leaderLatchPath.equals(that.leaderLatchPath) : that.leaderLatchPath != null) {
return false;
}
if (statusPath != null ? !statusPath.equals(that.statusPath) : that.statusPath != null) {
return false;
}
if (tasksPath != null ? !tasksPath.equals(that.tasksPath) : that.tasksPath != null) {
return false;
}
if (zkPathsConfig != null ? !zkPathsConfig.equals(that.zkPathsConfig) : that.zkPathsConfig != null) {
return false;
}
return true;
}
@Override
public int hashCode()
{
int result = zkPathsConfig != null ? zkPathsConfig.hashCode() : 0;
result = 31 * result + (base != null ? base.hashCode() : 0);
result = 31 * result + (announcementsPath != null ? announcementsPath.hashCode() : 0);
result = 31 * result + (tasksPath != null ? tasksPath.hashCode() : 0);
result = 31 * result + (statusPath != null ? statusPath.hashCode() : 0);
result = 31 * result + (leaderLatchPath != null ? leaderLatchPath.hashCode() : 0);
return result;
}
}

View File

@ -20,29 +20,26 @@
package io.druid.server.initialization;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.collect.ImmutableList;
import com.google.inject.Binder;
import com.google.inject.Injector;
import com.google.inject.Module;
import com.google.inject.name.Names;
import io.druid.curator.CuratorConfig;
import io.druid.guice.GuiceInjectors;
import io.druid.guice.JsonConfigProvider;
import io.druid.guice.JsonConfigurator;
import io.druid.initialization.Initialization;
import org.junit.AfterClass;
import io.druid.jackson.DefaultObjectMapper;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Properties;
@ -138,7 +135,8 @@ public class IndexerZkConfigTest
}
@Test
public void testNullConfig(){
public void testNullConfig()
{
propertyValues.clear();
final Injector injector = Initialization.makeInjectorWithModules(
@ -187,7 +185,8 @@ public class IndexerZkConfigTest
}
@Test
public void testExactConfig(){
public void testExactConfig()
{
final Injector injector = Initialization.makeInjectorWithModules(
GuiceInjectors.makeStartupInjector(),
ImmutableList.<Module>of(simpleZkConfigModule)
@ -206,9 +205,41 @@ public class IndexerZkConfigTest
ZkPathsConfig zkPathsConfig1 = zkPathsConfig.get().get();
IndexerZkConfig indexerZkConfig = new IndexerZkConfig(zkPathsConfig1,null,null,null,null,null);
IndexerZkConfig indexerZkConfig = new IndexerZkConfig(zkPathsConfig1, null, null, null, null, null);
Assert.assertEquals("indexer", indexerZkConfig.getBase());
Assert.assertEquals("/druid/metrics/indexer/announcements", indexerZkConfig.getAnnouncementsPath());
}
@Test
public void testFullOverride() throws Exception
{
final DefaultObjectMapper mapper = new DefaultObjectMapper();
final ZkPathsConfig zkPathsConfig = new ZkPathsConfig();
IndexerZkConfig indexerZkConfig = new IndexerZkConfig(
zkPathsConfig,
"/druid/prod",
"/druid/prod/a",
"/druid/prod/t",
"/druid/prod/s",
"/druid/prod/l"
);
Map<String, String> value = mapper.readValue(
mapper.writeValueAsString(indexerZkConfig), new TypeReference<Map<String, String>>()
{
}
);
IndexerZkConfig newConfig = new IndexerZkConfig(
zkPathsConfig,
value.get("base"),
value.get("announcementsPath"),
value.get("tasksPath"),
value.get("statusPath"),
value.get("leaderLatchPath")
);
Assert.assertEquals(indexerZkConfig, newConfig);
}
}

View File

@ -120,4 +120,18 @@ public class ZkPathsConfig
}
return false;
}
@Override
public int hashCode()
{
int result = base != null ? base.hashCode() : 0;
result = 31 * result + (propertiesPath != null ? propertiesPath.hashCode() : 0);
result = 31 * result + (announcementsPath != null ? announcementsPath.hashCode() : 0);
result = 31 * result + (servedSegmentsPath != null ? servedSegmentsPath.hashCode() : 0);
result = 31 * result + (liveSegmentsPath != null ? liveSegmentsPath.hashCode() : 0);
result = 31 * result + (coordinatorPath != null ? coordinatorPath.hashCode() : 0);
result = 31 * result + (loadQueuePath != null ? loadQueuePath.hashCode() : 0);
result = 31 * result + (connectorPath != null ? connectorPath.hashCode() : 0);
return result;
}
}