mirror of https://github.com/apache/druid.git
1) Adjust all of the uses of zookeeper paths to be based on ZkPathsConfig
This commit is contained in:
parent
b6d72c0da4
commit
67ce1b6f26
|
@ -321,6 +321,7 @@ public abstract class QueryableNode<T extends QueryableNode> extends Registering
|
|||
setServerInventoryThingie(
|
||||
new ServerInventoryThingie(
|
||||
getConfigFactory().build(ServerInventoryThingieConfig.class),
|
||||
getZkPaths(),
|
||||
getCuratorFramework(),
|
||||
exec,
|
||||
getJsonMapper()
|
||||
|
|
|
@ -28,6 +28,8 @@ import com.metamx.common.lifecycle.LifecycleStart;
|
|||
import com.metamx.common.lifecycle.LifecycleStop;
|
||||
import com.metamx.druid.curator.inventory.CuratorInventoryManager;
|
||||
import com.metamx.druid.curator.inventory.CuratorInventoryManagerStrategy;
|
||||
import com.metamx.druid.curator.inventory.InventoryManagerConfig;
|
||||
import com.metamx.druid.initialization.ZkPathsConfig;
|
||||
import com.metamx.emitter.EmittingLogger;
|
||||
import org.apache.curator.framework.CuratorFramework;
|
||||
|
||||
|
@ -55,6 +57,7 @@ public class ServerInventoryThingie implements ServerView
|
|||
|
||||
public ServerInventoryThingie(
|
||||
final ServerInventoryThingieConfig config,
|
||||
final ZkPathsConfig zkPaths,
|
||||
final CuratorFramework curator,
|
||||
final ExecutorService exec,
|
||||
final ObjectMapper jsonMapper
|
||||
|
@ -62,7 +65,20 @@ public class ServerInventoryThingie implements ServerView
|
|||
{
|
||||
inventoryManager = new CuratorInventoryManager<DruidServer, DataSegment>(
|
||||
curator,
|
||||
config,
|
||||
new InventoryManagerConfig()
|
||||
{
|
||||
@Override
|
||||
public String getContainerPath()
|
||||
{
|
||||
return zkPaths.getAnnouncementsPath();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getInventoryPath()
|
||||
{
|
||||
return zkPaths.getServedSegmentsPath();
|
||||
}
|
||||
},
|
||||
exec,
|
||||
new CuratorInventoryManagerStrategy<DruidServer, DataSegment>()
|
||||
{
|
||||
|
|
|
@ -19,22 +19,13 @@
|
|||
|
||||
package com.metamx.druid.client;
|
||||
|
||||
import com.metamx.druid.curator.inventory.InventoryManagerConfig;
|
||||
import org.skife.config.Config;
|
||||
import org.skife.config.Default;
|
||||
|
||||
/**
|
||||
*/
|
||||
public abstract class ServerInventoryThingieConfig implements InventoryManagerConfig
|
||||
public abstract class ServerInventoryThingieConfig
|
||||
{
|
||||
@Config("druid.zk.paths.announcementsPath")
|
||||
@Override
|
||||
public abstract String getContainerPath();
|
||||
|
||||
@Config("druid.zk.paths.servedSegmentsPath")
|
||||
@Override
|
||||
public abstract String getInventoryPath();
|
||||
|
||||
@Config("druid.master.removedSegmentLifetime")
|
||||
@Default("1")
|
||||
public abstract int getRemovedSegmentLifetime();
|
||||
|
|
|
@ -68,9 +68,10 @@ public class CuratorDataSegmentAnnouncer implements DataSegmentAnnouncer
|
|||
return;
|
||||
}
|
||||
|
||||
log.info("Starting CuratorDataSegmentAnnouncer for server[%s] with config[%s]", server.getName(), config);
|
||||
try {
|
||||
announcer.announce(makeAnnouncementPath(), jsonMapper.writeValueAsBytes(server));
|
||||
final String path = makeAnnouncementPath();
|
||||
log.info("Announcing self[%s] at [%s]", server, path);
|
||||
announcer.announce(path, jsonMapper.writeValueAsBytes(server));
|
||||
}
|
||||
catch (JsonProcessingException e) {
|
||||
throw Throwables.propagate(e);
|
||||
|
|
|
@ -19,7 +19,6 @@
|
|||
|
||||
package com.metamx.druid.curator.inventory;
|
||||
|
||||
import com.google.common.base.Charsets;
|
||||
import com.google.common.base.Function;
|
||||
import com.google.common.collect.Iterables;
|
||||
import com.google.common.collect.Lists;
|
||||
|
@ -31,12 +30,10 @@ import com.metamx.druid.curator.ShutdownNowIgnoringExecutorService;
|
|||
import com.metamx.druid.curator.cache.PathChildrenCacheFactory;
|
||||
import com.metamx.druid.curator.cache.SimplePathChildrenCacheFactory;
|
||||
import org.apache.curator.framework.CuratorFramework;
|
||||
import org.apache.curator.framework.CuratorFrameworkFactory;
|
||||
import org.apache.curator.framework.recipes.cache.ChildData;
|
||||
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
|
||||
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
|
||||
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
|
||||
import org.apache.curator.retry.RetryOneTime;
|
||||
import org.apache.curator.utils.ZKPaths;
|
||||
|
||||
import java.io.IOException;
|
||||
|
@ -217,6 +214,7 @@ public class CuratorInventoryManager<ContainerClass, InventoryClass>
|
|||
containers.put(containerKey, new ContainerHolder(container, inventoryCache));
|
||||
|
||||
inventoryCache.start();
|
||||
strategy.newContainer(container);
|
||||
|
||||
break;
|
||||
case CHILD_REMOVED:
|
||||
|
|
|
@ -24,7 +24,7 @@ import org.skife.config.Config;
|
|||
public abstract class ZkPathsConfig
|
||||
{
|
||||
@Config("druid.zk.paths.base")
|
||||
protected String getZkBasePath()
|
||||
public String getZkBasePath()
|
||||
{
|
||||
return "/druid";
|
||||
}
|
||||
|
|
|
@ -35,9 +35,6 @@ import com.metamx.common.lifecycle.LifecycleStart;
|
|||
import com.metamx.common.lifecycle.LifecycleStop;
|
||||
import com.metamx.common.logger.Logger;
|
||||
import com.metamx.druid.BaseServerNode;
|
||||
import com.metamx.druid.coordination.CuratorDataSegmentAnnouncer;
|
||||
import com.metamx.druid.coordination.CuratorDataSegmentAnnouncerConfig;
|
||||
import com.metamx.druid.coordination.DataSegmentAnnouncer;
|
||||
import com.metamx.druid.db.DbConnector;
|
||||
import com.metamx.druid.db.DbConnectorConfig;
|
||||
import com.metamx.druid.http.QueryServlet;
|
||||
|
|
|
@ -1,14 +0,0 @@
|
|||
package com.metamx.druid.coordination;
|
||||
|
||||
import org.skife.config.Config;
|
||||
import org.skife.config.Default;
|
||||
|
||||
public abstract class CuratorDataSegmentAnnouncerConfig
|
||||
{
|
||||
@Config("druid.zk.paths.announcementsPath")
|
||||
public abstract String getAnnouncementsPath();
|
||||
|
||||
@Config("druid.zk.paths.servedSegmentsPath")
|
||||
public abstract String getServedSegmentsPath();
|
||||
|
||||
}
|
|
@ -1,30 +0,0 @@
|
|||
/*
|
||||
* Druid - a distributed column store.
|
||||
* Copyright (C) 2012 Metamarkets Group Inc.
|
||||
*
|
||||
* This program is free software; you can redistribute it and/or
|
||||
* modify it under the terms of the GNU General Public License
|
||||
* as published by the Free Software Foundation; either version 2
|
||||
* of the License, or (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with this program; if not, write to the Free Software
|
||||
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||
*/
|
||||
|
||||
package com.metamx.druid.coordination;
|
||||
|
||||
import org.skife.config.Config;
|
||||
|
||||
/**
|
||||
*/
|
||||
public abstract class DruidClusterInfoConfig
|
||||
{
|
||||
@Config("druid.zk.paths.masterPath")
|
||||
public abstract String getMasterPath();
|
||||
}
|
|
@ -29,6 +29,7 @@ import com.metamx.common.lifecycle.LifecycleStop;
|
|||
import com.metamx.druid.client.DataSegment;
|
||||
import com.metamx.druid.client.DruidServer;
|
||||
import com.metamx.druid.curator.announcement.Announcer;
|
||||
import com.metamx.druid.initialization.ZkPathsConfig;
|
||||
import com.metamx.druid.loading.SegmentLoadingException;
|
||||
import com.metamx.emitter.EmittingLogger;
|
||||
import com.metamx.emitter.service.AlertEvent;
|
||||
|
@ -68,6 +69,7 @@ public class ZkCoordinator implements DataSegmentChangeHandler
|
|||
public ZkCoordinator(
|
||||
ObjectMapper jsonMapper,
|
||||
ZkCoordinatorConfig config,
|
||||
ZkPathsConfig zkPaths,
|
||||
DruidServerMetadata me,
|
||||
DataSegmentAnnouncer announcer,
|
||||
CuratorFramework curator,
|
||||
|
@ -83,14 +85,14 @@ public class ZkCoordinator implements DataSegmentChangeHandler
|
|||
this.serverManager = serverManager;
|
||||
this.emitter = emitter;
|
||||
|
||||
this.loadQueueLocation = ZKPaths.makePath(config.getLoadQueueLocation(), me.getName());
|
||||
this.servedSegmentsLocation = ZKPaths.makePath(config.getServedSegmentsLocation(), me.getName());
|
||||
this.loadQueueLocation = ZKPaths.makePath(zkPaths.getLoadQueuePath(), me.getName());
|
||||
this.servedSegmentsLocation = ZKPaths.makePath(zkPaths.getServedSegmentsPath(), me.getName());
|
||||
}
|
||||
|
||||
@LifecycleStart
|
||||
public void start() throws IOException
|
||||
{
|
||||
log.info("Starting zkCoordinator for server[%s] with config[%s]", me, config);
|
||||
log.info("Starting zkCoordinator for server[%s]", me);
|
||||
synchronized (lock) {
|
||||
if (started) {
|
||||
return;
|
||||
|
@ -105,7 +107,7 @@ public class ZkCoordinator implements DataSegmentChangeHandler
|
|||
);
|
||||
|
||||
try {
|
||||
this.config.getSegmentInfoCacheDirectory().mkdirs();
|
||||
config.getSegmentInfoCacheDirectory().mkdirs();
|
||||
|
||||
curator.newNamespaceAwareEnsurePath(loadQueueLocation).ensure(curator.getZookeeperClient());
|
||||
curator.newNamespaceAwareEnsurePath(servedSegmentsLocation).ensure(curator.getZookeeperClient());
|
||||
|
@ -158,6 +160,7 @@ public class ZkCoordinator implements DataSegmentChangeHandler
|
|||
}
|
||||
}
|
||||
);
|
||||
loadQueueCache.start();
|
||||
}
|
||||
catch (Exception e) {
|
||||
Throwables.propagateIfPossible(e, IOException.class);
|
||||
|
@ -214,16 +217,9 @@ public class ZkCoordinator implements DataSegmentChangeHandler
|
|||
}
|
||||
}
|
||||
catch (Exception e) {
|
||||
log.error(e, "Exception occurred reading file [%s]", file);
|
||||
emitter.emit(
|
||||
new AlertEvent.Builder().build(
|
||||
"Failed to read segment info file",
|
||||
ImmutableMap.<String, Object>builder()
|
||||
.put("file", file)
|
||||
.put("exception", e.toString())
|
||||
.build()
|
||||
)
|
||||
);
|
||||
log.makeAlert(e, "Failed to load segment from segmentInfo file")
|
||||
.addData("file", file)
|
||||
.emit();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -27,15 +27,6 @@ import java.io.File;
|
|||
*/
|
||||
public abstract class ZkCoordinatorConfig
|
||||
{
|
||||
@Config("druid.zk.paths.announcementsPath")
|
||||
public abstract String getAnnounceLocation();
|
||||
|
||||
@Config("druid.zk.paths.servedSegmentsPath")
|
||||
public abstract String getServedSegmentsLocation();
|
||||
|
||||
@Config("druid.zk.paths.loadQueuePath")
|
||||
public abstract String getLoadQueueLocation();
|
||||
|
||||
@Config("druid.paths.segmentInfoCache")
|
||||
public abstract File getSegmentInfoCacheDirectory();
|
||||
}
|
||||
|
|
|
@ -117,6 +117,7 @@ public class ComputeNode extends BaseServerNode<ComputeNode>
|
|||
final ZkCoordinator coordinator = new ZkCoordinator(
|
||||
getJsonMapper(),
|
||||
getConfigFactory().build(ZkCoordinatorConfig.class),
|
||||
getZkPaths(),
|
||||
getDruidServerMetadata(),
|
||||
getAnnouncer(),
|
||||
getCuratorFramework(),
|
||||
|
|
|
@ -48,6 +48,7 @@ import com.metamx.druid.db.DbConnectorConfig;
|
|||
import com.metamx.druid.initialization.Initialization;
|
||||
import com.metamx.druid.initialization.ServerConfig;
|
||||
import com.metamx.druid.initialization.ServiceDiscoveryConfig;
|
||||
import com.metamx.druid.initialization.ZkPathsConfig;
|
||||
import com.metamx.druid.jackson.DefaultObjectMapper;
|
||||
import com.metamx.druid.log.LogLevelAdjuster;
|
||||
import com.metamx.druid.master.DruidMaster;
|
||||
|
@ -122,11 +123,13 @@ public class MasterMain
|
|||
lifecycle
|
||||
);
|
||||
|
||||
final ZkPathsConfig zkPaths = configFactory.build(ZkPathsConfig.class);
|
||||
|
||||
final ExecutorService exec = Executors.newFixedThreadPool(
|
||||
1, new ThreadFactoryBuilder().setDaemon(true).setNameFormat("ServerInventoryThingie-%s").build()
|
||||
);
|
||||
ServerInventoryThingie serverInventoryThingie = new ServerInventoryThingie(
|
||||
configFactory.build(ServerInventoryThingieConfig.class), curatorFramework, exec, jsonMapper
|
||||
configFactory.build(ServerInventoryThingieConfig.class), zkPaths, curatorFramework, exec, jsonMapper
|
||||
);
|
||||
lifecycle.addManagedInstance(serverInventoryThingie);
|
||||
|
||||
|
@ -194,6 +197,7 @@ public class MasterMain
|
|||
|
||||
final DruidMaster master = new DruidMaster(
|
||||
druidMasterConfig,
|
||||
zkPaths,
|
||||
configManager,
|
||||
databaseSegmentManager,
|
||||
serverInventoryThingie,
|
||||
|
|
|
@ -45,6 +45,7 @@ import com.metamx.druid.config.JacksonConfigManager;
|
|||
import com.metamx.druid.db.DatabaseRuleManager;
|
||||
import com.metamx.druid.db.DatabaseSegmentManager;
|
||||
import com.metamx.druid.index.v1.IndexIO;
|
||||
import com.metamx.druid.initialization.ZkPathsConfig;
|
||||
import com.metamx.emitter.EmittingLogger;
|
||||
import com.metamx.emitter.service.ServiceEmitter;
|
||||
import com.metamx.emitter.service.ServiceMetricEvent;
|
||||
|
@ -78,6 +79,7 @@ public class DruidMaster
|
|||
private volatile boolean master = false;
|
||||
|
||||
private final DruidMasterConfig config;
|
||||
private final ZkPathsConfig zkPaths;
|
||||
private final JacksonConfigManager configManager;
|
||||
private final DatabaseSegmentManager databaseSegmentManager;
|
||||
private final ServerInventoryThingie serverInventoryThingie;
|
||||
|
@ -93,6 +95,7 @@ public class DruidMaster
|
|||
|
||||
public DruidMaster(
|
||||
DruidMasterConfig config,
|
||||
ZkPathsConfig zkPaths,
|
||||
JacksonConfigManager configManager,
|
||||
DatabaseSegmentManager databaseSegmentManager,
|
||||
ServerInventoryThingie serverInventoryThingie,
|
||||
|
@ -105,6 +108,7 @@ public class DruidMaster
|
|||
)
|
||||
{
|
||||
this.config = config;
|
||||
this.zkPaths = zkPaths;
|
||||
this.configManager = configManager;
|
||||
|
||||
this.databaseSegmentManager = databaseSegmentManager;
|
||||
|
@ -256,9 +260,9 @@ public class DruidMaster
|
|||
);
|
||||
}
|
||||
|
||||
final String toLoadQueueSegPath = ZKPaths.makePath(ZKPaths.makePath(config.getLoadQueuePath(), to), segmentName);
|
||||
final String toLoadQueueSegPath = ZKPaths.makePath(ZKPaths.makePath(zkPaths.getLoadQueuePath(), to), segmentName);
|
||||
final String toServedSegPath = ZKPaths.makePath(
|
||||
ZKPaths.makePath(config.getServedSegmentsLocation(), to), segmentName
|
||||
ZKPaths.makePath(zkPaths.getServedSegmentsPath(), to), segmentName
|
||||
);
|
||||
|
||||
loadPeon.loadSegment(
|
||||
|
@ -403,7 +407,7 @@ public class DruidMaster
|
|||
private LeaderLatch createNewLeaderLatch()
|
||||
{
|
||||
final LeaderLatch newLeaderLatch = new LeaderLatch(
|
||||
curator, ZKPaths.makePath(config.getBasePath(), MASTER_OWNER_NODE), config.getHost()
|
||||
curator, ZKPaths.makePath(zkPaths.getMasterPath(), MASTER_OWNER_NODE), config.getHost()
|
||||
);
|
||||
|
||||
newLeaderLatch.attachListener(
|
||||
|
@ -711,7 +715,7 @@ public class DruidMaster
|
|||
final DruidCluster cluster = new DruidCluster();
|
||||
for (DruidServer server : servers) {
|
||||
if (!loadManagementPeons.containsKey(server.getName())) {
|
||||
String basePath = ZKPaths.makePath(config.getLoadQueuePath(), server.getName());
|
||||
String basePath = ZKPaths.makePath(zkPaths.getLoadQueuePath(), server.getName());
|
||||
LoadQueuePeon loadQueuePeon = taskMaster.giveMePeon(basePath);
|
||||
log.info("Creating LoadQueuePeon for server[%s] at path[%s]", server.getName(), basePath);
|
||||
|
||||
|
|
|
@ -30,15 +30,6 @@ public abstract class DruidMasterConfig
|
|||
@Config("druid.host")
|
||||
public abstract String getHost();
|
||||
|
||||
@Config("druid.zk.paths.masterPath")
|
||||
public abstract String getBasePath();
|
||||
|
||||
@Config("druid.zk.paths.loadQueuePath")
|
||||
public abstract String getLoadQueuePath();
|
||||
|
||||
@Config("druid.zk.paths.servedSegmentsPath")
|
||||
public abstract String getServedSegmentsLocation();
|
||||
|
||||
@Config("druid.master.startDelay")
|
||||
@Default("PT600s")
|
||||
public abstract Duration getMasterStartDelay();
|
||||
|
|
|
@ -163,7 +163,7 @@ public class DruidSetup
|
|||
ZkPathsConfig config = new ZkPathsConfig()
|
||||
{
|
||||
@Override
|
||||
protected String getZkBasePath()
|
||||
public String getZkBasePath()
|
||||
{
|
||||
return zPathBase;
|
||||
}
|
||||
|
|
|
@ -26,6 +26,7 @@ import com.google.common.util.concurrent.MoreExecutors;
|
|||
import com.metamx.common.logger.Logger;
|
||||
import com.metamx.druid.client.DataSegment;
|
||||
import com.metamx.druid.index.v1.IndexIO;
|
||||
import com.metamx.druid.initialization.ZkPathsConfig;
|
||||
import com.metamx.druid.jackson.DefaultObjectMapper;
|
||||
import com.metamx.druid.loading.CacheTestSegmentLoader;
|
||||
import com.metamx.druid.metrics.NoopServiceEmitter;
|
||||
|
@ -81,30 +82,13 @@ public class ZkCoordinatorTest
|
|||
jsonMapper,
|
||||
new ZkCoordinatorConfig()
|
||||
{
|
||||
@Override
|
||||
public String getAnnounceLocation()
|
||||
{
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getServedSegmentsLocation()
|
||||
{
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getLoadQueueLocation()
|
||||
{
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public File getSegmentInfoCacheDirectory()
|
||||
{
|
||||
return cacheDir;
|
||||
}
|
||||
},
|
||||
new ZkPathsConfig(){},
|
||||
new DruidServerMetadata("dummyServer", "dummyHost", 0, "dummyType", "normal"),
|
||||
announcer,
|
||||
curator,
|
||||
|
|
|
@ -24,6 +24,7 @@ import com.metamx.druid.client.DataSegment;
|
|||
import com.metamx.druid.client.DruidServer;
|
||||
import com.metamx.druid.client.ServerInventoryThingie;
|
||||
import com.metamx.druid.db.DatabaseSegmentManager;
|
||||
import com.metamx.druid.initialization.ZkPathsConfig;
|
||||
import com.metamx.druid.metrics.NoopServiceEmitter;
|
||||
import org.apache.curator.framework.CuratorFramework;
|
||||
import org.easymock.EasyMock;
|
||||
|
@ -73,24 +74,6 @@ public class DruidMasterTest
|
|||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getBasePath()
|
||||
{
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getLoadQueuePath()
|
||||
{
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getServedSegmentsLocation()
|
||||
{
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Duration getMasterStartDelay()
|
||||
{
|
||||
|
@ -139,6 +122,7 @@ public class DruidMasterTest
|
|||
return 0;
|
||||
}
|
||||
},
|
||||
new ZkPathsConfig(){},
|
||||
null,
|
||||
databaseSegmentManager,
|
||||
serverInventoryThingie,
|
||||
|
|
Loading…
Reference in New Issue