diff --git a/indexing-service/src/main/java/io/druid/guice/IndexingServiceModuleHelper.java b/indexing-service/src/main/java/io/druid/guice/IndexingServiceModuleHelper.java index 72a983d6884..b4f57f3b0ea 100644 --- a/indexing-service/src/main/java/io/druid/guice/IndexingServiceModuleHelper.java +++ b/indexing-service/src/main/java/io/druid/guice/IndexingServiceModuleHelper.java @@ -22,6 +22,7 @@ package io.druid.guice; import com.google.inject.Binder; import io.druid.indexing.overlord.config.ForkingTaskRunnerConfig; import io.druid.indexing.overlord.config.RemoteTaskRunnerConfig; +import io.druid.server.initialization.IndexerZkConfig; /** */ @@ -31,5 +32,6 @@ public class IndexingServiceModuleHelper { JsonConfigProvider.bind(binder, "druid.indexer.runner", ForkingTaskRunnerConfig.class); JsonConfigProvider.bind(binder, "druid.indexer.runner", RemoteTaskRunnerConfig.class); + JsonConfigProvider.bind(binder, "druid.zk.paths.indexer", IndexerZkConfig.class); } } diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java b/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java index 355f018ea89..bc70d153c1b 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java @@ -51,6 +51,7 @@ import io.druid.indexing.overlord.config.RemoteTaskRunnerConfig; import io.druid.indexing.overlord.setup.WorkerSelectStrategy; import io.druid.indexing.worker.TaskAnnouncement; import io.druid.indexing.worker.Worker; +import io.druid.server.initialization.IndexerZkConfig; import io.druid.server.initialization.ZkPathsConfig; import io.druid.tasklogs.TaskLogStreamer; import org.apache.commons.lang.mutable.MutableInt; @@ -102,7 +103,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer private final ObjectMapper jsonMapper; private final RemoteTaskRunnerConfig config; - private final ZkPathsConfig zkPaths; + private final IndexerZkConfig indexerZkConfig; private final CuratorFramework cf; private final PathChildrenCacheFactory pathChildrenCacheFactory; private final PathChildrenCache workerPathCache; @@ -129,7 +130,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer public RemoteTaskRunner( ObjectMapper jsonMapper, RemoteTaskRunnerConfig config, - ZkPathsConfig zkPaths, + IndexerZkConfig indexerZkConfig, CuratorFramework cf, PathChildrenCacheFactory pathChildrenCacheFactory, HttpClient httpClient, @@ -138,10 +139,10 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer { this.jsonMapper = jsonMapper; this.config = config; - this.zkPaths = zkPaths; + this.indexerZkConfig = indexerZkConfig; this.cf = cf; this.pathChildrenCacheFactory = pathChildrenCacheFactory; - this.workerPathCache = pathChildrenCacheFactory.make(cf, zkPaths.getIndexerAnnouncementPath()); + this.workerPathCache = pathChildrenCacheFactory.make(cf, indexerZkConfig.getAnnouncementsPath()); this.httpClient = httpClient; this.strategy = strategy; } @@ -496,7 +497,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer } else { final String workerId = worker.getHost(); log.info("Cleaning up task[%s] on worker[%s]", taskId, workerId); - final String statusPath = JOINER.join(zkPaths.getIndexerStatusPath(), workerId, taskId); + final String statusPath = JOINER.join(indexerZkConfig.getStatus(), workerId, taskId); try { cf.delete().guaranteed().forPath(statusPath); } @@ -582,7 +583,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer throw new ISE("Length of raw bytes for task too large[%,d > %,d]", rawBytes.length, config.getMaxZnodeBytes()); } - String taskPath = JOINER.join(zkPaths.getIndexerTaskPath(), theWorker.getHost(), task.getId()); + String taskPath = JOINER.join(indexerZkConfig.getTasksPath(), theWorker.getHost(), task.getId()); if (cf.checkExists().forPath(taskPath) == null) { cf.create() @@ -642,7 +643,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer log.info("Worker[%s] reportin' for duty!", worker.getHost()); try { - final String workerStatusPath = JOINER.join(zkPaths.getIndexerStatusPath(), worker.getHost()); + final String workerStatusPath = JOINER.join(indexerZkConfig.getStatus(), worker.getHost()); final PathChildrenCache statusCache = pathChildrenCacheFactory.make(cf, workerStatusPath); final SettableFuture retVal = SettableFuture.create(); final ZkWorker zkWorker = new ZkWorker( @@ -787,7 +788,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer if (zkWorker != null) { try { List tasksToFail = Lists.newArrayList( - cf.getChildren().forPath(JOINER.join(zkPaths.getIndexerTaskPath(), worker.getHost())) + cf.getChildren().forPath(JOINER.join(indexerZkConfig.getTasksPath(), worker.getHost())) ); log.info("[%s]: Found %d tasks assigned", worker.getHost(), tasksToFail.size()); @@ -805,7 +806,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer for (String assignedTask : tasksToFail) { RemoteTaskRunnerWorkItem taskRunnerWorkItem = runningTasks.remove(assignedTask); if (taskRunnerWorkItem != null) { - String taskPath = JOINER.join(zkPaths.getIndexerTaskPath(), worker.getHost(), assignedTask); + String taskPath = JOINER.join(indexerZkConfig.getTasksPath(), worker.getHost(), assignedTask); if (cf.checkExists().forPath(taskPath) != null) { cf.delete().guaranteed().forPath(taskPath); } diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunnerFactory.java b/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunnerFactory.java index e68aa15b431..5c0e1cca09f 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunnerFactory.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunnerFactory.java @@ -29,6 +29,7 @@ import io.druid.indexing.overlord.config.RemoteTaskRunnerConfig; import io.druid.indexing.overlord.setup.FillCapacityWorkerSelectStrategy; import io.druid.indexing.overlord.setup.WorkerBehaviorConfig; import io.druid.indexing.overlord.setup.WorkerSelectStrategy; +import io.druid.server.initialization.IndexerZkConfig; import io.druid.server.initialization.ZkPathsConfig; import org.apache.curator.framework.CuratorFramework; @@ -38,7 +39,7 @@ public class RemoteTaskRunnerFactory implements TaskRunnerFactory { private final CuratorFramework curator; private final RemoteTaskRunnerConfig remoteTaskRunnerConfig; - private final ZkPathsConfig zkPaths; + private final IndexerZkConfig zkPaths; private final ObjectMapper jsonMapper; private final HttpClient httpClient; private final WorkerSelectStrategy strategy; @@ -47,7 +48,7 @@ public class RemoteTaskRunnerFactory implements TaskRunnerFactory public RemoteTaskRunnerFactory( final CuratorFramework curator, final RemoteTaskRunnerConfig remoteTaskRunnerConfig, - final ZkPathsConfig zkPaths, + final IndexerZkConfig zkPaths, final ObjectMapper jsonMapper, @Global final HttpClient httpClient, final Supplier workerBehaviourConfigSupplier diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskMaster.java b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskMaster.java index 61a7b595437..5b4c79a9327 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskMaster.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskMaster.java @@ -38,6 +38,7 @@ import io.druid.indexing.overlord.config.TaskQueueConfig; import io.druid.indexing.overlord.autoscaling.ResourceManagementScheduler; import io.druid.indexing.overlord.autoscaling.ResourceManagementSchedulerFactory; import io.druid.server.DruidNode; +import io.druid.server.initialization.IndexerZkConfig; import io.druid.server.initialization.ZkPathsConfig; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.recipes.leader.LeaderSelector; @@ -74,7 +75,7 @@ public class TaskMaster final TaskStorage taskStorage, final TaskActionClientFactory taskActionClientFactory, @Self final DruidNode node, - final ZkPathsConfig zkPaths, + final IndexerZkConfig zkPaths, final TaskRunnerFactory runnerFactory, final ResourceManagementSchedulerFactory managementSchedulerFactory, final CuratorFramework curator, @@ -85,7 +86,7 @@ public class TaskMaster this.taskActionClientFactory = taskActionClientFactory; this.leaderSelector = new LeaderSelector( curator, - zkPaths.getIndexerLeaderLatchPath(), + zkPaths.getLeaderLatchPath(), new LeaderSelectorListener() { @Override diff --git a/indexing-service/src/main/java/io/druid/indexing/worker/WorkerCuratorCoordinator.java b/indexing-service/src/main/java/io/druid/indexing/worker/WorkerCuratorCoordinator.java index 32dbf5b85ff..dc215545c7a 100644 --- a/indexing-service/src/main/java/io/druid/indexing/worker/WorkerCuratorCoordinator.java +++ b/indexing-service/src/main/java/io/druid/indexing/worker/WorkerCuratorCoordinator.java @@ -31,7 +31,7 @@ import com.metamx.common.lifecycle.LifecycleStop; import com.metamx.common.logger.Logger; import io.druid.curator.announcement.Announcer; import io.druid.indexing.overlord.config.RemoteTaskRunnerConfig; -import io.druid.server.initialization.ZkPathsConfig; +import io.druid.server.initialization.IndexerZkConfig; import org.apache.curator.framework.CuratorFramework; import org.apache.zookeeper.CreateMode; import org.joda.time.DateTime; @@ -63,7 +63,7 @@ public class WorkerCuratorCoordinator @Inject public WorkerCuratorCoordinator( ObjectMapper jsonMapper, - ZkPathsConfig zkPaths, + IndexerZkConfig indexerZkConfig, RemoteTaskRunnerConfig config, CuratorFramework curatorFramework, Worker worker @@ -76,9 +76,9 @@ public class WorkerCuratorCoordinator this.announcer = new Announcer(curatorFramework, MoreExecutors.sameThreadExecutor()); - this.baseAnnouncementsPath = getPath(Arrays.asList(zkPaths.getIndexerAnnouncementPath(), worker.getHost())); - this.baseTaskPath = getPath(Arrays.asList(zkPaths.getIndexerTaskPath(), worker.getHost())); - this.baseStatusPath = getPath(Arrays.asList(zkPaths.getIndexerStatusPath(), worker.getHost())); + this.baseAnnouncementsPath = getPath(Arrays.asList(indexerZkConfig.getAnnouncementsPath(), worker.getHost())); + this.baseTaskPath = getPath(Arrays.asList(indexerZkConfig.getTasksPath(), worker.getHost())); + this.baseStatusPath = getPath(Arrays.asList(indexerZkConfig.getStatus(), worker.getHost())); } @LifecycleStart diff --git a/indexing-service/src/main/java/io/druid/server/initialization/IndexerZkConfig.java b/indexing-service/src/main/java/io/druid/server/initialization/IndexerZkConfig.java new file mode 100644 index 00000000000..6333ea3f0e9 --- /dev/null +++ b/indexing-service/src/main/java/io/druid/server/initialization/IndexerZkConfig.java @@ -0,0 +1,87 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013, 2014 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.server.initialization; + +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.inject.Inject; +import org.apache.curator.utils.ZKPaths; + +/** + * + */ +public class IndexerZkConfig +{ + @Inject + @JsonIgnore + private ZkPathsConfig zkPathsConfig = new ZkPathsConfig(); + @JsonProperty + private String base; + @JsonProperty + private String announcementsPath; + @JsonProperty + private String tasksPath; + @JsonProperty + private String status; + @JsonProperty + private String leaderLatchPath; + + private String defaultIndexerPath(final String subPath) + { + return getZkPathsConfig().defaultPath(ZKPaths.makePath(getBase(), subPath)); + } + + public String getBase() + { + return base == null ? "indexer" : base; + } + + public String getAnnouncementsPath() + { + return announcementsPath == null ? defaultIndexerPath("announcements") : announcementsPath; + } + + public String getTasksPath() + { + return tasksPath == null ? defaultIndexerPath("tasks") : tasksPath; + } + + public String getStatus() + { + return status == null ? defaultIndexerPath("status") : status; + } + + public String getLeaderLatchPath() + { + return leaderLatchPath == null ? defaultIndexerPath("leaderLatchPath") : leaderLatchPath; + } + + public ZkPathsConfig getZkPathsConfig() + { + return zkPathsConfig; + } + + // Setter required for easy debugging + public IndexerZkConfig setZkPathsConfig(ZkPathsConfig zkPathsConfig) + { + this.zkPathsConfig = zkPathsConfig; + return this; + } +} diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/RemoteTaskRunnerTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/RemoteTaskRunnerTest.java index a007c9f4cf2..01bfbca8480 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/RemoteTaskRunnerTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/RemoteTaskRunnerTest.java @@ -43,6 +43,7 @@ import io.druid.indexing.overlord.setup.FillCapacityWorkerSelectStrategy; import io.druid.indexing.worker.TaskAnnouncement; import io.druid.indexing.worker.Worker; import io.druid.jackson.DefaultObjectMapper; +import io.druid.server.initialization.IndexerZkConfig; import io.druid.server.initialization.ZkPathsConfig; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; @@ -57,6 +58,7 @@ import org.junit.Test; import java.util.Set; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; public class RemoteTaskRunnerTest { @@ -66,6 +68,7 @@ public class RemoteTaskRunnerTest private static final String announcementsPath = String.format("%s/indexer/announcements/worker", basePath); private static final String tasksPath = String.format("%s/indexer/tasks/worker", basePath); private static final String statusPath = String.format("%s/indexer/status/worker", basePath); + private static final int TIMEOUT_SECONDS = 5; private TestingCluster testingCluster; private CuratorFramework cf; @@ -282,7 +285,7 @@ public class RemoteTaskRunnerTest cf.delete().forPath(joiner.join(statusPath, task.getId())); - TaskStatus status = future.get(); + TaskStatus status = future.get(TIMEOUT_SECONDS, TimeUnit.SECONDS); Assert.assertEquals(status.getStatusCode(), TaskStatus.Status.FAILED); } @@ -335,7 +338,7 @@ public class RemoteTaskRunnerTest ListenableFuture future = remoteTaskRunner.run(task); - TaskStatus status = future.get(); + TaskStatus status = future.get(TIMEOUT_SECONDS, TimeUnit.SECONDS); Assert.assertEquals(TaskStatus.Status.SUCCESS, status.getStatusCode()); } @@ -353,7 +356,7 @@ public class RemoteTaskRunnerTest cf.delete().forPath(announcementsPath); - TaskStatus status = future.get(); + TaskStatus status = future.get(TIMEOUT_SECONDS, TimeUnit.SECONDS); Assert.assertEquals(TaskStatus.Status.FAILED, status.getStatusCode()); } @@ -393,14 +396,14 @@ public class RemoteTaskRunnerTest remoteTaskRunner = new RemoteTaskRunner( jsonMapper, config, - new ZkPathsConfig() + new IndexerZkConfig().setZkPathsConfig(new ZkPathsConfig() { @Override - public String getZkBasePath() + public String getBase() { return basePath; } - }, + }), cf, new SimplePathChildrenCacheFactory.Builder().build(), null, diff --git a/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskMonitorTest.java b/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskMonitorTest.java index 478a77494a3..76b269b855e 100644 --- a/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskMonitorTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskMonitorTest.java @@ -50,6 +50,7 @@ import io.druid.segment.loading.LocalDataSegmentPuller; import io.druid.segment.loading.OmniSegmentLoader; import io.druid.segment.loading.SegmentLoaderConfig; import io.druid.segment.loading.StorageLocationConfig; +import io.druid.server.initialization.IndexerZkConfig; import io.druid.server.initialization.ZkPathsConfig; import junit.framework.Assert; import org.apache.curator.framework.CuratorFramework; @@ -139,14 +140,15 @@ public class WorkerTaskMonitorTest workerCuratorCoordinator = new WorkerCuratorCoordinator( jsonMapper, + new IndexerZkConfig().setZkPathsConfig( new ZkPathsConfig() { @Override - public String getZkBasePath() + public String getBase() { return basePath; } - }, + }), new TestRemoteTaskRunnerConfig(), cf, worker diff --git a/indexing-service/src/test/java/io/druid/indexing/worker/http/WorkerResourceTest.java b/indexing-service/src/test/java/io/druid/indexing/worker/http/WorkerResourceTest.java index 2a0a1e4a3c3..2b786532e79 100644 --- a/indexing-service/src/test/java/io/druid/indexing/worker/http/WorkerResourceTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/worker/http/WorkerResourceTest.java @@ -25,6 +25,7 @@ import io.druid.indexing.overlord.config.RemoteTaskRunnerConfig; import io.druid.indexing.worker.Worker; import io.druid.indexing.worker.WorkerCuratorCoordinator; import io.druid.jackson.DefaultObjectMapper; +import io.druid.server.initialization.IndexerZkConfig; import io.druid.server.initialization.ZkPathsConfig; import junit.framework.Assert; import org.apache.curator.framework.CuratorFramework; @@ -76,14 +77,14 @@ public class WorkerResourceTest curatorCoordinator = new WorkerCuratorCoordinator( jsonMapper, - new ZkPathsConfig() + new IndexerZkConfig().setZkPathsConfig(new ZkPathsConfig() { @Override - public String getZkBasePath() + public String getBase() { return basePath; } - }, + }), new RemoteTaskRunnerConfig(), cf, worker diff --git a/indexing-service/src/test/java/io/druid/server/initialization/IndexerZkConfigTest.java b/indexing-service/src/test/java/io/druid/server/initialization/IndexerZkConfigTest.java new file mode 100644 index 00000000000..422fc05af83 --- /dev/null +++ b/indexing-service/src/test/java/io/druid/server/initialization/IndexerZkConfigTest.java @@ -0,0 +1,175 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013, 2014 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.server.initialization; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.ImmutableList; +import com.google.inject.Binder; +import com.google.inject.Injector; +import com.google.inject.Module; +import com.google.inject.name.Names; +import io.druid.curator.CuratorConfig; +import io.druid.guice.GuiceInjectors; +import io.druid.guice.JsonConfigProvider; +import io.druid.guice.JsonConfigurator; +import io.druid.initialization.Initialization; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.lang.reflect.Field; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; + +/** + * + */ +public class IndexerZkConfigTest +{ + private static final String indexerPropertyString = "test.druid.zk.paths.indexer"; + private static final String zkServiceConfigString = "test.druid.zk.paths"; + private static final Collection clobberableProperties = new ArrayList<>(); + + private static final Module simpleZkConfigModule = new Module() + { + @Override + public void configure(Binder binder) + { + binder.bindConstant().annotatedWith(Names.named("serviceName")).to("druid/test"); + binder.bindConstant().annotatedWith(Names.named("servicePort")).to(0); + // See IndexingServiceModuleHelper + JsonConfigProvider.bind(binder, indexerPropertyString, IndexerZkConfig.class); + JsonConfigProvider.bind( + binder, zkServiceConfigString, + CuratorConfig.class + ); + } + }; + + private static final Map priorValues = new HashMap<>(); + + @BeforeClass + public static void setup() + { + for (Field field : IndexerZkConfig.class.getDeclaredFields()) { + if (null != field.getAnnotation(JsonProperty.class)) { + clobberableProperties.add(String.format("%s.%s", indexerPropertyString, field.getName())); + } + } + for (Field field : ZkPathsConfig.class.getDeclaredFields()) { + if (null != field.getAnnotation(JsonProperty.class)) { + clobberableProperties.add(String.format("%s.%s", zkServiceConfigString, field.getName())); + } + } + for (String clobberableProperty : clobberableProperties) { + priorValues.put(clobberableProperty, System.getProperty(clobberableProperty)); + } + } + + @AfterClass + public static void cleanup() + { + for (Map.Entry entry : priorValues.entrySet()) { + if (null != entry.getKey() && null != entry.getValue()) { + System.setProperty(entry.getKey(), entry.getValue()); + } + } + } + + private Map propertyValues = new HashMap<>(); + private int assertions = 0; + + @Before + public void setupTest() + { + for (String property : clobberableProperties) { + propertyValues.put(property, UUID.randomUUID().toString()); + } + System.getProperties().putAll(propertyValues); + assertions = 0; + } + + + private void validateEntries(ZkPathsConfig zkPathsConfig) + throws IllegalAccessException, NoSuchMethodException, InvocationTargetException + { + for (Field field : ZkPathsConfig.class.getDeclaredFields()) { + if (null != field.getAnnotation(JsonProperty.class)) { + String property = String.format("%s.%s", zkServiceConfigString, field.getName()); + String getter = String.format( + "get%s%s", + field.getName().substring(0, 1).toUpperCase(), + field.getName().substring(1) + ); + Method method = ZkPathsConfig.class.getDeclaredMethod(getter); + Assert.assertEquals(propertyValues.get(property), method.invoke(zkPathsConfig)); + ++assertions; + } + } + } + + private void validateEntries(IndexerZkConfig indexerZkConfig) + throws IllegalAccessException, NoSuchMethodException, InvocationTargetException + { + for (Field field : IndexerZkConfig.class.getDeclaredFields()) { + if (null != field.getAnnotation(JsonProperty.class)) { + String property = String.format("%s.%s", indexerPropertyString, field.getName()); + String getter = String.format( + "get%s%s", + field.getName().substring(0, 1).toUpperCase(), + field.getName().substring(1) + ); + Method method = IndexerZkConfig.class.getDeclaredMethod(getter); + Assert.assertEquals(propertyValues.get(property), method.invoke(indexerZkConfig)); + ++assertions; + } + } + } + + @Test + public void testSimpleConfig() throws IllegalAccessException, NoSuchMethodException, InvocationTargetException + { + final Injector injector = Initialization.makeInjectorWithModules( + GuiceInjectors.makeStartupInjector(), + ImmutableList.of(simpleZkConfigModule) + ); + JsonConfigurator configurator = injector.getBinding(JsonConfigurator.class).getProvider().get(); + + JsonConfigProvider zkPathsConfig = JsonConfigProvider.of(zkServiceConfigString, ZkPathsConfig.class); + zkPathsConfig.inject(System.getProperties(), configurator); + + JsonConfigProvider indexerZkConfig = JsonConfigProvider.of( + indexerPropertyString, + IndexerZkConfig.class + ); + indexerZkConfig.inject(System.getProperties(), configurator); + + validateEntries(indexerZkConfig.get().get()); + validateEntries(zkPathsConfig.get().get()); + Assert.assertEquals(clobberableProperties.size(), assertions); + } +} diff --git a/pom.xml b/pom.xml index b82343b5ff1..ba3f5255f58 100644 --- a/pom.xml +++ b/pom.xml @@ -456,7 +456,7 @@ org.easymock easymock - 3.0 + 3.3 test diff --git a/server/src/main/java/io/druid/curator/CuratorConfig.java b/server/src/main/java/io/druid/curator/CuratorConfig.java index 3e8795bfd2c..94401ec7ed6 100644 --- a/server/src/main/java/io/druid/curator/CuratorConfig.java +++ b/server/src/main/java/io/druid/curator/CuratorConfig.java @@ -19,22 +19,50 @@ package io.druid.curator; -import org.skife.config.Config; -import org.skife.config.Default; +import com.fasterxml.jackson.annotation.JsonProperty; +import javax.validation.constraints.Min; /** */ -public abstract class CuratorConfig +public class CuratorConfig { - @Config("druid.zk.service.host") - @Default("localhost") - public abstract String getZkHosts(); + @JsonProperty("host") + private String zkHosts = "localhost"; - @Config("druid.zk.service.sessionTimeoutMs") - @Default("30000") - public abstract int getZkSessionTimeoutMs(); + @JsonProperty("sessionTimeoutMs") + @Min(0) + private int zkSessionTimeoutMs = 30000; - @Config("druid.curator.compress") - @Default("true") - public abstract boolean enableCompression(); + @JsonProperty("compress") + private boolean enableCompression = true; + + public String getZkHosts() + { + return zkHosts; + } + + public void setZkHosts(String zkHosts) + { + this.zkHosts = zkHosts; + } + + public Integer getZkSessionTimeoutMs() + { + return zkSessionTimeoutMs; + } + + public void setZkSessionTimeoutMs(Integer zkSessionTimeoutMs) + { + this.zkSessionTimeoutMs = zkSessionTimeoutMs; + } + + public Boolean getEnableCompression() + { + return enableCompression; + } + + public void setEnableCompression(Boolean enableCompression) + { + this.enableCompression = enableCompression; + } } diff --git a/server/src/main/java/io/druid/curator/CuratorModule.java b/server/src/main/java/io/druid/curator/CuratorModule.java index 94fba357dd2..7d18eda90d3 100644 --- a/server/src/main/java/io/druid/curator/CuratorModule.java +++ b/server/src/main/java/io/druid/curator/CuratorModule.java @@ -25,6 +25,7 @@ import com.google.inject.Provides; import com.metamx.common.lifecycle.Lifecycle; import com.metamx.common.logger.Logger; import io.druid.guice.ConfigProvider; +import io.druid.guice.JsonConfigProvider; import io.druid.guice.LazySingleton; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; @@ -41,7 +42,9 @@ public class CuratorModule implements Module @Override public void configure(Binder binder) { - ConfigProvider.bind(binder, CuratorConfig.class); + JsonConfigProvider.bind( + binder, "druid.zk.service", + CuratorConfig.class); } @Provides @LazySingleton @@ -52,7 +55,7 @@ public class CuratorModule implements Module .connectString(config.getZkHosts()) .sessionTimeoutMs(config.getZkSessionTimeoutMs()) .retryPolicy(new BoundedExponentialBackoffRetry(1000, 45000, 30)) - .compressionProvider(new PotentiallyGzippedCompressionProvider(config.enableCompression())) + .compressionProvider(new PotentiallyGzippedCompressionProvider(config.getEnableCompression())) .build(); lifecycle.addHandler( diff --git a/server/src/main/java/io/druid/guice/ServerModule.java b/server/src/main/java/io/druid/guice/ServerModule.java index d0ead89825d..3fbfe627edf 100644 --- a/server/src/main/java/io/druid/guice/ServerModule.java +++ b/server/src/main/java/io/druid/guice/ServerModule.java @@ -45,8 +45,7 @@ public class ServerModule implements DruidModule @Override public void configure(Binder binder) { - ConfigProvider.bind(binder, ZkPathsConfig.class); - + JsonConfigProvider.bind(binder, "druid.zk.paths", ZkPathsConfig.class); JsonConfigProvider.bind(binder, "druid", DruidNode.class, Self.class); } diff --git a/server/src/main/java/io/druid/server/bridge/DruidClusterBridge.java b/server/src/main/java/io/druid/server/bridge/DruidClusterBridge.java index b12aa3e5299..37df9000429 100644 --- a/server/src/main/java/io/druid/server/bridge/DruidClusterBridge.java +++ b/server/src/main/java/io/druid/server/bridge/DruidClusterBridge.java @@ -43,6 +43,7 @@ import io.druid.server.DruidNode; import io.druid.server.coordination.AbstractDataSegmentAnnouncer; import io.druid.server.coordination.DataSegmentAnnouncer; import io.druid.server.coordination.DruidServerMetadata; +import io.druid.server.initialization.ZkPathsConfig; import io.druid.timeline.DataSegment; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.recipes.leader.LeaderLatch; @@ -80,6 +81,9 @@ public class DruidClusterBridge private final BridgeZkCoordinator bridgeZkCoordinator; private final Announcer announcer; private final ServerInventoryView serverInventoryView; + private final ZkPathsConfig zkPathsConfig; + + private final DruidServerMetadata druidServerMetadata; private final Map segments = Maps.newHashMap(); private final Object lock = new Object(); @@ -91,6 +95,8 @@ public class DruidClusterBridge public DruidClusterBridge( ObjectMapper jsonMapper, DruidClusterBridgeConfig config, + ZkPathsConfig zkPathsConfig, + DruidServerMetadata druidServerMetadata, ScheduledExecutorFactory scheduledExecutorFactory, @Self DruidNode self, CuratorFramework curator, @@ -104,10 +110,12 @@ public class DruidClusterBridge this.jsonMapper = jsonMapper; this.config = config; this.bridgeZkCoordinator = bridgeZkCoordinator; + this.zkPathsConfig = zkPathsConfig; this.announcer = announcer; this.serverInventoryView = serverInventoryView; this.curator = curator; this.leaderLatch = leaderLatch; + this.druidServerMetadata = druidServerMetadata; this.exec = scheduledExecutorFactory.create(1, "Coordinator-Exec--%d"); this.self = self; @@ -212,7 +220,7 @@ public class DruidClusterBridge private LeaderLatch createNewLeaderLatch() { final LeaderLatch newLeaderLatch = new LeaderLatch( - curator, ZKPaths.makePath(config.getConnectorPath(), BRIDGE_OWNER_NODE), self.getHostAndPort() + curator, ZKPaths.makePath(zkPathsConfig.getConnectorPath(), BRIDGE_OWNER_NODE), self.getHostAndPort() ); newLeaderLatch.addListener( @@ -309,13 +317,13 @@ public class DruidClusterBridge self.getHostAndPort(), totalMaxSize, NODE_TYPE, - config.getTier(), - config.getPriority() + druidServerMetadata.getTier(), + druidServerMetadata.getPriority() ); try { - final String path = ZKPaths.makePath(config.getAnnouncementsPath(), self.getHostAndPort()); - log.info("Updating [%s] to have a maxSize of[%,d] bytes", self.getHostAndPort(), totalMaxSize); + final String path = ZKPaths.makePath(zkPathsConfig.getAnnouncementsPath(), self.getHostAndPort()); + log.info("Updating [%s] to have a maxSize of[%,d] bytes", self.getHost(), totalMaxSize); announcer.update(path, jsonMapper.writeValueAsBytes(me)); } catch (Exception e) { diff --git a/server/src/main/java/io/druid/server/bridge/DruidClusterBridgeConfig.java b/server/src/main/java/io/druid/server/bridge/DruidClusterBridgeConfig.java index 5478a375577..4508f368a49 100644 --- a/server/src/main/java/io/druid/server/bridge/DruidClusterBridgeConfig.java +++ b/server/src/main/java/io/druid/server/bridge/DruidClusterBridgeConfig.java @@ -19,34 +19,50 @@ package io.druid.server.bridge; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.inject.Inject; import io.druid.client.DruidServer; -import io.druid.server.initialization.ZkPathsConfig; +import io.druid.client.DruidServerConfig; import org.joda.time.Duration; -import org.skife.config.Config; -import org.skife.config.Default; /** */ -public abstract class DruidClusterBridgeConfig extends ZkPathsConfig +public abstract class DruidClusterBridgeConfig { - @Config("druid.server.tier") - @Default(DruidServer.DEFAULT_TIER) - public abstract String getTier(); + @JsonProperty + private Duration startDelay = new Duration("PT300s"); + @JsonProperty + private Duration period = new Duration("PT60s"); + @JsonProperty + private String brokerServiceName = "broker"; - @Config("druid.bridge.startDelay") - @Default("PT300s") - public abstract Duration getStartDelay(); - - @Config("druid.bridge.period") - @Default("PT60s") - public abstract Duration getPeriod(); - - @Config("druid.bridge.broker.serviceName") - public abstract String getBrokerServiceName(); - - @Config("druid.server.priority") - public int getPriority() + public Duration getStartDelay() { - return DruidServer.DEFAULT_PRIORITY; + return startDelay; + } + + public void setStartDelay(Duration startDelay) + { + this.startDelay = startDelay; + } + + public Duration getPeriod() + { + return period; + } + + public void setPeriod(Duration period) + { + this.period = period; + } + + public String getBrokerServiceName() + { + return brokerServiceName; + } + + public void setBrokerServiceName(String brokerServiceName) + { + this.brokerServiceName = brokerServiceName; } } diff --git a/server/src/main/java/io/druid/server/initialization/ZkPathsConfig.java b/server/src/main/java/io/druid/server/initialization/ZkPathsConfig.java index 8494b45e257..7fa09926938 100644 --- a/server/src/main/java/io/druid/server/initialization/ZkPathsConfig.java +++ b/server/src/main/java/io/druid/server/initialization/ZkPathsConfig.java @@ -19,85 +19,105 @@ package io.druid.server.initialization; +import com.fasterxml.jackson.annotation.JsonProperty; import org.apache.curator.utils.ZKPaths; -import org.skife.config.Config; -public abstract class ZkPathsConfig +public class ZkPathsConfig { - @Config("druid.zk.paths.base") - public String getZkBasePath() + @JsonProperty + private + String base = "druid"; + @JsonProperty + private + String propertiesPath; + @JsonProperty + private + String announcementsPath; + @JsonProperty + private + String servedSegmentsPath; + @JsonProperty + private + String liveSegmentsPath; + @JsonProperty + private + String coordinatorPath; + @JsonProperty + private + String loadQueuePath; + @JsonProperty + private + String connectorPath; + + public String getBase() { - return "druid"; + return base; } - @Config("druid.zk.paths.propertiesPath") public String getPropertiesPath() { - return defaultPath("properties"); + return (null == propertiesPath) ? defaultPath("properties") : propertiesPath; } - @Config("druid.zk.paths.announcementsPath") public String getAnnouncementsPath() { - return defaultPath("announcements"); + return (null == announcementsPath) ? defaultPath("announcements") : announcementsPath; } - @Config("druid.zk.paths.servedSegmentsPath") public String getServedSegmentsPath() { - return defaultPath("servedSegments"); + return (null == servedSegmentsPath) ? defaultPath("servedSegments") : servedSegmentsPath; } - @Config("druid.zk.paths.liveSegmentsPath") public String getLiveSegmentsPath() { - return defaultPath("segments"); + return (null == liveSegmentsPath) ? defaultPath("segments") : liveSegmentsPath; } - @Config("druid.zk.paths.loadQueuePath") - public String getLoadQueuePath() - { - return defaultPath("loadQueue"); - } - - @Config("druid.zk.paths.coordinatorPath") public String getCoordinatorPath() { - return defaultPath("coordinator"); + return (null == coordinatorPath) ? defaultPath("coordinator") : coordinatorPath; + } + + public String getLoadQueuePath() + { + return (null == loadQueuePath) ? defaultPath("loadQueue") : loadQueuePath; } - @Config("druid.zk.paths.connectorPath") public String getConnectorPath() { - return defaultPath("connector"); + return (null == connectorPath) ? defaultPath("connector") : connectorPath; } - @Config("druid.zk.paths.indexer.announcementsPath") - public String getIndexerAnnouncementPath() + protected String defaultPath(final String subPath) { - return defaultPath("indexer/announcements"); + return ZKPaths.makePath(getBase(), subPath); } - @Config("druid.zk.paths.indexer.tasksPath") - public String getIndexerTaskPath() - { - return defaultPath("indexer/tasks"); - } - - @Config("druid.zk.paths.indexer.statusPath") - public String getIndexerStatusPath() - { - return defaultPath("indexer/status"); - } - - @Config("druid.zk.paths.indexer.leaderLatchPath") - public String getIndexerLeaderLatchPath() - { - return defaultPath("indexer/leaderLatchPath"); - } - - private String defaultPath(final String subPath) - { - return ZKPaths.makePath(getZkBasePath(), subPath); + @Override + public boolean equals(Object other){ + if(null == other){ + return false; + } + if(this == other){ + return true; + } + if(!(other instanceof ZkPathsConfig)){ + return false; + } + ZkPathsConfig otherConfig = (ZkPathsConfig) other; + if( + this.getBase().equals(otherConfig.getBase()) && + this.getAnnouncementsPath().equals(otherConfig.getAnnouncementsPath()) && + this.getConnectorPath().equals(otherConfig.getConnectorPath()) && + this.getLiveSegmentsPath().equals(otherConfig.getLiveSegmentsPath()) && + this.getCoordinatorPath().equals(otherConfig.getCoordinatorPath()) && + this.getLoadQueuePath().equals(otherConfig.getLoadQueuePath()) && + this.getPropertiesPath().equals(otherConfig.getPropertiesPath()) && + this.getServedSegmentsPath().equals(otherConfig.getServedSegmentsPath()) + ){ + return true; + } + return false; } } diff --git a/server/src/test/java/io/druid/client/client/BatchServerInventoryViewTest.java b/server/src/test/java/io/druid/client/client/BatchServerInventoryViewTest.java index c2ed92d170c..a635e55d688 100644 --- a/server/src/test/java/io/druid/client/client/BatchServerInventoryViewTest.java +++ b/server/src/test/java/io/druid/client/client/BatchServerInventoryViewTest.java @@ -120,7 +120,7 @@ public class BatchServerInventoryViewTest new ZkPathsConfig() { @Override - public String getZkBasePath() + public String getBase() { return testBasePath; } @@ -139,7 +139,7 @@ public class BatchServerInventoryViewTest new ZkPathsConfig() { @Override - public String getZkBasePath() + public String getBase() { return testBasePath; } @@ -155,7 +155,7 @@ public class BatchServerInventoryViewTest new ZkPathsConfig() { @Override - public String getZkBasePath() + public String getBase() { return testBasePath; } diff --git a/server/src/test/java/io/druid/curator/CuratorConfigTest.java b/server/src/test/java/io/druid/curator/CuratorConfigTest.java new file mode 100644 index 00000000000..846d9f37155 --- /dev/null +++ b/server/src/test/java/io/druid/curator/CuratorConfigTest.java @@ -0,0 +1,39 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013, 2014 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.curator; + +import io.druid.guice.JsonConfigTesterBase; +import org.junit.Assert; +import org.junit.Test; + +import java.lang.reflect.InvocationTargetException; + +public class CuratorConfigTest extends JsonConfigTesterBase +{ + @Test + public void testHostName() throws IllegalAccessException, NoSuchMethodException, InvocationTargetException + { + propertyValues.put(getPropertyKey("host"),"fooHost"); + testProperties.putAll(propertyValues); + configProvider.inject(testProperties, configurator); + CuratorConfig config = configProvider.get().get(); + Assert.assertEquals("fooHost", config.getZkHosts()); + } +} diff --git a/server/src/test/java/io/druid/guice/JsonConfigTesterBase.java b/server/src/test/java/io/druid/guice/JsonConfigTesterBase.java new file mode 100644 index 00000000000..02779522798 --- /dev/null +++ b/server/src/test/java/io/druid/guice/JsonConfigTesterBase.java @@ -0,0 +1,155 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013, 2014 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.guice; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.ImmutableList; +import com.google.inject.Binder; +import com.google.inject.Injector; +import com.google.inject.Module; +import com.google.inject.name.Names; +import io.druid.initialization.Initialization; +import org.easymock.EasyMock; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.lang.reflect.Field; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.lang.reflect.ParameterizedType; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; +import java.util.UUID; + +/** + * + */ +public abstract class JsonConfigTesterBase +{ + + protected static final String configPrefix = "druid.test.prefix"; + protected Injector injector; + protected final Class clazz = (Class) ((ParameterizedType) getClass().getGenericSuperclass()).getActualTypeArguments()[0]; + + protected Map propertyValues = new HashMap<>(); + protected int assertions = 0; + protected Properties testProperties = new Properties(); + + protected static String getPropertyKey(String fieldName){ + return String.format( + "%s.%s", + configPrefix, fieldName + ); + } + protected static String getPropertyKey(Field field) + { + JsonProperty jsonProperty = field.getAnnotation(JsonProperty.class); + if (null != jsonProperty) { + return getPropertyKey( + (jsonProperty.value() == null || jsonProperty.value().isEmpty()) + ? field.getName() + : jsonProperty.value() + ); + } + return null; + } + + private final Module simpleJsonConfigModule = new Module() + { + @Override + public void configure(Binder binder) + { + binder.bindConstant().annotatedWith(Names.named("serviceName")).to("druid/test"); + binder.bindConstant().annotatedWith(Names.named("servicePort")).to(0); + JsonConfigProvider.bind(binder, configPrefix, clazz); + } + }; + + + protected final void validateEntries(T config) + throws IllegalAccessException, NoSuchMethodException, InvocationTargetException + { + for (Field field : clazz.getDeclaredFields()) { + final String propertyKey = getPropertyKey(field); + if (null != propertyKey) { + field.setAccessible(true); + String getter = String.format( + "get%s%s", + field.getName().substring(0, 1).toUpperCase(), + field.getName().substring(1) + ); + Method method = clazz.getDeclaredMethod(getter); + final String value; + if (null != method) { + value = method.invoke(config).toString(); + } else { + value = field.get(config).toString(); + } + + Assert.assertEquals(propertyValues.get(propertyKey), value); + ++assertions; + } + } + } + + protected JsonConfigurator configurator; + protected JsonConfigProvider configProvider; + + @Before + public void setup() throws IllegalAccessException + { + assertions = 0; + T fakeValues = EasyMock.createNiceMock(clazz); + propertyValues.clear(); + testProperties.clear(); + for (Field field : clazz.getDeclaredFields()) { + final String propertyKey = getPropertyKey(field); + if (null != propertyKey) { + field.setAccessible(true); + if (field.getType().isAssignableFrom(String.class)) { + propertyValues.put(propertyKey, UUID.randomUUID().toString()); + } else { + propertyValues.put(propertyKey, field.get(fakeValues).toString()); + } + } + } + testProperties.putAll(System.getProperties()); + testProperties.putAll(propertyValues); + injector = Initialization.makeInjectorWithModules( + GuiceInjectors.makeStartupInjector(), + ImmutableList.of(simpleJsonConfigModule) + ); + configurator = injector.getBinding(JsonConfigurator.class).getProvider().get(); + configProvider = JsonConfigProvider.of(configPrefix, clazz); + } + + @Test + public final void simpleInjectionTest() + throws IllegalAccessException, NoSuchMethodException, InvocationTargetException + { + configProvider.inject(testProperties, configurator); + validateEntries(configProvider.get().get()); + Assert.assertEquals(propertyValues.size(), assertions); + } + + +} diff --git a/server/src/test/java/io/druid/initialization/ZkPathsConfigTest.java b/server/src/test/java/io/druid/initialization/ZkPathsConfigTest.java new file mode 100644 index 00000000000..ee795c605fa --- /dev/null +++ b/server/src/test/java/io/druid/initialization/ZkPathsConfigTest.java @@ -0,0 +1,75 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013, 2014 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.initialization; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.inject.Key; +import io.druid.guice.JsonConfigProvider; +import io.druid.guice.JsonConfigTesterBase; +import io.druid.guice.JsonConfigurator; +import io.druid.guice.annotations.Json; +import io.druid.server.initialization.ZkPathsConfig; +import org.apache.curator.utils.ZKPaths; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; +import java.lang.reflect.InvocationTargetException; +import java.util.UUID; + +/** + * + */ +public class ZkPathsConfigTest extends JsonConfigTesterBase +{ + @Test + public void testOverrideBaseOnlyConfig() + throws IllegalAccessException, NoSuchMethodException, InvocationTargetException, IOException + { + JsonConfigurator configurator = injector.getBinding(JsonConfigurator.class).getProvider().get(); + + JsonConfigProvider zkPathsConfig = JsonConfigProvider.of(configPrefix, ZkPathsConfig.class); + testProperties.clear(); + String base = UUID.randomUUID().toString(); + testProperties.put(String.format("%s.base", configPrefix), base); + zkPathsConfig.inject(testProperties, configurator); + + propertyValues.clear(); + propertyValues.put(String.format("%s.base", configPrefix), base); + propertyValues.put(String.format("%s.propertiesPath", configPrefix), ZKPaths.makePath(base, "properties")); + propertyValues.put(String.format("%s.announcementsPath", configPrefix), ZKPaths.makePath(base, "announcements")); + propertyValues.put(String.format("%s.servedSegmentsPath", configPrefix), ZKPaths.makePath(base, "servedSegments")); + propertyValues.put(String.format("%s.liveSegmentsPath", configPrefix), ZKPaths.makePath(base, "segments")); + propertyValues.put(String.format("%s.coordinatorPath", configPrefix), ZKPaths.makePath(base, "coordinator")); + propertyValues.put(String.format("%s.loadQueuePath", configPrefix), ZKPaths.makePath(base, "loadQueue")); + propertyValues.put(String.format("%s.connectorPath", configPrefix), ZKPaths.makePath(base, "connector")); + + ZkPathsConfig zkPathsConfigObj = zkPathsConfig.get().get(); + validateEntries(zkPathsConfigObj); + Assert.assertEquals(propertyValues.size(), assertions); + + ObjectMapper jsonMapper = injector.getProvider(Key.get(ObjectMapper.class, Json.class)).get(); + String jsonVersion = jsonMapper.writeValueAsString(zkPathsConfigObj); + + ZkPathsConfig zkPathsConfigObjDeSer = jsonMapper.readValue(jsonVersion, ZkPathsConfig.class); + + Assert.assertEquals(zkPathsConfigObj, zkPathsConfigObjDeSer); + } +} diff --git a/server/src/test/java/io/druid/server/bridge/DruidClusterBridgeTest.java b/server/src/test/java/io/druid/server/bridge/DruidClusterBridgeTest.java index 7440367a562..a5306307629 100644 --- a/server/src/test/java/io/druid/server/bridge/DruidClusterBridgeTest.java +++ b/server/src/test/java/io/druid/server/bridge/DruidClusterBridgeTest.java @@ -90,12 +90,6 @@ public class DruidClusterBridgeTest ObjectMapper jsonMapper = new DefaultObjectMapper(); DruidClusterBridgeConfig config = new DruidClusterBridgeConfig() { - @Override - public String getTier() - { - return DruidServer.DEFAULT_TIER; - } - @Override public Duration getStartDelay() { @@ -107,18 +101,6 @@ public class DruidClusterBridgeTest { return new Duration(Long.MAX_VALUE); } - - @Override - public String getBrokerServiceName() - { - return "testz0rz"; - } - - @Override - public int getPriority() - { - return 0; - } }; ScheduledExecutorFactory factory = ScheduledExecutors.createFactory(new Lifecycle()); @@ -134,7 +116,7 @@ public class DruidClusterBridgeTest ZkPathsConfig zkPathsConfig = new ZkPathsConfig() { @Override - public String getZkBasePath() + public String getBase() { return "/druid"; } @@ -195,6 +177,8 @@ public class DruidClusterBridgeTest DruidClusterBridge bridge = new DruidClusterBridge( jsonMapper, config, + zkPathsConfig, + metadata, factory, me, localCf, diff --git a/server/src/test/java/io/druid/server/coordination/ZkCoordinatorTest.java b/server/src/test/java/io/druid/server/coordination/ZkCoordinatorTest.java index e6216082e50..cb49380d8b5 100644 --- a/server/src/test/java/io/druid/server/coordination/ZkCoordinatorTest.java +++ b/server/src/test/java/io/druid/server/coordination/ZkCoordinatorTest.java @@ -101,7 +101,7 @@ public class ZkCoordinatorTest extends CuratorTestBase final ZkPathsConfig zkPaths = new ZkPathsConfig() { @Override - public String getZkBasePath() + public String getBase() { return "/druid"; } diff --git a/server/src/test/java/io/druid/server/coordination/coordination/BatchDataSegmentAnnouncerTest.java b/server/src/test/java/io/druid/server/coordination/coordination/BatchDataSegmentAnnouncerTest.java index d4d2f8d7aaa..2ba2b0ba135 100644 --- a/server/src/test/java/io/druid/server/coordination/coordination/BatchDataSegmentAnnouncerTest.java +++ b/server/src/test/java/io/druid/server/coordination/coordination/BatchDataSegmentAnnouncerTest.java @@ -107,7 +107,7 @@ public class BatchDataSegmentAnnouncerTest new ZkPathsConfig() { @Override - public String getZkBasePath() + public String getBase() { return testBasePath; } diff --git a/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorTest.java b/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorTest.java index 42eb36be50b..bcc418e590e 100644 --- a/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorTest.java +++ b/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorTest.java @@ -99,7 +99,7 @@ public class DruidCoordinatorTest { @Override - public String getZkBasePath() + public String getBase() { return ""; } diff --git a/services/src/main/java/io/druid/cli/CliBridge.java b/services/src/main/java/io/druid/cli/CliBridge.java index 808b79bbb48..861150ba4a1 100644 --- a/services/src/main/java/io/druid/cli/CliBridge.java +++ b/services/src/main/java/io/druid/cli/CliBridge.java @@ -109,7 +109,7 @@ public class CliBridge extends ServerRunnable .retryPolicy(new BoundedExponentialBackoffRetry(1000, 45000, 30)) .compressionProvider( new PotentiallyGzippedCompressionProvider( - bridgeCuratorConfig.enableCompression() + bridgeCuratorConfig.getEnableCompression() ) ) .build();