1) Remove references to zkclient

2) Eradicate zkclient from the poms!
This commit is contained in:
cheddar 2013-04-23 14:44:02 -05:00
parent 43d630c098
commit dde50a0d87
16 changed files with 267 additions and 708 deletions

View File

@ -167,10 +167,6 @@
<groupId>org.slf4j</groupId> <groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId> <artifactId>slf4j-log4j12</artifactId>
</dependency> </dependency>
<dependency>
<groupId>com.github.sgroschupf</groupId>
<artifactId>zkclient</artifactId>
</dependency>
<dependency> <dependency>
<groupId>com.google.code.simple-spring-memcached</groupId> <groupId>com.google.code.simple-spring-memcached</groupId>
<artifactId>spymemcached</artifactId> <artifactId>spymemcached</artifactId>

View File

@ -30,6 +30,6 @@ public abstract class CuratorConfig
public abstract String getZkHosts(); public abstract String getZkHosts();
@Config("druid.zk.service.sessionTimeoutMs") @Config("druid.zk.service.sessionTimeoutMs")
@Default("15000") @Default("30000")
public abstract int getZkSessionTimeoutMs(); public abstract int getZkSessionTimeoutMs();
} }

View File

@ -20,16 +20,17 @@
package com.metamx.druid.initialization; package com.metamx.druid.initialization;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Charsets;
import com.google.common.base.Throwables; import com.google.common.base.Throwables;
import com.google.common.io.Closeables; import com.google.common.io.Closeables;
import com.metamx.common.concurrent.ScheduledExecutorFactory; import com.metamx.common.concurrent.ScheduledExecutorFactory;
import com.metamx.common.config.Config;
import com.metamx.common.lifecycle.Lifecycle; import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.common.logger.Logger; import com.metamx.common.logger.Logger;
import com.metamx.druid.curator.PotentiallyGzippedCompressionProvider; import com.metamx.druid.curator.PotentiallyGzippedCompressionProvider;
import com.metamx.druid.http.FileRequestLogger; import com.metamx.druid.http.FileRequestLogger;
import com.metamx.druid.http.RequestLogger; import com.metamx.druid.http.RequestLogger;
import com.metamx.druid.utils.PropUtils; import com.metamx.druid.utils.PropUtils;
import com.metamx.druid.zk.PropertiesZkSerializer;
import com.netflix.curator.framework.CuratorFramework; import com.netflix.curator.framework.CuratorFramework;
import com.netflix.curator.framework.CuratorFrameworkFactory; import com.netflix.curator.framework.CuratorFrameworkFactory;
import com.netflix.curator.retry.BoundedExponentialBackoffRetry; import com.netflix.curator.retry.BoundedExponentialBackoffRetry;
@ -37,16 +38,18 @@ import com.netflix.curator.x.discovery.ServiceDiscovery;
import com.netflix.curator.x.discovery.ServiceDiscoveryBuilder; import com.netflix.curator.x.discovery.ServiceDiscoveryBuilder;
import com.netflix.curator.x.discovery.ServiceInstance; import com.netflix.curator.x.discovery.ServiceInstance;
import com.netflix.curator.x.discovery.ServiceProvider; import com.netflix.curator.x.discovery.ServiceProvider;
import org.I0Itec.zkclient.ZkClient; import org.apache.zookeeper.data.Stat;
import org.I0Itec.zkclient.ZkConnection;
import org.mortbay.jetty.Connector; import org.mortbay.jetty.Connector;
import org.mortbay.jetty.Server; import org.mortbay.jetty.Server;
import org.mortbay.jetty.nio.SelectChannelConnector; import org.mortbay.jetty.nio.SelectChannelConnector;
import org.mortbay.thread.QueuedThreadPool; import org.mortbay.thread.QueuedThreadPool;
import org.skife.config.ConfigurationObjectFactory;
import java.io.ByteArrayInputStream;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.Properties; import java.util.Properties;
/** /**
@ -55,24 +58,23 @@ public class Initialization
{ {
private static final Logger log = new Logger(Initialization.class); private static final Logger log = new Logger(Initialization.class);
private static final String PROPERTIES_FILE = "runtime.properties";
private static final Properties zkProps = new Properties(); private static final Properties zkProps = new Properties();
private static final Properties fileProps = new Properties(zkProps); private static final Properties fileProps = new Properties(zkProps);
private static Properties props = null; private static Properties props = null;
public final static String PROP_SUBPATH = "properties";
public final static String[] SUB_PATHS = {"announcements", "servedSegments", "loadQueue", "master"};
public final static String[] SUB_PATH_PROPS = {
"druid.zk.paths.announcementsPath",
"druid.zk.paths.servedSegmentsPath",
"druid.zk.paths.loadQueuePath",
"druid.zk.paths.masterPath"
};
public static final String DEFAULT_ZPATH = "/druid";
/** /**
* Load properties. * Load properties.
* Properties are layered, high to low precedence: cmdLine -D, runtime.properties file, stored in zookeeper. * Properties are layered:
*
* # stored in zookeeper
* # runtime.properties file,
* # cmdLine -D
*
* command line overrides runtime.properties which overrides zookeeper
*
* Idempotent. Thread-safe. Properties are only loaded once. * Idempotent. Thread-safe. Properties are only loaded once.
* If property druid.zk.service.host=none then do not load properties from zookeeper. * If property druid.zk.service.host is not set then do not load properties from zookeeper.
* *
* @return Properties ready to use. * @return Properties ready to use.
*/ */
@ -88,13 +90,11 @@ public class Initialization
Properties tmp_props = new Properties(fileProps); // the head of the 3 level Properties chain Properties tmp_props = new Properties(fileProps); // the head of the 3 level Properties chain
tmp_props.putAll(sp); tmp_props.putAll(sp);
final InputStream stream = ClassLoader.getSystemResourceAsStream("runtime.properties"); final InputStream stream = ClassLoader.getSystemResourceAsStream(PROPERTIES_FILE);
if (stream == null) { if (stream == null) {
log.info( log.info("%s not found on classpath, relying only on system properties and zookeeper.", PROPERTIES_FILE);
"runtime.properties not found as a resource in classpath, relying only on system properties, and zookeeper now."
);
} else { } else {
log.info("Loading properties from runtime.properties"); log.info("Loading properties from %s", PROPERTIES_FILE);
try { try {
try { try {
fileProps.load(stream); fileProps.load(stream);
@ -108,58 +108,46 @@ public class Initialization
} }
} }
// log properties from file; note stringPropertyNames() will follow Properties.defaults but // log properties from file; stringPropertyNames() would normally cascade down into the sub Properties objects, but
// next level is empty at this point. // zkProps (the parent level) is empty at this point so it will only log properties from runtime.properties
for (String prop : fileProps.stringPropertyNames()) { for (String prop : fileProps.stringPropertyNames()) {
log.info("Loaded(runtime.properties) Property[%s] as [%s]", prop, fileProps.getProperty(prop)); log.info("Loaded(runtime.properties) Property[%s] as [%s]", prop, fileProps.getProperty(prop));
} }
final String zk_hosts = tmp_props.getProperty("druid.zk.service.host"); final String zkHostsProperty = "druid.zk.service.host";
if (zk_hosts != null) { if (tmp_props.getProperty(zkHostsProperty) != null) {
if (!zk_hosts.equals("none")) { // get props from zk final ConfigurationObjectFactory factory = Config.createFactory(tmp_props);
final ZkClient zkPropLoadingClient;
final ZkClientConfig clientConfig = new ZkClientConfig()
{
@Override
public String getZkHosts()
{
return zk_hosts;
}
};
zkPropLoadingClient = new ZkClient( // TODO Lifecycle lifecycle = new Lifecycle();
new ZkConnection(clientConfig.getZkHosts()), try {
clientConfig.getConnectionTimeout(), final ZkPathsConfig config = factory.build(ZkPathsConfig.class);
new PropertiesZkSerializer() CuratorFramework curator = makeCuratorFramework(factory.build(CuratorConfig.class), lifecycle);
);
zkPropLoadingClient.waitUntilConnected(); lifecycle.start();
String propertiesZNodePath = tmp_props.getProperty("druid.zk.paths.propertiesPath");
if (propertiesZNodePath == null) { final Stat stat = curator.checkExists().forPath(config.getPropertiesPath());
String zpathBase = tmp_props.getProperty("druid.zk.paths.base", DEFAULT_ZPATH); if (stat != null) {
propertiesZNodePath = makePropPath(zpathBase); final byte[] data = curator.getData().forPath(config.getPropertiesPath());
} zkProps.load(new InputStreamReader(new ByteArrayInputStream(data), Charsets.UTF_8));
// get properties stored by zookeeper (lowest precedence)
if (zkPropLoadingClient.exists(propertiesZNodePath)) {
Properties p = zkPropLoadingClient.readData(propertiesZNodePath, true);
if (p != null) {
zkProps.putAll(p);
}
} }
// log properties from zk // log properties from zk
for (String prop : zkProps.stringPropertyNames()) { for (String prop : zkProps.stringPropertyNames()) {
log.info("Loaded(properties stored in zk) Property[%s] as [%s]", prop, zkProps.getProperty(prop)); log.info("Loaded(zk) Property[%s] as [%s]", prop, zkProps.getProperty(prop));
}
}
catch (Exception e) {
throw Throwables.propagate(e);
}
finally {
lifecycle.stop();
} }
} // get props from zk
} else { } else {
log.warn("property druid.zk.service.host is not set, so no way to contact zookeeper for coordination."); log.warn("property[%s] not set, skipping ZK-specified properties.", zkHostsProperty);
} }
// validate properties now that all levels of precedence are loaded
if (!validateResolveProps(tmp_props)) { props = tmp_props;
log.error("Properties failed to validate, cannot continue");
throw new RuntimeException("Properties failed to validate");
}
props = tmp_props; // publish
return props; return props;
} }
@ -191,6 +179,7 @@ public class Initialization
final CuratorFramework framework = final CuratorFramework framework =
CuratorFrameworkFactory.builder() CuratorFrameworkFactory.builder()
.connectString(curatorConfig.getZkHosts()) .connectString(curatorConfig.getZkHosts())
.sessionTimeoutMs(curatorConfig.getZkSessionTimeoutMs())
.retryPolicy(new BoundedExponentialBackoffRetry(1000, 45000, 30)) .retryPolicy(new BoundedExponentialBackoffRetry(1000, 45000, 30))
// Don't compress stuff written just yet, need to get code deployed first. // Don't compress stuff written just yet, need to get code deployed first.
.compressionProvider(new PotentiallyGzippedCompressionProvider(false)) .compressionProvider(new PotentiallyGzippedCompressionProvider(false))
@ -308,129 +297,4 @@ public class Initialization
new File(PropUtils.getProperty(props, "druid.request.logging.dir")) new File(PropUtils.getProperty(props, "druid.request.logging.dir"))
); );
} }
public static RequestLogger makeEmittingRequestLogger(Properties props, Emitter emitter)
{
return new EmittingRequestLogger(
PropUtils.getProperty(props, "druid.service"),
PropUtils.getProperty(props, "druid.host"),
emitter,
PropUtils.getProperty(props, "druid.request.logging.feed")
);
}
public static String makePropPath(String basePath)
{
return String.format("%s/%s", basePath, PROP_SUBPATH);
}
public static String addressFromHost(final String host)
{
final int colon = host.indexOf(':');
if (colon < 0) {
return host;
} else {
return host.substring(0, colon);
}
}
/**
* Validate and Resolve Properties.
* Resolve zpaths with props like druid.zk.paths.*Path using druid.zk.paths.base value.
* Check validity so that if druid.zk.paths.*Path props are set, all are set,
* if none set, then construct defaults relative to druid.zk.paths.base and add these
* to the properties chain.
*
* @param props
*
* @return true if valid zpath properties.
*/
public static boolean validateResolveProps(Properties props)
{
boolean zpathValidateFailed;// validate druid.zk.paths.base
String propertyZpath = props.getProperty("druid.zk.paths.base");
zpathValidateFailed = zpathBaseCheck(propertyZpath, "property druid.zk.paths.base");
String zpathEffective = DEFAULT_ZPATH;
if (propertyZpath != null) {
zpathEffective = propertyZpath;
}
final String propertiesZpathOverride = props.getProperty("druid.zk.paths.propertiesPath");
if (!zpathValidateFailed) {
System.out.println("Effective zpath prefix=" + zpathEffective);
}
// validate druid.zk.paths.*Path properties
//
// if any zpath overrides are set in properties, they must start with /
int zpathOverrideCount = 0;
StringBuilder sbErrors = new StringBuilder(100);
for (int i = 0; i < SUB_PATH_PROPS.length; i++) {
String val = props.getProperty(SUB_PATH_PROPS[i]);
if (val != null) {
zpathOverrideCount++;
if (!val.startsWith("/")) {
sbErrors.append(SUB_PATH_PROPS[i]).append("=").append(val).append("\n");
zpathValidateFailed = true;
}
}
}
// separately check druid.zk.paths.propertiesPath (not in SUB_PATH_PROPS since it is not a "dir")
if (propertiesZpathOverride != null) {
zpathOverrideCount++;
if (!propertiesZpathOverride.startsWith("/")) {
sbErrors.append("druid.zk.paths.propertiesPath").append("=").append(propertiesZpathOverride).append("\n");
zpathValidateFailed = true;
}
}
if (zpathOverrideCount == 0) {
if (propertyZpath == null) { // if default base is used, store it as documentation
props.setProperty("druid.zk.paths.base", zpathEffective);
}
//
// Resolve default zpaths using zpathEffective as base
//
for (int i = 0; i < SUB_PATH_PROPS.length; i++) {
props.setProperty(SUB_PATH_PROPS[i], zpathEffective + "/" + SUB_PATHS[i]);
}
props.setProperty("druid.zk.paths.propertiesPath", zpathEffective + "/properties");
}
if (zpathValidateFailed) {
System.err.println(
"When overriding zk zpaths, with properties like druid.zk.paths.*Path " +
"the znode path must start with '/' (slash) ; problem overrides:"
);
System.err.print(sbErrors.toString());
}
return !zpathValidateFailed;
}
/**
* Check znode zpath base for proper slash, no trailing slash.
*
* @param zpathBase znode base path, if null then this method does nothing.
* @param errorMsgPrefix error context to use if errors are emitted, should indicate
* where the zpathBase value came from.
*
* @return true if validate failed.
*/
public static boolean zpathBaseCheck(String zpathBase, String errorMsgPrefix)
{
boolean zpathValidateFailed = false;
if (zpathBase != null) {
if (!zpathBase.startsWith("/")) {
zpathValidateFailed = true;
System.err.println(errorMsgPrefix + " must start with '/' (slash); found=" + zpathBase);
}
if (zpathBase.endsWith("/")) {
zpathValidateFailed = true;
System.err.println(errorMsgPrefix + " must NOT end with '/' (slash); found=" + zpathBase);
}
}
return zpathValidateFailed;
}
} }

View File

@ -23,14 +23,45 @@ import org.skife.config.Config;
/** /**
*/ */
public abstract class ZkClientConfig public abstract class ZkPathsConfig
{ {
@Config("druid.zk.service.host") @Config("druid.zk.paths.base")
public abstract String getZkHosts(); protected String getZkBasePath()
@Config("druid.zk.service.connectionTimeout")
public int getConnectionTimeout()
{ {
return Integer.MAX_VALUE; return "/druid";
}
@Config("druid.zk.paths.propertiesPath")
public String getPropertiesPath()
{
return defaultPath("properties");
}
@Config("druid.zk.paths.announcementsPath")
public String getAnnouncementsPath()
{
return defaultPath("announcements");
}
@Config("druid.zk.paths.servedSegmentsPath")
public String getServedSegmentsPath()
{
return defaultPath("servedSegments");
}
@Config("druid.zk.paths.loadQueuePath")
public String getLoadQueuePath()
{
return defaultPath("loadQueue");
}
@Config("druid.zk.paths.masterPath")
public String getMasterPath()
{
return defaultPath("master");
}
private String defaultPath(final String subPath) {
return String.format("%s/%s", getZkBasePath(), subPath);
} }
} }

View File

@ -1,68 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.zk;
import com.metamx.common.IAE;
import org.I0Itec.zkclient.exception.ZkMarshallingError;
import org.I0Itec.zkclient.serialize.ZkSerializer;
import java.io.*;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Properties;
import java.util.TimeZone;
/**
*/
public class PropertiesZkSerializer implements ZkSerializer
{
private static final SimpleDateFormat df = new SimpleDateFormat("yyyyMMdd'T'HHmmss'z'");
static {
df.setTimeZone(TimeZone.getTimeZone("UTC"));
}
public final static String META_PROP = "__MODIFIED";
@Override
public byte[] serialize(Object data) throws ZkMarshallingError
{
if (data instanceof Properties) {
final Properties props = (Properties) data;
ByteArrayOutputStream bos = new ByteArrayOutputStream(props.size()*60 + 30);
try {
final String ts = df.format(new Date());
props.setProperty("__MODIFIED", ts);
props.store(bos, "Druid");
} catch (IOException ignored) { }
return bos.toByteArray();
}
throw new IAE("Can only serialize Properties into ZK");
}
@Override
public Object deserialize(byte[] bytes) throws ZkMarshallingError
{
final Properties props = new Properties();
try {
props.load(new ByteArrayInputStream(bytes));
} catch (IOException ignored) {
}
return props;
}
}

View File

@ -1,48 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.zk;
import com.metamx.common.IAE;
import org.I0Itec.zkclient.exception.ZkMarshallingError;
import org.I0Itec.zkclient.serialize.ZkSerializer;
import java.nio.charset.Charset;
/**
*/
public class StringZkSerializer implements ZkSerializer
{
private static final Charset UTF8 = Charset.forName("UTF8");
@Override
public byte[] serialize(Object data) throws ZkMarshallingError
{
if (data instanceof String) {
return ((String) data).getBytes(UTF8);
}
throw new IAE("Can only serialize strings into ZK");
}
@Override
public Object deserialize(byte[] bytes) throws ZkMarshallingError
{
return new String(bytes, UTF8);
}
}

View File

@ -122,10 +122,6 @@
<groupId>org.slf4j</groupId> <groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId> <artifactId>slf4j-log4j12</artifactId>
</dependency> </dependency>
<dependency>
<groupId>com.github.sgroschupf</groupId>
<artifactId>zkclient</artifactId>
</dependency>
<!-- Tests --> <!-- Tests -->
<dependency> <dependency>

View File

@ -137,10 +137,6 @@
<artifactId>twitter4j-stream</artifactId> <artifactId>twitter4j-stream</artifactId>
<version>2.2.6</version> <version>2.2.6</version>
</dependency> </dependency>
<dependency>
<groupId>com.github.sgroschupf</groupId>
<artifactId>zkclient</artifactId>
</dependency>
<!-- Tests --> <!-- Tests -->
<dependency> <dependency>

View File

@ -98,10 +98,6 @@
<groupId>com.google.code.findbugs</groupId> <groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId> <artifactId>jsr305</artifactId>
</dependency> </dependency>
<dependency>
<groupId>com.github.sgroschupf</groupId>
<artifactId>zkclient</artifactId>
</dependency>
<dependency> <dependency>
<groupId>net.java.dev.jets3t</groupId> <groupId>net.java.dev.jets3t</groupId>
<artifactId>jets3t</artifactId> <artifactId>jets3t</artifactId>

View File

@ -156,10 +156,6 @@
<groupId>com.google.code.findbugs</groupId> <groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId> <artifactId>jsr305</artifactId>
</dependency> </dependency>
<dependency>
<groupId>com.github.sgroschupf</groupId>
<artifactId>zkclient</artifactId>
</dependency>
<!-- Tests --> <!-- Tests -->
<dependency> <dependency>

View File

@ -279,11 +279,6 @@
<artifactId>slf4j-log4j12</artifactId> <artifactId>slf4j-log4j12</artifactId>
<version>1.6.2</version> <version>1.6.2</version>
</dependency> </dependency>
<dependency>
<groupId>com.github.sgroschupf</groupId>
<artifactId>zkclient</artifactId>
<version>0.1</version>
</dependency>
<dependency> <dependency>
<groupId>com.google.code.simple-spring-memcached</groupId> <groupId>com.google.code.simple-spring-memcached</groupId>
<artifactId>spymemcached</artifactId> <artifactId>spymemcached</artifactId>

View File

@ -120,16 +120,6 @@
</exclusion> </exclusion>
</exclusions> </exclusions>
</dependency> </dependency>
<dependency>
<groupId>com.github.sgroschupf</groupId>
<artifactId>zkclient</artifactId>
<exclusions>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- Dependencies required for jets3t b/c emr pom doesn't include them --> <!-- Dependencies required for jets3t b/c emr pom doesn't include them -->
<dependency> <dependency>

View File

@ -168,10 +168,6 @@
<groupId>org.slf4j</groupId> <groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId> <artifactId>slf4j-log4j12</artifactId>
</dependency> </dependency>
<dependency>
<groupId>com.github.sgroschupf</groupId>
<artifactId>zkclient</artifactId>
</dependency>
<!-- Dependencies required for jets3t b/c emr pom doesn't include them --> <!-- Dependencies required for jets3t b/c emr pom doesn't include them -->
<dependency> <dependency>

View File

@ -136,28 +136,26 @@ public class MasterMain
lifecycle.addManagedInstance(serverInventoryThingie); lifecycle.addManagedInstance(serverInventoryThingie);
final DbConnectorConfig dbConnectorConfig = configFactory.build(DbConnectorConfig.class); final DbConnectorConfig dbConnectorConfig = configFactory.build(DbConnectorConfig.class);
final DatabaseRuleManagerConfig databaseRuleManagerConfig = configFactory.build(DatabaseRuleManagerConfig.class);
final DBI dbi = new DbConnector(dbConnectorConfig).getDBI(); final DBI dbi = new DbConnector(dbConnectorConfig).getDBI();
DbConnector.createSegmentTable(dbi, PropUtils.getProperty(props, "druid.database.segmentTable")); DbConnector.createSegmentTable(dbi, PropUtils.getProperty(props, "druid.database.segmentTable"));
DbConnector.createRuleTable(dbi, PropUtils.getProperty(props, "druid.database.ruleTable")); DbConnector.createRuleTable(dbi, PropUtils.getProperty(props, "druid.database.ruleTable"));
DatabaseRuleManager.createDefaultRule(
dbi, databaseRuleManagerConfig.getRuleTable(), databaseRuleManagerConfig.getDefaultDatasource(), jsonMapper
);
final DatabaseSegmentManager databaseSegmentManager = new DatabaseSegmentManager( final DatabaseSegmentManager databaseSegmentManager = new DatabaseSegmentManager(
jsonMapper, jsonMapper,
scheduledExecutorFactory.create(1, "DatabaseSegmentManager-Exec--%d"), scheduledExecutorFactory.create(1, "DatabaseSegmentManager-Exec--%d"),
configFactory.build(DatabaseSegmentManagerConfig.class), configFactory.build(DatabaseSegmentManagerConfig.class),
dbi dbi
); );
final DatabaseRuleManagerConfig databaseRuleManagerConfig = configFactory.build(DatabaseRuleManagerConfig.class);
final DatabaseRuleManager databaseRuleManager = new DatabaseRuleManager( final DatabaseRuleManager databaseRuleManager = new DatabaseRuleManager(
jsonMapper, jsonMapper,
scheduledExecutorFactory.create(1, "DatabaseRuleManager-Exec--%d"), scheduledExecutorFactory.create(1, "DatabaseRuleManager-Exec--%d"),
databaseRuleManagerConfig, databaseRuleManagerConfig,
dbi dbi
); );
DatabaseRuleManager.createDefaultRule(
dbi,
databaseRuleManagerConfig.getRuleTable(),
databaseRuleManagerConfig.getDefaultDatasource(),
jsonMapper
);
final ScheduledExecutorService globalScheduledExec = scheduledExecutorFactory.create(1, "Global--%d"); final ScheduledExecutorService globalScheduledExec = scheduledExecutorFactory.create(1, "Global--%d");
final MonitorScheduler healthMonitor = new MonitorScheduler( final MonitorScheduler healthMonitor = new MonitorScheduler(

View File

@ -19,19 +19,28 @@
package com.metamx.druid.utils; package com.metamx.druid.utils;
import com.google.common.base.Charsets;
import com.google.common.base.Throwables;
import com.google.common.collect.Sets;
import com.google.common.io.Closeables; import com.google.common.io.Closeables;
import com.metamx.common.logger.Logger; import com.metamx.common.config.Config;
import com.metamx.druid.db.DatabaseRuleManager; import com.metamx.druid.initialization.ZkPathsConfig;
import com.metamx.druid.db.DbConnector; import com.netflix.curator.framework.CuratorFramework;
import com.metamx.druid.db.DbConnectorConfig; import com.netflix.curator.framework.CuratorFrameworkFactory;
import com.metamx.druid.initialization.Initialization; import com.netflix.curator.retry.RetryOneTime;
import com.metamx.druid.jackson.DefaultObjectMapper; import org.skife.config.ConfigurationObjectFactory;
import com.metamx.druid.zk.PropertiesZkSerializer;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;
import java.io.*; import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PrintStream;
import java.util.Properties; import java.util.Properties;
import java.util.Set;
/** /**
* Set up the shared Druid ensemble space. * Set up the shared Druid ensemble space.
@ -78,12 +87,14 @@ import java.util.Properties;
*/ */
public class DruidSetup public class DruidSetup
{ {
private static final Logger log = new Logger(DruidSetup.class); private final static String MODIFIED_PROP = "__MODIFIED";
private final static Set<String> IGNORED_PROPS = Sets.newHashSet(MODIFIED_PROP);
public static void main(final String[] args) public static void main(final String[] args)
{ {
ZkClient zkClient = null; CuratorFramework curator = null;
try {
if (args.length < 2 || args.length > 3) { if (args.length < 2 || args.length > 3) {
printUsage(); printUsage();
System.exit(1); System.exit(1);
@ -91,24 +102,21 @@ public class DruidSetup
String cmd = args[0]; String cmd = args[0];
if ("dump".equals(cmd) && args.length == 3) { if ("dump".equals(cmd) && args.length == 3) {
final String zkConnect = args[1]; final String zkConnect = args[1];
zkClient = connectToZK(zkConnect); curator = connectToZK(zkConnect);
String zpathBase = args[2]; String zpathBase = args[2];
dumpFromZk(zkClient, zpathBase, zkConnect, System.out); dumpFromZk(curator, zkConnect, zpathBase, System.out);
} else if ("put".equals(cmd) && args.length == 3) { } else if ("put".equals(cmd) && args.length == 3) {
final String zkConnect = args[1]; final String zkConnect = args[1];
zkClient = connectToZK(zkConnect); curator = connectToZK(zkConnect);
final String pfile = args[2]; final String pfile = args[2];
putToZk(zkClient, pfile); putToZk(curator, pfile);
} else if ("dbprep".equals(cmd) && args.length == 2) {
final String pfile = args[1];
prepDB(pfile);
} else { } else {
printUsage(); printUsage();
System.exit(1); System.exit(1);
} }
}
if (zkClient != null) { finally {
zkClient.close(); Closeables.closeQuietly(curator);
} }
} }
@ -118,12 +126,8 @@ public class DruidSetup
* This can only be used for setup, not service run time because of some assembly here. * This can only be used for setup, not service run time because of some assembly here.
* *
* @param pfile path to runtime.properties file to be read. * @param pfile path to runtime.properties file to be read.
* @param props Properties object to fill, props like druid.zk.paths.*Path will always be set after
* this method either because the input file has them set (overrides) or because prop
* druid.zk.paths.base was used as a prefix to construct the default zpaths;
* druid.zk.paths.base will be set iff there is a single base for all zpaths
*/ */
private static void loadProperties(String pfile, Properties props) private static Properties loadProperties(String pfile)
{ {
InputStream is = null; InputStream is = null;
try { try {
@ -134,100 +138,88 @@ public class DruidSetup
System.err.println("No changes made."); System.err.println("No changes made.");
System.exit(4); System.exit(4);
} }
catch (IOException ioe) {
reportErrorAndExit(pfile, ioe);
}
try { try {
props.load(is); Properties props = new Properties();
props.load(new InputStreamReader(is, Charsets.UTF_8));
return props;
} }
catch (IOException e) { catch (IOException e) {
reportErrorAndExit(pfile, e); throw reportErrorAndExit(pfile, e);
} }
finally { finally {
Closeables.closeQuietly(is); Closeables.closeQuietly(is);
} }
if (!Initialization.validateResolveProps(props)) { // bail, errors have been emitted
System.exit(9);
}
// emit effective zpaths to be used
System.out.println("Effective zpath properties:");
for (String pname : Initialization.SUB_PATH_PROPS) {
System.out.println(" " + pname + "=" + props.getProperty(pname));
}
System.out.println(
" " + "druid.zk.paths.propertiesPath" + "=" +
props.getProperty("druid.zk.paths.propertiesPath")
);
} }
/** /**
* @param zkClient zookeeper client. * @param curator zookeeper client.
* @param zpathBase znode base path. * @param zPathBase znode base path.
* @param zkConnect ZK coordinates in the form host1:port1[,host2:port2[, ...]] * @param zkConnect ZK coordinates in the form host1:port1[,host2:port2[, ...]]
* @param out * @param out
*/ */
private static void dumpFromZk(ZkClient zkClient, String zpathBase, String zkConnect, PrintStream out) private static void dumpFromZk(CuratorFramework curator, String zkConnect, final String zPathBase, PrintStream out)
{ {
final String propPath = Initialization.makePropPath(zpathBase); ZkPathsConfig config = new ZkPathsConfig()
if (zkClient.exists(propPath)) { {
Properties currProps = zkClient.readData(propPath, true); @Override
if (currProps != null) { protected String getZkBasePath()
out.println("# Begin Properties Listing for zkConnect=" + zkConnect + " zpath=" + propPath); {
return zPathBase;
}
};
try { try {
currProps.store(out, "Druid"); if (curator.checkExists().forPath(config.getPropertiesPath()) != null) {
byte[] data = curator.getData().forPath(config.getPropertiesPath());
Properties currProps = new Properties();
currProps.load(new InputStreamReader(new ByteArrayInputStream(data), Charsets.UTF_8));
if (! currProps.isEmpty()) {
out.printf("# Begin Properties Listing for zpath[%s]%n", config.getPropertiesPath());
try {
currProps.store(new OutputStreamWriter(out, Charsets.UTF_8), "Druid");
} }
catch (IOException ignored) { catch (IOException ignored) {
} }
out.println("# End Properties Listing for zkConnect=" + zkConnect + " zpath=" + propPath); out.printf("# End Properties for zkConnect[%s] zpath[%s]%n", zkConnect, config.getPropertiesPath());
out.println("# NOTE: properties like druid.zk.paths.*Path are always stored in zookeeper in absolute form."); }
out.println(); else {
out.printf("# Properties at zpath[%s] empty.%n", config.getPropertiesPath());
} }
} }
//out.println("Zookeeper znodes and zpaths for " + zkConnect + " (showing all zpaths)"); }
// list all znodes catch (Exception e) {
// (not ideal since recursive listing starts at / instead of at baseZkPath) throw Throwables.propagate(e);
//zkClient.showFolders(out); }
} }
/** private static void putToZk(CuratorFramework curator, String pfile)
* @param zkClient zookeeper client.
* @param pfile
*/
private static void putToZk(ZkClient zkClient, String pfile)
{ {
Properties props = new Properties(); final Properties props = loadProperties(pfile);
loadProperties(pfile, props); ConfigurationObjectFactory configFactory = Config.createFactory(props);
String zpathBase = props.getProperty("druid.zk.paths.base"); final ZkPathsConfig zkPaths = configFactory.build(ZkPathsConfig.class);
// create znodes first createZNodes(curator, zkPaths, System.out);
// updatePropertiesZK(curator, zkPaths, props, System.out);
createZNodes(zkClient, zpathBase, System.out);
// put props
//
updatePropertiesZK(zkClient, zpathBase, props, System.out);
} }
/** /**
* @param zkClient zookeeper client. * @param curator zookeeper client.
* @param zpathBase znode base path. * @param zkPaths znode base path.
* @param props the properties to store. * @param props the properties to store.
* @param out the PrintStream for human readable update summary (usually System.out). * @param out the PrintStream for human readable update summary (usually System.out).
*/ */
private static void updatePropertiesZK(ZkClient zkClient, String zpathBase, Properties props, PrintStream out) private static void updatePropertiesZK(CuratorFramework curator, ZkPathsConfig zkPaths, Properties props, PrintStream out)
{ {
final String propPathOverride = props.getProperty("druid.zk.paths.propertiesPath"); Properties currProps = new Properties();
final String propPathConstructed = Initialization.makePropPath(zpathBase); try {
final String propPath = (propPathOverride != null) ? propPathOverride : propPathConstructed; if (curator.checkExists().forPath(zkPaths.getPropertiesPath()) != null) {
Properties currProps = null; final byte[] data = curator.getData().forPath(zkPaths.getPropertiesPath());
if (zkClient.exists(propPath)) { currProps.load(new InputStreamReader(new ByteArrayInputStream(data), Charsets.UTF_8));
currProps = zkClient.readData(propPath, true);
} }
boolean propsDiffer = false; boolean propsDiffer = false;
if (currProps == null) { if (currProps.isEmpty()) {
out.println("No properties currently stored in zk"); out.println("No properties currently stored in zk");
propsDiffer = true; propsDiffer = true;
} else { // determine whether anything is different } else { // determine whether anything is different
@ -235,11 +227,10 @@ public class DruidSetup
int countDiffer = 0; int countDiffer = 0;
int countRemoved = 0; int countRemoved = 0;
int countNoChange = 0; int countNoChange = 0;
String currMetaPropVal = "";
StringBuilder changes = new StringBuilder(1024); StringBuilder changes = new StringBuilder(1024);
for (String pname : props.stringPropertyNames()) { for (String pname : props.stringPropertyNames()) {
if (pname.equals(PropertiesZkSerializer.META_PROP)) { if (IGNORED_PROPS.contains(pname)) {
continue; // ignore meta prop datestamp, if any continue; // ignore meta props, if any
} }
final String pvalue = props.getProperty(pname); final String pvalue = props.getProperty(pname);
final String pvalueCurr = currProps.getProperty(pname); final String pvalueCurr = currProps.getProperty(pname);
@ -250,156 +241,92 @@ public class DruidSetup
countNoChange++; countNoChange++;
} else { } else {
countDiffer++; countDiffer++;
changes.append("CHANGED: ").append(pname).append("= PREV=").append(pvalueCurr) changes.append(String.format("CHANGED[%s]: PREV=%s --- NOW=%s%n", pname, pvalueCurr, pvalue));
.append(" NOW=").append(pvalue).append("\n");
} }
} }
} }
for (String pname : currProps.stringPropertyNames()) { for (String pname : currProps.stringPropertyNames()) {
if (pname.equals(PropertiesZkSerializer.META_PROP)) { if (IGNORED_PROPS.contains(pname)) {
currMetaPropVal = currProps.getProperty(pname); continue; // ignore meta props, if any
continue; // ignore meta prop datestamp
} }
if (props.getProperty(pname) == null) { if (props.getProperty(pname) == null) {
countRemoved++; countRemoved++;
changes.append("REMOVED: ").append(pname).append("=").append(currProps.getProperty(pname)).append("\n"); changes.append(String.format("REMOVED: %s=%s%n", pname, currProps.getProperty(pname)));
} }
} }
if (countNew + countRemoved + countDiffer > 0) { if (countNew + countRemoved + countDiffer > 0) {
out.println( out.printf(
"Current properties differ: " "Properties differ: %,d new, %,d changed, %,d removed, %,d unchanged, previously updated %s%n",
+ countNew + " new, " countNew, countDiffer, countRemoved, countNoChange, currProps.getProperty(MODIFIED_PROP)
+ countDiffer + " different values, "
+ countRemoved + " removed, "
+ countNoChange + " unchanged, "
+ currMetaPropVal + " previously updated"
); );
out.println(changes); out.println(changes);
propsDiffer = true; propsDiffer = true;
} else { } else {
out.println("Current properties identical to file given, entry count=" + countNoChange); out.printf("Current properties identical to file given, %,d total properties set.%n", countNoChange);
} }
} }
if (propsDiffer) { if (propsDiffer) {
if (currProps != null) { ByteArrayOutputStream propsBytes = new ByteArrayOutputStream();
zkClient.delete(propPath); props.store(new OutputStreamWriter(propsBytes, Charsets.UTF_8), "Common Druid properties");
if (currProps.isEmpty()) {
curator.setData().forPath(zkPaths.getPropertiesPath(), propsBytes.toByteArray());
} }
// update zookeeper else {
zkClient.createPersistent(propPath, props); curator.create().forPath(zkPaths.getPropertiesPath(), propsBytes.toByteArray());
out.println("Properties updated, entry count=" + props.size()); }
out.printf("Properties updated, %,d total properties set.%n", props.size());
}
}
catch (Exception e) {
throw Throwables.propagate(e);
} }
} }
/** /**
* @param zkClient zookeeper client. * @param curator zookeeper client.
* @param zpathBase znode base path. * @param zkPaths znode base path.
* @param out the PrintStream for human readable update summary. * @param out the PrintStream for human readable update summary.
*/ */
private static void createZNodes(ZkClient zkClient, String zpathBase, PrintStream out) private static void createZNodes(CuratorFramework curator, ZkPathsConfig zkPaths, PrintStream out)
{ {
zkClient.createPersistent(zpathBase, true); createPath(curator, zkPaths.getAnnouncementsPath(), out);
for (String subPath : Initialization.SUB_PATHS) { createPath(curator, zkPaths.getMasterPath(), out);
final String thePath = String.format("%s/%s", zpathBase, subPath); createPath(curator, zkPaths.getLoadQueuePath(), out);
if (zkClient.exists(thePath)) { createPath(curator, zkPaths.getServedSegmentsPath(), out);
}
private static void createPath(CuratorFramework curator, String thePath, PrintStream out)
{
try {
if (curator.checkExists().forPath(thePath) != null) {
out.printf("Path[%s] exists already%n", thePath); out.printf("Path[%s] exists already%n", thePath);
} else { } else {
out.printf("Creating ZK path[%s]%n", thePath); out.printf("Creating ZK path[%s]%n", thePath);
zkClient.createPersistent(thePath, true); curator.create().creatingParentsIfNeeded().forPath(thePath);
} }
} }
catch (Exception e) {
throw Throwables.propagate(e);
}
} }
private static void reportErrorAndExit(String pfile, IOException ioe) private static RuntimeException reportErrorAndExit(String pfile, IOException ioe)
{ {
System.err.println("Could not read file: " + pfile); System.err.println("Could not read file: " + pfile);
System.err.println(" because of: " + ioe); System.err.println(" because of: " + ioe);
System.err.println("No changes made."); System.err.println("No changes made.");
System.exit(4); System.exit(4);
return new RuntimeException();
} }
private static ZkClient connectToZK(String zkConnect) private static CuratorFramework connectToZK(String zkConnect)
{ {
return new ZkClient( return CuratorFrameworkFactory.builder()
new ZkConnection(zkConnect), .connectString(zkConnect)
Integer.MAX_VALUE, .retryPolicy(new RetryOneTime(5000))
new PropertiesZkSerializer() .build();
);
}
/**
* Connect to db and create table, if it does not exist.
* NOTE: Connection failure only shows up in log output.
*
* @param pfile path to properties file to use.
*/
private static void prepDB(final String pfile)
{
Properties tmp_props = new Properties();
loadProperties(pfile, tmp_props);
final String tableName = tmp_props.getProperty("druid.database.segmentTable", "prod_segments");
final String ruleTableName = tmp_props.getProperty("druid.database.ruleTable", "prod_rules");
final String dbConnectionUrl = tmp_props.getProperty("druid.database.connectURI");
final String username = tmp_props.getProperty("druid.database.user");
final String password = tmp_props.getProperty("druid.database.password");
final String defaultDatasource = tmp_props.getProperty("druid.database.defaultDatasource", "_default");
//
// validation
//
if (tableName.length() == 0 || !Character.isLetter(tableName.charAt(0))) {
throw new RuntimeException("poorly formed property druid.database.segmentTable=" + tableName);
}
if (ruleTableName.length() == 0 || !Character.isLetter(ruleTableName.charAt(0))) {
throw new RuntimeException("poorly formed property druid.database.ruleTable=" + ruleTableName);
}
if (username == null || username.length() == 0) {
throw new RuntimeException("poorly formed property druid.database.user=" + username);
}
if (password == null || password.length() == 0) {
throw new RuntimeException("poorly formed property druid.database.password=" + password);
}
if (dbConnectionUrl == null || dbConnectionUrl.length() == 0) {
throw new RuntimeException("poorly formed property druid.database.connectURI=" + dbConnectionUrl);
}
final DbConnectorConfig config = new DbConnectorConfig()
{
@Override
public String getDatabaseConnectURI()
{
return dbConnectionUrl;
}
@Override
public String getDatabaseUser()
{
return username;
}
@Override
public String getDatabasePassword()
{
return password;
}
@Override
public String getSegmentTable()
{
return tableName;
}
};
DbConnector dbConnector = new DbConnector(config);
DbConnector.createSegmentTable(dbConnector.getDBI(), tableName);
DbConnector.createRuleTable(dbConnector.getDBI(), ruleTableName);
DatabaseRuleManager.createDefaultRule(
dbConnector.getDBI(),
ruleTableName,
defaultDatasource,
new DefaultObjectMapper()
);
} }
/** /**
@ -412,7 +339,6 @@ public class DruidSetup
+ " Where CMD is a particular command:\n" + " Where CMD is a particular command:\n"
+ " CMD choices:\n" + " CMD choices:\n"
+ " dump zkConnect baseZkPath # dump info from zk at given coordinates\n" + " dump zkConnect baseZkPath # dump info from zk at given coordinates\n"
+ " dbprep propfile # create metadata table in db\n"
+ " put zkConnect propfile # store paths and propfile into zk at given coordinates\n" + " put zkConnect propfile # store paths and propfile into zk at given coordinates\n"
+ " args:\n" + " args:\n"
+ " zkConnect: ZK coordinates in the form host1:port1[,host2:port2[, ...]]\n" + " zkConnect: ZK coordinates in the form host1:port1[,host2:port2[, ...]]\n"

View File

@ -1,105 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.utils;
import com.metamx.druid.db.DbConnector;
import com.metamx.druid.db.DbConnectorConfig;
import com.metamx.druid.initialization.Initialization;
import com.metamx.druid.zk.StringZkSerializer;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;
/**
* @deprecated see DruidSetup
*/
public class ZkSetup
{
public static void main(final String[] args)
{
if (args.length != 5) {
System.out.println("Usage: <java invocation> zkConnect baseZkPath dbConnectionUrl dbUsername:password tableName");
System.out.println("This utility is deprecated, see DruidSetup instead.");
System.exit(1);
}
String path = args[1];
final ZkClient zkClient = new ZkClient(
new ZkConnection(args[0]),
Integer.MAX_VALUE,
new StringZkSerializer()
);
zkClient.createPersistent(path, true);
for (String subPath : Initialization.SUB_PATHS) {
final String thePath = String.format("%s/%s", path, subPath);
if (zkClient.exists(thePath)) {
System.out.printf("Path[%s] exists already%n", thePath);
} else {
System.out.printf("Creating ZK path[%s]%n", thePath);
zkClient.createPersistent(thePath);
}
}
final DbConnectorConfig config = new DbConnectorConfig()
{
private final String username;
private final String password;
{
String[] splitArgs = args[3].split(":");
username = splitArgs[0];
if (splitArgs.length > 1) {
password = splitArgs[1];
} else {
password = "";
}
}
@Override
public String getDatabaseConnectURI()
{
return args[2];
}
@Override
public String getDatabaseUser()
{
return username;
}
@Override
public String getDatabasePassword()
{
return password;
}
@Override
public String getSegmentTable()
{
return args[4];
}
};
DbConnector dbConnector = new DbConnector(config);
DbConnector.createSegmentTable(dbConnector.getDBI(), args[4]);
}
}