diff --git a/client/src/main/java/com/metamx/druid/QueryableNode.java b/client/src/main/java/com/metamx/druid/QueryableNode.java index 699662b2916..375b032f6bf 100644 --- a/client/src/main/java/com/metamx/druid/QueryableNode.java +++ b/client/src/main/java/com/metamx/druid/QueryableNode.java @@ -321,6 +321,7 @@ public abstract class QueryableNode extends Registering setServerInventoryThingie( new ServerInventoryThingie( getConfigFactory().build(ServerInventoryThingieConfig.class), + getZkPaths(), getCuratorFramework(), exec, getJsonMapper() diff --git a/client/src/main/java/com/metamx/druid/client/ServerInventoryThingie.java b/client/src/main/java/com/metamx/druid/client/ServerInventoryThingie.java index 183a8d5a6b5..0f5b52ffc43 100644 --- a/client/src/main/java/com/metamx/druid/client/ServerInventoryThingie.java +++ b/client/src/main/java/com/metamx/druid/client/ServerInventoryThingie.java @@ -28,6 +28,8 @@ import com.metamx.common.lifecycle.LifecycleStart; import com.metamx.common.lifecycle.LifecycleStop; import com.metamx.druid.curator.inventory.CuratorInventoryManager; import com.metamx.druid.curator.inventory.CuratorInventoryManagerStrategy; +import com.metamx.druid.curator.inventory.InventoryManagerConfig; +import com.metamx.druid.initialization.ZkPathsConfig; import com.metamx.emitter.EmittingLogger; import org.apache.curator.framework.CuratorFramework; @@ -55,6 +57,7 @@ public class ServerInventoryThingie implements ServerView public ServerInventoryThingie( final ServerInventoryThingieConfig config, + final ZkPathsConfig zkPaths, final CuratorFramework curator, final ExecutorService exec, final ObjectMapper jsonMapper @@ -62,7 +65,20 @@ public class ServerInventoryThingie implements ServerView { inventoryManager = new CuratorInventoryManager( curator, - config, + new InventoryManagerConfig() + { + @Override + public String getContainerPath() + { + return zkPaths.getAnnouncementsPath(); + } + + @Override + public String getInventoryPath() + { + return zkPaths.getServedSegmentsPath(); + } + }, exec, new CuratorInventoryManagerStrategy() { diff --git a/client/src/main/java/com/metamx/druid/client/ServerInventoryThingieConfig.java b/client/src/main/java/com/metamx/druid/client/ServerInventoryThingieConfig.java index 4c77fc40064..560f6e46d62 100644 --- a/client/src/main/java/com/metamx/druid/client/ServerInventoryThingieConfig.java +++ b/client/src/main/java/com/metamx/druid/client/ServerInventoryThingieConfig.java @@ -19,22 +19,13 @@ package com.metamx.druid.client; -import com.metamx.druid.curator.inventory.InventoryManagerConfig; import org.skife.config.Config; import org.skife.config.Default; /** */ -public abstract class ServerInventoryThingieConfig implements InventoryManagerConfig +public abstract class ServerInventoryThingieConfig { - @Config("druid.zk.paths.announcementsPath") - @Override - public abstract String getContainerPath(); - - @Config("druid.zk.paths.servedSegmentsPath") - @Override - public abstract String getInventoryPath(); - @Config("druid.master.removedSegmentLifetime") @Default("1") public abstract int getRemovedSegmentLifetime(); diff --git a/client/src/main/java/com/metamx/druid/coordination/CuratorDataSegmentAnnouncer.java b/client/src/main/java/com/metamx/druid/coordination/CuratorDataSegmentAnnouncer.java index 7c23cee2b65..a7efc6cb9e8 100644 --- a/client/src/main/java/com/metamx/druid/coordination/CuratorDataSegmentAnnouncer.java +++ b/client/src/main/java/com/metamx/druid/coordination/CuratorDataSegmentAnnouncer.java @@ -68,9 +68,10 @@ public class CuratorDataSegmentAnnouncer implements DataSegmentAnnouncer return; } - log.info("Starting CuratorDataSegmentAnnouncer for server[%s] with config[%s]", server.getName(), config); try { - announcer.announce(makeAnnouncementPath(), jsonMapper.writeValueAsBytes(server)); + final String path = makeAnnouncementPath(); + log.info("Announcing self[%s] at [%s]", server, path); + announcer.announce(path, jsonMapper.writeValueAsBytes(server)); } catch (JsonProcessingException e) { throw Throwables.propagate(e); diff --git a/client/src/main/java/com/metamx/druid/curator/inventory/CuratorInventoryManager.java b/client/src/main/java/com/metamx/druid/curator/inventory/CuratorInventoryManager.java index 5bb139b8358..2c2c128dfec 100644 --- a/client/src/main/java/com/metamx/druid/curator/inventory/CuratorInventoryManager.java +++ b/client/src/main/java/com/metamx/druid/curator/inventory/CuratorInventoryManager.java @@ -19,7 +19,6 @@ package com.metamx.druid.curator.inventory; -import com.google.common.base.Charsets; import com.google.common.base.Function; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; @@ -31,12 +30,10 @@ import com.metamx.druid.curator.ShutdownNowIgnoringExecutorService; import com.metamx.druid.curator.cache.PathChildrenCacheFactory; import com.metamx.druid.curator.cache.SimplePathChildrenCacheFactory; import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.cache.ChildData; import org.apache.curator.framework.recipes.cache.PathChildrenCache; import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; -import org.apache.curator.retry.RetryOneTime; import org.apache.curator.utils.ZKPaths; import java.io.IOException; @@ -217,6 +214,7 @@ public class CuratorInventoryManager containers.put(containerKey, new ContainerHolder(container, inventoryCache)); inventoryCache.start(); + strategy.newContainer(container); break; case CHILD_REMOVED: diff --git a/client/src/main/java/com/metamx/druid/initialization/ZkPathsConfig.java b/client/src/main/java/com/metamx/druid/initialization/ZkPathsConfig.java index 34afb45e4b3..40d24522951 100644 --- a/client/src/main/java/com/metamx/druid/initialization/ZkPathsConfig.java +++ b/client/src/main/java/com/metamx/druid/initialization/ZkPathsConfig.java @@ -24,7 +24,7 @@ import org.skife.config.Config; public abstract class ZkPathsConfig { @Config("druid.zk.paths.base") - protected String getZkBasePath() + public String getZkBasePath() { return "/druid"; } diff --git a/realtime/src/main/java/com/metamx/druid/realtime/RealtimeNode.java b/realtime/src/main/java/com/metamx/druid/realtime/RealtimeNode.java index 14a04a9d50a..faefc8de747 100644 --- a/realtime/src/main/java/com/metamx/druid/realtime/RealtimeNode.java +++ b/realtime/src/main/java/com/metamx/druid/realtime/RealtimeNode.java @@ -35,9 +35,6 @@ import com.metamx.common.lifecycle.LifecycleStart; import com.metamx.common.lifecycle.LifecycleStop; import com.metamx.common.logger.Logger; import com.metamx.druid.BaseServerNode; -import com.metamx.druid.coordination.CuratorDataSegmentAnnouncer; -import com.metamx.druid.coordination.CuratorDataSegmentAnnouncerConfig; -import com.metamx.druid.coordination.DataSegmentAnnouncer; import com.metamx.druid.db.DbConnector; import com.metamx.druid.db.DbConnectorConfig; import com.metamx.druid.http.QueryServlet; diff --git a/server/src/main/java/com/metamx/druid/coordination/CuratorDataSegmentAnnouncerConfig.java b/server/src/main/java/com/metamx/druid/coordination/CuratorDataSegmentAnnouncerConfig.java deleted file mode 100644 index 049b7515fa3..00000000000 --- a/server/src/main/java/com/metamx/druid/coordination/CuratorDataSegmentAnnouncerConfig.java +++ /dev/null @@ -1,14 +0,0 @@ -package com.metamx.druid.coordination; - -import org.skife.config.Config; -import org.skife.config.Default; - -public abstract class CuratorDataSegmentAnnouncerConfig -{ - @Config("druid.zk.paths.announcementsPath") - public abstract String getAnnouncementsPath(); - - @Config("druid.zk.paths.servedSegmentsPath") - public abstract String getServedSegmentsPath(); - -} diff --git a/server/src/main/java/com/metamx/druid/coordination/DruidClusterInfoConfig.java b/server/src/main/java/com/metamx/druid/coordination/DruidClusterInfoConfig.java deleted file mode 100644 index 1a6a539f1ca..00000000000 --- a/server/src/main/java/com/metamx/druid/coordination/DruidClusterInfoConfig.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Druid - a distributed column store. - * Copyright (C) 2012 Metamarkets Group Inc. - * - * This program is free software; you can redistribute it and/or - * modify it under the terms of the GNU General Public License - * as published by the Free Software Foundation; either version 2 - * of the License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. - */ - -package com.metamx.druid.coordination; - -import org.skife.config.Config; - -/** - */ -public abstract class DruidClusterInfoConfig -{ - @Config("druid.zk.paths.masterPath") - public abstract String getMasterPath(); -} diff --git a/server/src/main/java/com/metamx/druid/coordination/ZkCoordinator.java b/server/src/main/java/com/metamx/druid/coordination/ZkCoordinator.java index 9be872eeb38..426a0b00fcf 100644 --- a/server/src/main/java/com/metamx/druid/coordination/ZkCoordinator.java +++ b/server/src/main/java/com/metamx/druid/coordination/ZkCoordinator.java @@ -29,6 +29,7 @@ import com.metamx.common.lifecycle.LifecycleStop; import com.metamx.druid.client.DataSegment; import com.metamx.druid.client.DruidServer; import com.metamx.druid.curator.announcement.Announcer; +import com.metamx.druid.initialization.ZkPathsConfig; import com.metamx.druid.loading.SegmentLoadingException; import com.metamx.emitter.EmittingLogger; import com.metamx.emitter.service.AlertEvent; @@ -68,6 +69,7 @@ public class ZkCoordinator implements DataSegmentChangeHandler public ZkCoordinator( ObjectMapper jsonMapper, ZkCoordinatorConfig config, + ZkPathsConfig zkPaths, DruidServerMetadata me, DataSegmentAnnouncer announcer, CuratorFramework curator, @@ -83,14 +85,14 @@ public class ZkCoordinator implements DataSegmentChangeHandler this.serverManager = serverManager; this.emitter = emitter; - this.loadQueueLocation = ZKPaths.makePath(config.getLoadQueueLocation(), me.getName()); - this.servedSegmentsLocation = ZKPaths.makePath(config.getServedSegmentsLocation(), me.getName()); + this.loadQueueLocation = ZKPaths.makePath(zkPaths.getLoadQueuePath(), me.getName()); + this.servedSegmentsLocation = ZKPaths.makePath(zkPaths.getServedSegmentsPath(), me.getName()); } @LifecycleStart public void start() throws IOException { - log.info("Starting zkCoordinator for server[%s] with config[%s]", me, config); + log.info("Starting zkCoordinator for server[%s]", me); synchronized (lock) { if (started) { return; @@ -105,7 +107,7 @@ public class ZkCoordinator implements DataSegmentChangeHandler ); try { - this.config.getSegmentInfoCacheDirectory().mkdirs(); + config.getSegmentInfoCacheDirectory().mkdirs(); curator.newNamespaceAwareEnsurePath(loadQueueLocation).ensure(curator.getZookeeperClient()); curator.newNamespaceAwareEnsurePath(servedSegmentsLocation).ensure(curator.getZookeeperClient()); @@ -158,6 +160,7 @@ public class ZkCoordinator implements DataSegmentChangeHandler } } ); + loadQueueCache.start(); } catch (Exception e) { Throwables.propagateIfPossible(e, IOException.class); @@ -214,16 +217,9 @@ public class ZkCoordinator implements DataSegmentChangeHandler } } catch (Exception e) { - log.error(e, "Exception occurred reading file [%s]", file); - emitter.emit( - new AlertEvent.Builder().build( - "Failed to read segment info file", - ImmutableMap.builder() - .put("file", file) - .put("exception", e.toString()) - .build() - ) - ); + log.makeAlert(e, "Failed to load segment from segmentInfo file") + .addData("file", file) + .emit(); } } } diff --git a/server/src/main/java/com/metamx/druid/coordination/ZkCoordinatorConfig.java b/server/src/main/java/com/metamx/druid/coordination/ZkCoordinatorConfig.java index 91afbcbbc5e..262f92e0c87 100644 --- a/server/src/main/java/com/metamx/druid/coordination/ZkCoordinatorConfig.java +++ b/server/src/main/java/com/metamx/druid/coordination/ZkCoordinatorConfig.java @@ -27,15 +27,6 @@ import java.io.File; */ public abstract class ZkCoordinatorConfig { - @Config("druid.zk.paths.announcementsPath") - public abstract String getAnnounceLocation(); - - @Config("druid.zk.paths.servedSegmentsPath") - public abstract String getServedSegmentsLocation(); - - @Config("druid.zk.paths.loadQueuePath") - public abstract String getLoadQueueLocation(); - @Config("druid.paths.segmentInfoCache") public abstract File getSegmentInfoCacheDirectory(); } diff --git a/server/src/main/java/com/metamx/druid/http/ComputeNode.java b/server/src/main/java/com/metamx/druid/http/ComputeNode.java index 57f2ef1db38..cc00669638a 100644 --- a/server/src/main/java/com/metamx/druid/http/ComputeNode.java +++ b/server/src/main/java/com/metamx/druid/http/ComputeNode.java @@ -117,6 +117,7 @@ public class ComputeNode extends BaseServerNode final ZkCoordinator coordinator = new ZkCoordinator( getJsonMapper(), getConfigFactory().build(ZkCoordinatorConfig.class), + getZkPaths(), getDruidServerMetadata(), getAnnouncer(), getCuratorFramework(), diff --git a/server/src/main/java/com/metamx/druid/http/MasterMain.java b/server/src/main/java/com/metamx/druid/http/MasterMain.java index 43a117e800c..bf973c06171 100644 --- a/server/src/main/java/com/metamx/druid/http/MasterMain.java +++ b/server/src/main/java/com/metamx/druid/http/MasterMain.java @@ -48,6 +48,7 @@ import com.metamx.druid.db.DbConnectorConfig; import com.metamx.druid.initialization.Initialization; import com.metamx.druid.initialization.ServerConfig; import com.metamx.druid.initialization.ServiceDiscoveryConfig; +import com.metamx.druid.initialization.ZkPathsConfig; import com.metamx.druid.jackson.DefaultObjectMapper; import com.metamx.druid.log.LogLevelAdjuster; import com.metamx.druid.master.DruidMaster; @@ -122,11 +123,13 @@ public class MasterMain lifecycle ); + final ZkPathsConfig zkPaths = configFactory.build(ZkPathsConfig.class); + final ExecutorService exec = Executors.newFixedThreadPool( 1, new ThreadFactoryBuilder().setDaemon(true).setNameFormat("ServerInventoryThingie-%s").build() ); ServerInventoryThingie serverInventoryThingie = new ServerInventoryThingie( - configFactory.build(ServerInventoryThingieConfig.class), curatorFramework, exec, jsonMapper + configFactory.build(ServerInventoryThingieConfig.class), zkPaths, curatorFramework, exec, jsonMapper ); lifecycle.addManagedInstance(serverInventoryThingie); @@ -194,6 +197,7 @@ public class MasterMain final DruidMaster master = new DruidMaster( druidMasterConfig, + zkPaths, configManager, databaseSegmentManager, serverInventoryThingie, diff --git a/server/src/main/java/com/metamx/druid/master/DruidMaster.java b/server/src/main/java/com/metamx/druid/master/DruidMaster.java index 950cc609acc..13fdf7b1642 100644 --- a/server/src/main/java/com/metamx/druid/master/DruidMaster.java +++ b/server/src/main/java/com/metamx/druid/master/DruidMaster.java @@ -45,6 +45,7 @@ import com.metamx.druid.config.JacksonConfigManager; import com.metamx.druid.db.DatabaseRuleManager; import com.metamx.druid.db.DatabaseSegmentManager; import com.metamx.druid.index.v1.IndexIO; +import com.metamx.druid.initialization.ZkPathsConfig; import com.metamx.emitter.EmittingLogger; import com.metamx.emitter.service.ServiceEmitter; import com.metamx.emitter.service.ServiceMetricEvent; @@ -78,6 +79,7 @@ public class DruidMaster private volatile boolean master = false; private final DruidMasterConfig config; + private final ZkPathsConfig zkPaths; private final JacksonConfigManager configManager; private final DatabaseSegmentManager databaseSegmentManager; private final ServerInventoryThingie serverInventoryThingie; @@ -93,6 +95,7 @@ public class DruidMaster public DruidMaster( DruidMasterConfig config, + ZkPathsConfig zkPaths, JacksonConfigManager configManager, DatabaseSegmentManager databaseSegmentManager, ServerInventoryThingie serverInventoryThingie, @@ -105,6 +108,7 @@ public class DruidMaster ) { this.config = config; + this.zkPaths = zkPaths; this.configManager = configManager; this.databaseSegmentManager = databaseSegmentManager; @@ -256,9 +260,9 @@ public class DruidMaster ); } - final String toLoadQueueSegPath = ZKPaths.makePath(ZKPaths.makePath(config.getLoadQueuePath(), to), segmentName); + final String toLoadQueueSegPath = ZKPaths.makePath(ZKPaths.makePath(zkPaths.getLoadQueuePath(), to), segmentName); final String toServedSegPath = ZKPaths.makePath( - ZKPaths.makePath(config.getServedSegmentsLocation(), to), segmentName + ZKPaths.makePath(zkPaths.getServedSegmentsPath(), to), segmentName ); loadPeon.loadSegment( @@ -403,7 +407,7 @@ public class DruidMaster private LeaderLatch createNewLeaderLatch() { final LeaderLatch newLeaderLatch = new LeaderLatch( - curator, ZKPaths.makePath(config.getBasePath(), MASTER_OWNER_NODE), config.getHost() + curator, ZKPaths.makePath(zkPaths.getMasterPath(), MASTER_OWNER_NODE), config.getHost() ); newLeaderLatch.attachListener( @@ -711,7 +715,7 @@ public class DruidMaster final DruidCluster cluster = new DruidCluster(); for (DruidServer server : servers) { if (!loadManagementPeons.containsKey(server.getName())) { - String basePath = ZKPaths.makePath(config.getLoadQueuePath(), server.getName()); + String basePath = ZKPaths.makePath(zkPaths.getLoadQueuePath(), server.getName()); LoadQueuePeon loadQueuePeon = taskMaster.giveMePeon(basePath); log.info("Creating LoadQueuePeon for server[%s] at path[%s]", server.getName(), basePath); diff --git a/server/src/main/java/com/metamx/druid/master/DruidMasterConfig.java b/server/src/main/java/com/metamx/druid/master/DruidMasterConfig.java index 88f2a348ed0..a464152ee90 100644 --- a/server/src/main/java/com/metamx/druid/master/DruidMasterConfig.java +++ b/server/src/main/java/com/metamx/druid/master/DruidMasterConfig.java @@ -30,15 +30,6 @@ public abstract class DruidMasterConfig @Config("druid.host") public abstract String getHost(); - @Config("druid.zk.paths.masterPath") - public abstract String getBasePath(); - - @Config("druid.zk.paths.loadQueuePath") - public abstract String getLoadQueuePath(); - - @Config("druid.zk.paths.servedSegmentsPath") - public abstract String getServedSegmentsLocation(); - @Config("druid.master.startDelay") @Default("PT600s") public abstract Duration getMasterStartDelay(); diff --git a/server/src/main/java/com/metamx/druid/utils/DruidSetup.java b/server/src/main/java/com/metamx/druid/utils/DruidSetup.java index 88c2c669ec1..5980f1e28b1 100644 --- a/server/src/main/java/com/metamx/druid/utils/DruidSetup.java +++ b/server/src/main/java/com/metamx/druid/utils/DruidSetup.java @@ -163,7 +163,7 @@ public class DruidSetup ZkPathsConfig config = new ZkPathsConfig() { @Override - protected String getZkBasePath() + public String getZkBasePath() { return zPathBase; } diff --git a/server/src/test/java/com/metamx/druid/coordination/ZkCoordinatorTest.java b/server/src/test/java/com/metamx/druid/coordination/ZkCoordinatorTest.java index ecf92606287..ad28722c6dd 100644 --- a/server/src/test/java/com/metamx/druid/coordination/ZkCoordinatorTest.java +++ b/server/src/test/java/com/metamx/druid/coordination/ZkCoordinatorTest.java @@ -26,6 +26,7 @@ import com.google.common.util.concurrent.MoreExecutors; import com.metamx.common.logger.Logger; import com.metamx.druid.client.DataSegment; import com.metamx.druid.index.v1.IndexIO; +import com.metamx.druid.initialization.ZkPathsConfig; import com.metamx.druid.jackson.DefaultObjectMapper; import com.metamx.druid.loading.CacheTestSegmentLoader; import com.metamx.druid.metrics.NoopServiceEmitter; @@ -81,30 +82,13 @@ public class ZkCoordinatorTest jsonMapper, new ZkCoordinatorConfig() { - @Override - public String getAnnounceLocation() - { - return null; - } - - @Override - public String getServedSegmentsLocation() - { - return null; - } - - @Override - public String getLoadQueueLocation() - { - return null; - } - @Override public File getSegmentInfoCacheDirectory() { return cacheDir; } }, + new ZkPathsConfig(){}, new DruidServerMetadata("dummyServer", "dummyHost", 0, "dummyType", "normal"), announcer, curator, diff --git a/server/src/test/java/com/metamx/druid/master/DruidMasterTest.java b/server/src/test/java/com/metamx/druid/master/DruidMasterTest.java index d730b2ded20..fa2f12fc943 100644 --- a/server/src/test/java/com/metamx/druid/master/DruidMasterTest.java +++ b/server/src/test/java/com/metamx/druid/master/DruidMasterTest.java @@ -24,6 +24,7 @@ import com.metamx.druid.client.DataSegment; import com.metamx.druid.client.DruidServer; import com.metamx.druid.client.ServerInventoryThingie; import com.metamx.druid.db.DatabaseSegmentManager; +import com.metamx.druid.initialization.ZkPathsConfig; import com.metamx.druid.metrics.NoopServiceEmitter; import org.apache.curator.framework.CuratorFramework; import org.easymock.EasyMock; @@ -73,24 +74,6 @@ public class DruidMasterTest return null; } - @Override - public String getBasePath() - { - return null; - } - - @Override - public String getLoadQueuePath() - { - return null; - } - - @Override - public String getServedSegmentsLocation() - { - return null; - } - @Override public Duration getMasterStartDelay() { @@ -139,6 +122,7 @@ public class DruidMasterTest return 0; } }, + new ZkPathsConfig(){}, null, databaseSegmentManager, serverInventoryThingie,