Merge branch 'master' of git://github.com/tralfamadude/druid-1

Added DruidSetup.java and install/druid_setup.sh to run it for Druid ensemble setup (put properties to zk, create zk zpaths, prep db, and dump properties/paths in zk); property druid.zk.paths.base can establish a namespace for a druid ensemble allowing more than one ensemble to use the same zookeepers; this namespace is the base zpath prefix for paths with properties in the form druid.zk.paths.*Path which are normally set automatically now; if druid.zk.paths.*Path are explicitly set, then all of them must be set with absolute paths to ensure careful attention; ZkSetup is now deprecated, use DruidSetup instead; examples/twitter RealtimeStandaloneMain will exit properly now after a kill -15 (control-c or kill pid) instead of hanging on an unstopped daemon.
This commit is contained in:
Eric Tschetter 2012-11-06 20:15:57 -08:00
commit b1475219a9
13 changed files with 906 additions and 43 deletions

7
README
View File

@ -1 +1,6 @@
See the "Wiki":https://github.com/metamx/druid/wiki See the "Wiki" https://github.com/metamx/druid/wiki
Build with build.sh
See examples/rand
See examples/twitter

View File

@ -31,6 +31,7 @@ import com.metamx.druid.http.FileRequestLogger;
import com.metamx.druid.http.RequestLogger; import com.metamx.druid.http.RequestLogger;
import com.metamx.druid.utils.PropUtils; import com.metamx.druid.utils.PropUtils;
import com.metamx.druid.zk.StringZkSerializer; import com.metamx.druid.zk.StringZkSerializer;
import com.metamx.druid.zk.PropertiesZkSerializer;
import com.netflix.curator.framework.CuratorFramework; import com.netflix.curator.framework.CuratorFramework;
import com.netflix.curator.framework.CuratorFrameworkFactory; import com.netflix.curator.framework.CuratorFrameworkFactory;
import com.netflix.curator.retry.ExponentialBackoffRetry; import com.netflix.curator.retry.ExponentialBackoffRetry;
@ -59,7 +60,17 @@ public class Initialization
{ {
private static final Logger log = new Logger(Initialization.class); private static final Logger log = new Logger(Initialization.class);
private static volatile Properties props = null; private static final Properties zkProps = new Properties();
private static final Properties fileProps = new Properties(zkProps);
private static Properties props = null;
public final static String PROP_SUBPATH = "properties";
public final static String[] SUB_PATHS = {"announcements", "servedSegments", "loadQueue", "master"};
public final static String[] SUB_PATH_PROPS = {
"druid.zk.paths.announcementsPath",
"druid.zk.paths.servedSegmentsPath",
"druid.zk.paths.loadQueuePath",
"druid.zk.paths.masterPath"};
public static final String DEFAULT_ZPATH = "/druid";
public static ZkClient makeZkClient(ZkClientConfig config, Lifecycle lifecycle) public static ZkClient makeZkClient(ZkClientConfig config, Lifecycle lifecycle)
{ {
@ -107,23 +118,33 @@ public class Initialization
); );
} }
public static Properties loadProperties()
/** Load properties.
* Properties are layered, high to low precedence: cmdLine -D, runtime.properties file, stored in zookeeper.
* Idempotent. Thread-safe. Properties are only loaded once.
* If property druid.zk.service.host=none then do not load properties from zookeeper.
* @return Properties ready to use.
*/
public synchronized static Properties loadProperties()
{ {
if (props != null) { if (props != null) {
return props; return props;
} }
Properties loadedProps = null; // Note that zookeeper coordinates must be either in cmdLine or in runtime.properties
Properties sp = System.getProperties();
Properties tmp_props = new Properties(fileProps); // the head of the 3 level Properties chain
tmp_props.putAll(sp);
final InputStream stream = ClassLoader.getSystemResourceAsStream("runtime.properties"); final InputStream stream = ClassLoader.getSystemResourceAsStream("runtime.properties");
if (stream == null) { if (stream == null) {
log.info("runtime.properties didn't exist as a resource, loading system properties instead."); log.info("runtime.properties not found as a resource in classpath, relying only on system properties, and zookeeper now.");
loadedProps = System.getProperties();
} else { } else {
log.info("Loading properties from runtime.properties."); log.info("Loading properties from runtime.properties");
try { try {
loadedProps = new Properties();
try { try {
loadedProps.load(stream); fileProps.load(stream);
} }
catch (IOException e) { catch (IOException e) {
throw Throwables.propagate(e); throw Throwables.propagate(e);
@ -134,13 +155,60 @@ public class Initialization
} }
} }
for (String prop : loadedProps.stringPropertyNames()) { // log properties from file; note stringPropertyNames() will follow Properties.defaults but
log.info("Loaded Property[%s] as [%s]", prop, loadedProps.getProperty(prop)); // next level is empty at this point.
for (String prop : fileProps.stringPropertyNames()) {
log.info("Loaded(runtime.properties) Property[%s] as [%s]", prop, fileProps.getProperty(prop));
} }
props = loadedProps; final String zk_hosts = tmp_props.getProperty("druid.zk.service.host");
return loadedProps; if (zk_hosts != null) {
if (!zk_hosts.equals("none")) { // get props from zk
final ZkClient zkPropLoadingClient;
final ZkClientConfig clientConfig = new ZkClientConfig()
{
@Override
public String getZkHosts()
{
return zk_hosts;
}
};
zkPropLoadingClient = new ZkClient(
new ZkConnection(clientConfig.getZkHosts()),
clientConfig.getConnectionTimeout(),
new PropertiesZkSerializer()
);
zkPropLoadingClient.waitUntilConnected();
String propertiesZNodePath = tmp_props.getProperty("druid.zk.paths.propertiesPath");
if (propertiesZNodePath == null) {
String zpathBase = tmp_props.getProperty("druid.zk.paths.base", DEFAULT_ZPATH);
propertiesZNodePath = makePropPath(zpathBase);
}
// get properties stored by zookeeper (lowest precedence)
if (zkPropLoadingClient.exists(propertiesZNodePath)) {
Properties p = zkPropLoadingClient.readData(propertiesZNodePath, true);
if (p != null) {
zkProps.putAll(p);
}
}
// log properties from zk
for (String prop : zkProps.stringPropertyNames()) {
log.info("Loaded(properties stored in zk) Property[%s] as [%s]", prop, zkProps.getProperty(prop));
}
} // get props from zk
} else { // ToDo: should this be an error?
log.warn("property druid.zk.service.host is not set, so no way to contact zookeeper for coordination.");
}
// validate properties now that all levels of precedence are loaded
if (! validateResolveProps(tmp_props)) {
log.error("Properties failed to validate, cannot continue");
throw new RuntimeException("Properties failed to validate");
}
props = tmp_props; // publish
return props;
} }
public static Server makeJettyServer(ServerConfig config) public static Server makeJettyServer(ServerConfig config)
@ -279,4 +347,116 @@ public class Initialization
new File(PropUtils.getProperty(props, "druid.request.logging.dir")) new File(PropUtils.getProperty(props, "druid.request.logging.dir"))
); );
} }
/**
 * Build the zpath of the znode that holds the stored properties.
 *
 * @param basePath base zpath to place in front of the properties sub-path.
 * @return basePath, a slash, and PROP_SUBPATH joined together.
 */
public static String makePropPath(String basePath)
{
  final String propertiesZpath = basePath + "/" + PROP_SUBPATH;
  return propertiesZpath;
}
/**
 * Validate the zookeeper path (zpath) properties and resolve defaults.
 * <p>
 * The base zpath (druid.zk.paths.base) is checked with zpathBaseCheck. The properties named in
 * SUB_PATH_PROPS plus druid.zk.paths.propertiesPath are treated as overrides: when any of them
 * is set, every one of them must be set and each value must start with '/'. When none is set,
 * they are all written into props, built from the effective base (the base property, or
 * DEFAULT_ZPATH when that property is absent); in that case an absent base property is also
 * stored. Problems are reported on System.err.
 *
 * @param props properties to inspect; may be modified as described above.
 * @return true if the zpath properties are valid.
 */
public static boolean validateResolveProps(Properties props)
{
  final String baseProp = props.getProperty("druid.zk.paths.base");
  boolean failed = zpathBaseCheck(baseProp, "property druid.zk.paths.base");
  final String effectiveBase = (baseProp == null) ? DEFAULT_ZPATH : baseProp;
  final String propsPathOverride = props.getProperty("druid.zk.paths.propertiesPath");
  if (!failed) {
    System.out.println("Effective zpath prefix=" + effectiveBase);
  }

  // Count the overrides that are present and collect the ones that are not absolute paths.
  int overrideCount = 0;
  final StringBuilder notAbsolute = new StringBuilder(100);
  for (String name : SUB_PATH_PROPS) {
    final String value = props.getProperty(name);
    if (value == null) {
      continue;
    }
    overrideCount++;
    if (!value.startsWith("/")) {
      notAbsolute.append(name).append("=").append(value).append("\n");
    }
  }
  // druid.zk.paths.propertiesPath is handled separately because it is not in SUB_PATH_PROPS.
  if (propsPathOverride != null) {
    overrideCount++;
    if (!propsPathOverride.startsWith("/")) {
      notAbsolute.append("druid.zk.paths.propertiesPath").append("=").append(propsPathOverride).append("\n");
    }
  }
  if (notAbsolute.length() > 0) {
    failed = true;
    System.err.println("When overriding zk zpaths, with properties like druid.zk.paths.*Path " +
                       "the znode path must start with '/' (slash) ; problem overrides:");
    System.err.print(notAbsolute.toString());
  }

  if (overrideCount == 0) {
    // No overrides: derive every zpath from the effective base.
    if (baseProp == null) {
      // the default base is in use, store it as documentation
      props.setProperty("druid.zk.paths.base", effectiveBase);
    }
    for (int i = 0; i < SUB_PATH_PROPS.length; i++) {
      props.setProperty(SUB_PATH_PROPS[i], effectiveBase + "/" + SUB_PATHS[i]);
    }
    props.setProperty("druid.zk.paths.propertiesPath", effectiveBase + "/properties");
  } else if (overrideCount < SUB_PATH_PROPS.length + 1) {
    // Partial override: name each property that is still missing.
    failed = true;
    System.err.println("When overriding zk zpaths, with properties of form druid.zk.paths.*Path " +
                       "all must be overridden together; missing overrides:");
    for (String name : SUB_PATH_PROPS) {
      if (props.getProperty(name) == null) {
        System.err.println(" " + name);
      }
    }
    if (propsPathOverride == null) {
      System.err.println(" " + "druid.zk.paths.propertiesPath");
    }
  }
  // A complete set of overrides is used as given, without the base prefix.
  return !failed;
}
/**
 * Check that a znode base zpath starts with a slash and does not end with one.
 * Each violation is reported on System.err.
 *
 * @param zpathBase znode base path; if null nothing is checked and the result is false.
 * @param errorMsgPrefix text placed at the front of each error message, should indicate
 *                       where the zpathBase value came from.
 * @return true if validation failed.
 */
public static boolean zpathBaseCheck(String zpathBase, String errorMsgPrefix)
{
  if (zpathBase == null) {
    return false;
  }
  final boolean missingLeadingSlash = !zpathBase.startsWith("/");
  final boolean hasTrailingSlash = zpathBase.endsWith("/");
  if (missingLeadingSlash) {
    System.err.println(errorMsgPrefix + " must start with '/' (slash); found=" + zpathBase);
  }
  if (hasTrailingSlash) {
    System.err.println(errorMsgPrefix + " must NOT end with '/' (slash); found=" + zpathBase);
  }
  return missingLeadingSlash || hasTrailingSlash;
}
} }

View File

@ -0,0 +1,69 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.zk;
import com.metamx.common.IAE;
import org.I0Itec.zkclient.exception.ZkMarshallingError;
import org.I0Itec.zkclient.serialize.ZkSerializer;
import java.io.*;
import java.nio.charset.Charset;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Properties;
import java.util.TimeZone;
/**
 * ZkSerializer that stores a java.util.Properties object in a znode using the text format
 * written by Properties.store and read by Properties.load.
 * <p>
 * Serializing stamps the Properties with a modification time under the key META_PROP.
 */
public class PropertiesZkSerializer implements ZkSerializer
{
  // SimpleDateFormat is not thread-safe; every use must synchronize on this instance.
  private static final SimpleDateFormat df = new SimpleDateFormat("yyyyMMdd'T'HHmmss'z'");
  static {
    df.setTimeZone(TimeZone.getTimeZone("UTC"));
  }
  /** Key of the property that records when the properties were last serialized. */
  public final static String META_PROP = "__MODIFIED";

  /**
   * Serialize Properties to bytes.
   * Note that the modification time is set on the passed-in Properties object itself.
   *
   * @param data must be a java.util.Properties instance.
   * @return the properties in Properties.store format.
   * @throws IAE if data is not a Properties instance (including null).
   * @throws ZkMarshallingError if writing the properties fails.
   */
  @Override
  public byte[] serialize(Object data) throws ZkMarshallingError
  {
    if (!(data instanceof Properties)) {
      throw new IAE("Can only serialize Properties into ZK");
    }
    final Properties props = (Properties) data;
    final String ts;
    synchronized (df) {
      ts = df.format(new Date());
    }
    props.setProperty(META_PROP, ts);
    final ByteArrayOutputStream bos = new ByteArrayOutputStream(props.size() * 60 + 30);
    try {
      props.store(bos, "Druid");
    }
    catch (IOException e) {
      // do not hand back truncated data as if it were complete
      throw new ZkMarshallingError(e);
    }
    return bos.toByteArray();
  }

  /**
   * Deserialize bytes in Properties.load format.
   *
   * @param bytes content of the znode.
   * @return a new Properties object holding the parsed entries.
   * @throws ZkMarshallingError if reading the properties fails.
   */
  @Override
  public Object deserialize(byte[] bytes) throws ZkMarshallingError
  {
    final Properties props = new Properties();
    try {
      props.load(new ByteArrayInputStream(bytes));
    }
    catch (IOException e) {
      // do not hand back a partially loaded Properties as if it were complete
      throw new ZkMarshallingError(e);
    }
    return props;
  }
}

71
druid-services/pom.xml Normal file
View File

@ -0,0 +1,71 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Druid - a distributed column store.
~ Copyright (C) 2012 Metamarkets Group Inc.
~
~ This program is free software; you can redistribute it and/or
~ modify it under the terms of the GNU General Public License
~ as published by the Free Software Foundation; either version 2
~ of the License, or (at your option) any later version.
~
~ This program is distributed in the hope that it will be useful,
~ but WITHOUT ANY WARRANTY; without even the implied warranty of
~ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
~ GNU General Public License for more details.
~
~ You should have received a copy of the GNU General Public License
~ along with this program; if not, write to the Free Software
~ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.metamx.druid</groupId>
<artifactId>druid-services</artifactId>
<name>druid-services</name>
<description>druid-services</description>
<version>0.1.0-SNAPSHOT</version>
<parent>
<groupId>com.metamx</groupId>
<artifactId>druid</artifactId>
<version>0.1.0-SNAPSHOT</version>
</parent>
<dependencies>
<dependency>
<groupId>com.metamx.druid</groupId>
<artifactId>druid-realtime</artifactId>
<version>${project.parent.version}</version>
</dependency>
<dependency>
<groupId>com.metamx.druid</groupId>
<artifactId>druid-server</artifactId>
<version>${project.parent.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>1.6</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<outputFile>
${project.build.directory}/${project.artifactId}-${project.version}-selfcontained.jar
</outputFile>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

View File

@ -1,6 +1,6 @@
# Properties for demo of Realtime Node in standalone mode. # Properties for demo of Realtime Node in standalone mode.
# To Use This: copy this file to runtime.properties and put directory containing it in classpath.
# #
comment.origin=druid/examples/rand/src/main/resources/runtime.properties
# S3 access # S3 access
com.metamx.aws.accessKey=<S3 access key> com.metamx.aws.accessKey=<S3 access key>
@ -15,7 +15,10 @@ druid.database.user=user
druid.database.password=password druid.database.password=password
# time between polling for metadata database # time between polling for metadata database
druid.database.poll.duration=PT1M druid.database.poll.duration=PT1M
# table for segment metadata coordination, no default
druid.database.segmentTable=prod_segments druid.database.segmentTable=prod_segments
#in progress 20121010 #druid.database.taskTable= #in progress 20121010 #druid.database.taskTable=
druid.emitter.period=PT60S druid.emitter.period=PT60S
@ -42,23 +45,52 @@ druid.server.maxSize=300000000000
# =realtime or =historical (default) # =realtime or =historical (default)
druid.server.type=realtime druid.server.type=realtime
# ZK path for service discovery within the cluster #
druid.zk.paths.announcementsPath=/druid/announcementsPath # zookeeper (zk) znode paths (zpaths)
#
# Legacy path, must be set, but can be ignored # base znode which establishes a unique namespace for a Druid ensemble.
druid.zk.paths.indexesPath=/druid/indexesPath # Default is /druid if not set
# This can also be set via parameter baseZkPath of the DruidSetup commandline
# druid.zk.paths.base=
druid.zk.paths.indexer.tasksPath=/druid/tasksPath # If these zpath properties like druid.zk.paths.*Path are overridden, then all must be
druid.zk.paths.indexer.statusPath=/druid/statusPath # overridden together for upgrade safety reasons.
# The commandline utility DruidSetup, which is used to set up properties on zookeeper,
# will validate this. Also, these zpaths must start with / because they are not relative.
# ZK znode path for service discovery within the cluster.
# Default is value of druid.zk.paths.base + /announcements
# druid.zk.paths.announcementsPath=/druid/announcements
# Legacy znode path, must be set, but can be ignored
#druid.zk.paths.indexesPath=/druid/indexes
# Default is value of druid.zk.paths.base + /tasks
##druid.zk.paths.indexer.tasksPath=/druid/tasks
# Default is value of druid.zk.paths.base + /status
#druid.zk.paths.indexer.statusPath=/druid/status
# ZK path for load/drop protocol between Master/Compute # ZK path for load/drop protocol between Master/Compute
druid.zk.paths.loadQueuePath=/druid/loadQueuePath # Default is value of druid.zk.paths.base + /loadQueue
#druid.zk.paths.loadQueuePath=/druid/loadQueue
# ZK path for Master leadership election # ZK path for Master leadership election
druid.zk.paths.masterPath=/druid/masterPath # Default is value of druid.zk.paths.base + /master
#druid.zk.paths.masterPath=/druid/master
# ZK path for publishing served segments # ZK path for publishing served segments
druid.zk.paths.servedSegmentsPath=/druid/servedSegmentsPath # Default is value of druid.zk.paths.base + /servedSegments
#druid.zk.paths.servedSegmentsPath=/druid/servedSegments
# Default is value of druid.zk.paths.base + /leaderLatch
#druid.zk.paths.indexer.leaderLatchPath=/druid/leaderLatch
# ZK path for properties stored in zookeeper
# Default is value of druid.zk.paths.base + /properties
#druid.zk.paths.propertiesPath=/druid/properties
druid.host=127.0.0.1 druid.host=127.0.0.1
druid.port=8080 druid.port=8080
@ -72,8 +104,10 @@ com.metamx.emitter.logging=true
com.metamx.emitter.logging.level=info com.metamx.emitter.logging.level=info
com.metamx.metrics.emitter.period=PT60S com.metamx.metrics.emitter.period=PT60S
# ZK quorum IPs; if empty, the use demo mode # ZK quorum IPs; ZK coordinates in the form host1:port1[,host2:port2[, ...]]
druid.zk.service.host= # if =none then do not contact zookeeper (only for RealtimeStandaloneMain examples)
druid.zk.service.host=none
# msec; high value means tolerate slow zk nodes, default is to wait about 3 weeks # msec; high value means tolerate slow zk nodes, default is to wait about 3 weeks
druid.zk.service.connectionTimeout=1000000 druid.zk.service.connectionTimeout=1000000
@ -85,7 +119,7 @@ druid.processing.numThreads=3
# other properties found # other properties found
# #
druid.computation.buffer.size=10000000 druid.computation.buffer.size=10000000
druid.zk.paths.indexer.leaderLatchPath=/druid/leaderLatchPath
druid.merger.threads=1 druid.merger.threads=1
druid.merger.runner=remote druid.merger.runner=remote
druid.merger.whitelist.enabled=false druid.merger.whitelist.enabled=false

View File

@ -32,7 +32,7 @@ public class RealtimeStandaloneMain
{ {
LogLevelAdjuster.register(); LogLevelAdjuster.register();
Lifecycle lifecycle = new Lifecycle(); final Lifecycle lifecycle = new Lifecycle();
RealtimeNode rn = RealtimeNode.builder().build(); RealtimeNode rn = RealtimeNode.builder().build();
lifecycle.addManagedInstance(rn); lifecycle.addManagedInstance(rn);
@ -86,6 +86,20 @@ public class RealtimeStandaloneMain
} }
); );
Runtime.getRuntime().addShutdownHook(
new Thread(
new Runnable()
{
@Override
public void run()
{
log.info("Running shutdown hook");
lifecycle.stop();
}
}
)
);
try { try {
lifecycle.start(); lifecycle.start();
} }

View File

@ -1,6 +1,7 @@
# Properties for demo of Realtime Node in standalone mode. # Properties for demo of Realtime Node in standalone mode.
# To Use This: copy this file to runtime.properties and put directory containing it in classpath. # To Use This: copy this file to runtime.properties and put directory containing it in classpath.
# #
comment.origin=druid/examples/twitter/src/main/resources/runtime.properties
# S3 access # S3 access
com.metamx.aws.accessKey=<S3 access key> com.metamx.aws.accessKey=<S3 access key>
@ -15,7 +16,10 @@ druid.database.user=user
druid.database.password=password druid.database.password=password
# time between polling for metadata database # time between polling for metadata database
druid.database.poll.duration=PT1M druid.database.poll.duration=PT1M
# table for segment metadata coordination, no default
druid.database.segmentTable=prod_segments druid.database.segmentTable=prod_segments
#in progress 20121010 #druid.database.taskTable= #in progress 20121010 #druid.database.taskTable=
druid.emitter.period=PT60S druid.emitter.period=PT60S
@ -47,23 +51,51 @@ druid.server.maxSize=300000000000
# =realtime or =historical (default) # =realtime or =historical (default)
druid.server.type=realtime druid.server.type=realtime
# ZK path for service discovery within the cluster #
druid.zk.paths.announcementsPath=/druid/announcementsPath # zookeeper (zk) znode paths (zpaths)
#
# Legacy path, must be set, but can be ignored # base znode which establishes a unique namespace for a Druid ensemble.
druid.zk.paths.indexesPath=/druid/indexesPath # Default is /druid if not set
# This can also be set via parameter baseZkPath of the DruidSetup commandline
# druid.zk.paths.base=
druid.zk.paths.indexer.tasksPath=/druid/tasksPath # If these zpath properties like druid.zk.paths.*Path are overridden, then all must be
druid.zk.paths.indexer.statusPath=/druid/statusPath # overridden together for upgrade safety reasons.
# The commandline utility DruidSetup, which is used to set up properties on zookeeper,
# will validate this. Also, these zpaths must start with / because they are not relative.
# ZK znode path for service discovery within the cluster.
# Default is value of druid.zk.paths.base + /announcements
# druid.zk.paths.announcementsPath=/druid/announcements
# Legacy znode path, must be set, but can be ignored
#druid.zk.paths.indexesPath=/druid/indexes
# Default is value of druid.zk.paths.base + /tasks
##druid.zk.paths.indexer.tasksPath=/druid/tasks
# Default is value of druid.zk.paths.base + /status
#druid.zk.paths.indexer.statusPath=/druid/status
# ZK path for load/drop protocol between Master/Compute # ZK path for load/drop protocol between Master/Compute
druid.zk.paths.loadQueuePath=/druid/loadQueuePath # Default is value of druid.zk.paths.base + /loadQueue
#druid.zk.paths.loadQueuePath=/druid/loadQueue
# ZK path for Master leadership election # ZK path for Master leadership election
druid.zk.paths.masterPath=/druid/masterPath # Default is value of druid.zk.paths.base + /master
#druid.zk.paths.masterPath=/druid/master
# ZK path for publishing served segments # ZK path for publishing served segments
druid.zk.paths.servedSegmentsPath=/druid/servedSegmentsPath # Default is value of druid.zk.paths.base + /servedSegments
#druid.zk.paths.servedSegmentsPath=/druid/servedSegments
# Default is value of druid.zk.paths.base + /leaderLatch
#druid.zk.paths.indexer.leaderLatchPath=/druid/leaderLatch
# ZK path for properties stored in zookeeper
# Default is value of druid.zk.paths.base + /properties
#druid.zk.paths.propertiesPath=/druid/properties
druid.host=127.0.0.1 druid.host=127.0.0.1
druid.port=8080 druid.port=8080
@ -79,8 +111,10 @@ com.metamx.emitter.http=true
# unknown # com.metamx.emitter.logging.level=info # unknown # com.metamx.emitter.logging.level=info
# unknown # com.metamx.metrics.emitter.period=PT60S # unknown # com.metamx.metrics.emitter.period=PT60S
# ZK quorum IPs; if empty, the use demo mode # ZK quorum IPs; ZK coordinates in the form host1:port1[,host2:port2[, ...]]
druid.zk.service.host= # if =none then do not contact zookeeper (only for RealtimeStandaloneMain examples)
druid.zk.service.host=none
# msec; high value means tolerate slow zk nodes, default is to wait about 3 weeks # msec; high value means tolerate slow zk nodes, default is to wait about 3 weeks
druid.zk.service.connectionTimeout=1000000 druid.zk.service.connectionTimeout=1000000
@ -92,7 +126,6 @@ druid.processing.numThreads=3
# other properties found # other properties found
# #
druid.computation.buffer.size=10000000 druid.computation.buffer.size=10000000
druid.zk.paths.indexer.leaderLatchPath=/druid/leaderLatchPath
druid.merger.threads=1 druid.merger.threads=1
druid.merger.runner=remote druid.merger.runner=remote
druid.merger.whitelist.enabled=false druid.merger.whitelist.enabled=false

View File

@ -3,7 +3,7 @@
"aggregators":[ "aggregators":[
{"type":"count", "name":"tweets"}, {"type":"count", "name":"tweets"},
{"type":"doubleSum","fieldName":"follower_count","name":"total_follower_count"}, {"type":"doubleSum","fieldName":"follower_count","name":"total_follower_count"},
{"type":"doubleSum","fieldName":"retweet_count","name":"tota_retweet_count"}, {"type":"doubleSum","fieldName":"retweet_count","name":"total_retweet_count"},
{"type":"doubleSum","fieldName":"friends_count","name":"total_friends_count"}, {"type":"doubleSum","fieldName":"friends_count","name":"total_friends_count"},
{"type":"doubleSum","fieldName":"statuses_count","name":"total_statuses_count"}, {"type":"doubleSum","fieldName":"statuses_count","name":"total_statuses_count"},

25
install/druid_setup.sh Executable file
View File

@ -0,0 +1,25 @@
#!/usr/bin/env bash
# Script to run util DruidSetup which will initialize zookeeper locations, properties, and metadata store (MySQL or similar).
# The dump cmd of DruidSetup will dump the properties stored in zookeeper and the zpaths found there.
# Run with no args to get usage.

# Pick the java executable: JAVA_HOME wins, otherwise fall back to java on the PATH.
if [ -n "${JAVA_HOME}" ]; then
  RUN_JAVA="${JAVA_HOME}/bin/java"
elif command -v java >/dev/null 2>&1; then
  RUN_JAVA=java
fi

[ -z "${RUN_JAVA}" ] && echo "env var JAVA_HOME is not defined and java not in path" && exit 1

# Druid root is the parent of the directory holding this script.
DRUID_DIR="$(cd "$(dirname "$0")/.." && pwd)"

# Take the first matching jar in sorted order. Piping find straight into sort (rather than
# handing its output to ls) keeps the result empty when nothing matches, so the check below works.
DRUID_SERVER_JAR="$(find "${DRUID_DIR}" -name 'druid-server*selfcontained.jar' | sort | head -1)"
[ -z "${DRUID_SERVER_JAR}" ] && echo "unable to find druid server jar" && exit 2
echo "using ${DRUID_SERVER_JAR}"
echo

# "$@" passes each argument through unchanged, even when it contains spaces.
"${RUN_JAVA}" -cp "${DRUID_SERVER_JAR}" "-Dlog4j.configuration=file://${DRUID_DIR}/install/log4j.xml" -Djava.net.preferIPv4Stack=true -Duser.timezone=UTC -Dfile.encoding=UTF-8 com.metamx.druid.utils.DruidSetup "$@"

# Surface any problems that were logged during the run.
[ -e "${DRUID_DIR}/install/druid_setup.log" ] && egrep "WARN|ERROR|FATAL" "${DRUID_DIR}/install/druid_setup.log"

18
install/log4j.xml Normal file
View File

@ -0,0 +1,18 @@
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
<appender name="FileAppender" class="org.apache.log4j.FileAppender">
<param name="File" value="druid_setup.log" />
<param name="Append" value="false" />
<layout class="org.apache.log4j.PatternLayout">
<param name="ConversionPattern" value="%d{ISO8601} %p [%t] %c - %m%n"/>
</layout>
</appender>
<root>
<priority value ="warn" />
<appender-ref ref="FileAppender"/>
</root>
</log4j:configuration>

View File

@ -50,6 +50,7 @@
<module>merger</module> <module>merger</module>
<module>realtime</module> <module>realtime</module>
<module>examples</module> <module>examples</module>
<module>druid-services</module>
</modules> </modules>
<dependencyManagement> <dependencyManagement>

View File

@ -0,0 +1,412 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.utils;
import com.google.common.io.Closeables;
import com.metamx.common.logger.Logger;
import com.metamx.druid.db.DbConnector;
import com.metamx.druid.db.DbConnectorConfig;
import com.metamx.druid.initialization.Initialization;
import com.metamx.druid.zk.PropertiesZkSerializer;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;
import java.io.*;
import java.util.Properties;
/**
* Set up the shared Druid ensemble space.
* This affects the Zookeeper which holds common properties, and znode paths for coordination,
* and also performs metadata table creation in the database (MySQL).
* By storing ensemble-wide properties in zookeeper, cluster administration is simplified.
* Each service instance can also have local property overrides in the file runtime.properties
* located in the classpath.
* <p>
* The design rules are noted here with rationale
* </p>
* <p/>
* <pre>
* Design Rule Notes:
* (a) Properties set on the commandline of services take precedence over runtime.properties which
* takes precedence over properties stored in zookeeper.
*
* Rationale: organizing principle.
*
* (b) Services load properties on startup only.
*
* Rationale: stepwise changes are safer and easier to manage.
*
* (c) Only DruidSetup creates properties and znode paths (zpaths) on zookeeper and no other tool or service
* will make ensemble-wide settings automatically.
*
* Rationale: one place for this logic, under manual control, and avoid accidental
* namespace/partition creation.
*
* (d) DruidSetup creates reasonable zpaths but supports overrides to enable tactical
* version transitions (just in case). If zpaths are overridden, then they must all be
* overridden together since they are not independent.
*
* Rationale: convention beats configuration most of the time; sometimes configuration is needed
* to negotiate unusual cases.
*
* (e) Properties settings stored on zookeeper are not cumulative; previous properties are removed before
* new ones are stored.
* Rationale: Keep the operations at the granularity of a file of properties, avoid
* dependence on order of setup operations, enable dumping of current settings.
* </pre>
*
* @author pbaclace
*/
public class DruidSetup
{
private static final Logger log = new Logger(DruidSetup.class);
/**
 * Command line entry point. Dispatches on the first argument:
 * <pre>
 *   dump   zkConnect baseZkPath
 *   put    zkConnect propfile
 *   dbprep propfile
 * </pre>
 * Any other command or argument count prints the usage text and exits with status 1.
 * <p/>
 * The zookeeper client, when one was opened, is closed even if the command fails with an
 * exception. Helpers that terminate the JVM via System.exit bypass this cleanup, which is
 * harmless because the process is ending anyway.
 *
 * @param args command name followed by its one or two arguments.
 */
public static void main(final String[] args)
{
  if (args.length < 2 || args.length > 3) {
    printUsage();
    System.exit(1);
  }

  final String cmd = args[0];
  ZkClient zkClient = null;
  try {
    if ("dump".equals(cmd) && args.length == 3) {
      final String zkConnect = args[1];
      final String zpathBase = args[2];
      zkClient = connectToZK(zkConnect);
      dumpFromZk(zkClient, zpathBase, zkConnect, System.out);
    } else if ("put".equals(cmd) && args.length == 3) {
      final String zkConnect = args[1];
      final String pfile = args[2];
      zkClient = connectToZK(zkConnect);
      putToZk(zkClient, pfile);
    } else if ("dbprep".equals(cmd) && args.length == 2) {
      final String pfile = args[1];
      prepDB(pfile);
    } else {
      // unknown command, or known command with the wrong number of arguments
      printUsage();
      System.exit(1);
    }
  } finally {
    // previously skipped whenever a command threw, leaking the zookeeper session
    if (zkClient != null) {
      zkClient.close();
    }
  }
}
/**
 * Load properties from local file, validate and tweak.
 * <p/>
 * This can only be used for setup, not service run time because of some assembly here.
 * <p/>
 * This method terminates the JVM instead of returning on failure: exit status 4 when the file
 * is missing, unreadable or syntactically malformed, and exit status 9 when
 * Initialization.validateResolveProps rejects the properties. On success the effective zpath
 * properties are echoed to stdout.
 *
 * @param pfile path to runtime.properties file to be read.
 * @param props Properties object to fill, props like druid.zk.paths.*Path will always be set after
 *              this method either because the input file has them set (overrides) or because prop
 *              druid.zk.paths.base was used as a prefix to construct the default zpaths;
 *              druid.zk.paths.base will be set iff there is a single base for all zpaths
 */
private static void loadProperties(String pfile, Properties props)
{
  InputStream is = null;
  try {
    // open and parse in one guarded region; the stream is closed in the finally clause
    is = new FileInputStream(pfile);
    props.load(is);
  } catch (FileNotFoundException e) {
    System.err.println("File not found: " + pfile);
    System.err.println("No changes made.");
    System.exit(4);
  } catch (IOException e) {
    reportErrorAndExit(pfile, e);
  } catch (IllegalArgumentException e) {
    // Properties.load reports a malformed Unicode escape sequence with this unchecked exception;
    // give it the same treatment as any other unreadable file rather than a raw stack trace
    System.err.println("Could not read file: " + pfile);
    System.err.println(" because of: " + e);
    System.err.println("No changes made.");
    System.exit(4);
  } finally {
    Closeables.closeQuietly(is);
  }

  if (! Initialization.validateResolveProps(props)) { // bail, errors have been emitted
    System.exit(9);
  }

  // emit effective zpaths to be used
  System.out.println("Effective zpath properties:");
  for (String pname : Initialization.SUB_PATH_PROPS) {
    System.out.println(" " + pname + "=" + props.getProperty(pname));
  }
  System.out.println(" " + "druid.zk.paths.propertiesPath" + "=" +
                     props.getProperty("druid.zk.paths.propertiesPath"));
}
/**
 * Print the properties stored in zookeeper for one Druid ensemble. Makes no modifications.
 * <p/>
 * The listing goes to {@code out}, framed by comment lines so the output stays a loadable
 * properties file. Problems (no properties stored at the path, or a failure while writing
 * the listing) are reported on System.err so they do not pollute the listing itself.
 *
 * @param zkClient  zookeeper client.
 * @param zpathBase znode base path.
 * @param zkConnect ZK coordinates in the form host1:port1[,host2:port2[, ...]]; only used for labelling
 *                  the output.
 * @param out       stream that receives the properties listing.
 */
private static void dumpFromZk(ZkClient zkClient, String zpathBase, String zkConnect, PrintStream out)
{
  final String propPath = Initialization.makePropPath(zpathBase);

  Properties currProps = null;
  if (zkClient.exists(propPath)) {
    currProps = zkClient.readData(propPath, true);
  }
  if (currProps == null) {
    // previously this case produced no output at all, which is indistinguishable from a hang-free no-op
    System.err.println("No properties found for zkConnect=" + zkConnect + " zpath=" + propPath);
    return;
  }

  out.println("# Begin Properties Listing for zkConnect=" + zkConnect + " zpath=" + propPath);
  try {
    currProps.store(out, "Druid");
  } catch (IOException e) {
    // do not emit the End marker after a truncated listing
    System.err.println("Could not write properties listing for zpath=" + propPath + " because of: " + e);
    return;
  }
  out.println("# End Properties Listing for zkConnect=" + zkConnect + " zpath=" + propPath);
  out.println("# NOTE: properties like druid.zk.paths.*Path are always stored in zookeeper in absolute form.");
  out.println();
  // A listing of the znodes themselves is intentionally not produced here.
}
/**
 * Load a properties file, create the ensemble's znodes, then store the properties in zookeeper.
 * <p/>
 * Property druid.zk.paths.base is only present when a single base applies to all zpaths. When
 * it is absent (all zpaths explicitly overridden) the znodes are created from the effective
 * druid.zk.paths.*Path properties instead of from base + sub path.
 *
 * @param zkClient zookeeper client.
 * @param pfile    path to the properties file to load and store.
 */
private static void putToZk(ZkClient zkClient, String pfile)
{
  final Properties props = new Properties();
  loadProperties(pfile, props);
  final String zpathBase = props.getProperty("druid.zk.paths.base");

  // create znodes first
  if (zpathBase != null) {
    createZNodes(zkClient, zpathBase, System.out);
  } else {
    // passing a null base to createZNodes would ask zookeeper to create a null path
    createZNodesFromProps(zkClient, props, System.out);
  }

  // put props
  updatePropertiesZK(zkClient, zpathBase, props, System.out);
}

/**
 * Create the znode named by each property listed in Initialization.SUB_PATH_PROPS, skipping
 * properties that are unset and paths that already exist.
 *
 * @param zkClient zookeeper client.
 * @param props    loaded properties holding the effective zpaths.
 * @param out      the PrintStream for human readable update summary.
 */
private static void createZNodesFromProps(ZkClient zkClient, Properties props, PrintStream out)
{
  for (String pname : Initialization.SUB_PATH_PROPS) {
    final String thePath = props.getProperty(pname);
    if (thePath == null) {
      continue;
    }
    if (zkClient.exists(thePath)) {
      out.printf("Path[%s] exists already%n", thePath);
    } else {
      out.printf("Creating ZK path[%s]%n", thePath);
      zkClient.createPersistent(thePath, true);
    }
  }
}
/**
 * Compare the given properties with those currently stored in zookeeper, print a summary of
 * the differences, and replace the stored properties when anything differs.
 * <p/>
 * The properties znode is druid.zk.paths.propertiesPath when that property is set, otherwise
 * the path derived from {@code zpathBase}. The meta property PropertiesZkSerializer.META_PROP
 * is ignored for comparison purposes. Stored properties are replaced wholesale, never merged.
 *
 * @param zkClient  zookeeper client.
 * @param zpathBase znode base path; only consulted when druid.zk.paths.propertiesPath is not set.
 * @param props     the properties to store.
 * @param out       the PrintStream for human readable update summary (usually System.out).
 */
private static void updatePropertiesZK(ZkClient zkClient, String zpathBase, Properties props, PrintStream out)
{
  final String propPathOverride = props.getProperty("druid.zk.paths.propertiesPath");
  // derive the default path only when needed: zpathBase may be null when zpaths are overridden
  final String propPath = (propPathOverride != null) ? propPathOverride : Initialization.makePropPath(zpathBase);

  // remember node existence separately: a node can exist and still yield no properties
  final boolean nodeExists = zkClient.exists(propPath);
  Properties currProps = null;
  if (nodeExists) {
    currProps = zkClient.readData(propPath, true);
  }

  boolean propsDiffer = false;
  if (currProps == null) {
    out.println("No properties currently stored in zk");
    propsDiffer = true;
  } else { // determine whether anything is different
    int countNew = 0;
    int countDiffer = 0;
    int countRemoved = 0;
    int countNoChange = 0;
    String currMetaPropVal = "";
    StringBuilder changes = new StringBuilder(1024);

    // pass 1: every property in the file is new, changed or unchanged relative to zk
    for (String pname : props.stringPropertyNames()) {
      if (pname.equals(PropertiesZkSerializer.META_PROP)) {
        continue; // ignore meta prop datestamp, if any
      }
      final String pvalue = props.getProperty(pname);
      final String pvalueCurr = currProps.getProperty(pname);
      if (pvalueCurr == null) {
        countNew++;
      } else if (pvalueCurr.equals(pvalue)) {
        countNoChange++;
      } else {
        countDiffer++;
        changes.append("CHANGED: ").append(pname).append("= PREV=").append(pvalueCurr)
               .append(" NOW=").append(pvalue).append("\n");
      }
    }

    // pass 2: properties in zk that the file no longer mentions are removals
    for (String pname : currProps.stringPropertyNames()) {
      if (pname.equals(PropertiesZkSerializer.META_PROP)) {
        currMetaPropVal = currProps.getProperty(pname);
        continue; // ignore meta prop datestamp
      }
      if (props.getProperty(pname) == null) {
        countRemoved++;
        changes.append("REMOVED: ").append(pname).append("=").append(currProps.getProperty(pname)).append("\n");
      }
    }

    if (countNew + countRemoved + countDiffer > 0) {
      out.println("Current properties differ: "
                  + countNew + " new, "
                  + countDiffer + " different values, "
                  + countRemoved + " removed, "
                  + countNoChange + " unchanged, "
                  + currMetaPropVal + " previously updated"
      );
      out.println(changes);
      propsDiffer = true;
    } else {
      out.println("Current properties identical to file given, entry count=" + countNoChange);
    }
  }

  if (propsDiffer) {
    // update zookeeper
    if (nodeExists) {
      // overwrite in place: avoids the delete/create window during which no properties are
      // stored, and also covers a node that exists but currently holds no data
      zkClient.writeData(propPath, props);
    } else {
      zkClient.createPersistent(propPath, props);
    }
    out.println("Properties updated, entry count=" + props.size());
  }
}
/**
 * Ensure the base znode and each sub path named in Initialization.SUB_PATHS exist,
 * reporting for every sub path whether it was created or was already present.
 *
 * @param zkClient  zookeeper client.
 * @param zpathBase znode base path.
 * @param out       the PrintStream for human readable update summary.
 */
private static void createZNodes(ZkClient zkClient, String zpathBase, PrintStream out)
{
  // the base is created unconditionally, parents included
  zkClient.createPersistent(zpathBase, true);

  for (String child : Initialization.SUB_PATHS) {
    final String fullPath = zpathBase + "/" + child;
    if (!zkClient.exists(fullPath)) {
      out.printf("Creating ZK path[%s]%n", fullPath);
      zkClient.createPersistent(fullPath, true);
      continue;
    }
    out.printf("Path[%s] exists already%n", fullPath);
  }
}
/**
 * Report an unreadable properties file on System.err and terminate the JVM with status 4.
 * This method does not return.
 *
 * @param pfile path of the file that could not be read.
 * @param ioe   the failure that occurred while reading it.
 */
private static void reportErrorAndExit(String pfile, IOException ioe)
{
  final String[] report = {
      "Could not read file: " + pfile,
      " because of: " + ioe,
      "No changes made."
  };
  for (String line : report) {
    System.err.println(line);
  }
  System.exit(4);
}
/**
 * Open a zookeeper client whose znode payloads are handled by PropertiesZkSerializer.
 * The caller is responsible for closing the returned client.
 *
 * @param zkConnect ZK coordinates in the form host1:port1[,host2:port2[, ...]]
 * @return a new client for the given coordinates.
 */
private static ZkClient connectToZK(String zkConnect)
{
  final ZkConnection connection = new ZkConnection(zkConnect);
  final PropertiesZkSerializer serializer = new PropertiesZkSerializer();
  // NOTE(review): the int argument is presumably ZkClient's connection timeout, i.e.
  // effectively unbounded here -- confirm against the ZkClient constructor documentation
  return new ZkClient(connection, Integer.MAX_VALUE, serializer);
}
/**
 * Connect to db and create table, if it does not exist.
 * NOTE: Connection failure only shows up in log output.
 * <p/>
 * Reads druid.database.segmentTable (default prod_segments), druid.database.connectURI,
 * druid.database.user and druid.database.password from the given file and rejects the run
 * with a RuntimeException when any of them is poorly formed.
 *
 * @param pfile path to properties file to use.
 */
private static void prepDB(final String pfile)
{
  final Properties fileProps = new Properties();
  loadProperties(pfile, fileProps);

  final String tableName = fileProps.getProperty("druid.database.segmentTable", "prod_segments");
  final String dbConnectionUrl = fileProps.getProperty("druid.database.connectURI");
  final String username = fileProps.getProperty("druid.database.user");
  final String password = fileProps.getProperty("druid.database.password");

  // validation: table name must start with a letter, the other settings must be non-empty
  final boolean tableNameOk = !tableName.isEmpty() && Character.isLetter(tableName.charAt(0));
  if (!tableNameOk) {
    throw new RuntimeException("poorly formed property druid.database.segmentTable=" + tableName);
  }
  requirePresent("druid.database.user", username);
  requirePresent("druid.database.password", password);
  requirePresent("druid.database.connectURI", dbConnectionUrl);

  // expose the validated settings through the connector's config type
  final DbConnectorConfig config = new DbConnectorConfig()
  {
    @Override
    public String getDatabaseConnectURI()
    {
      return dbConnectionUrl;
    }

    @Override
    public String getDatabaseUser()
    {
      return username;
    }

    @Override
    public String getDatabasePassword()
    {
      return password;
    }

    @Override
    public String getSegmentTable()
    {
      return tableName;
    }
  };

  final DbConnector dbConnector = new DbConnector(config);
  DbConnector.createSegmentTable(dbConnector.getDBI(), config);
}

/**
 * Throw a RuntimeException naming the property when its value is null or empty.
 *
 * @param propName name of the property being checked, used in the error message.
 * @param value    the value read for that property, possibly null.
 */
private static void requirePresent(String propName, String value)
{
  if (value == null || value.length() == 0) {
    throw new RuntimeException("poorly formed property " + propName + "=" + value);
  }
}
/**
 * Print usage to stdout: the available commands, their arguments and operational notes.
 */
private static void printUsage()
{
  final String[] usageLines = {
      "Usage: <java invocation> CMD [args]",
      " Where CMD is a particular command:",
      " CMD choices:",
      " dump zkConnect baseZkPath # dump info from zk at given coordinates",
      " dbprep propfile # create metadata table in db",
      " put zkConnect propfile # store paths and propfile into zk at given coordinates",
      " args:",
      " zkConnect: ZK coordinates in the form host1:port1[,host2:port2[, ...]]",
      " baseZkPath: like /druid or /mydruid etc. to uniquely identify a Druid ensemble",
      " and should be equal to property druid.zk.paths.base",
      " propfile: Java properties file with common properties for all services in ensemble",
      " Notes:",
      " dump command makes no modifications and shows zk properties at baseZkPath.",
      " put command can safely be invoked more than once, will not disturb existing queues,",
      " and properties are not cumulative.",
      " A zookeeper can service more than one Druid ensemble if baseZkPath is distinct.",
      " Druid services only load properties during process startup.",
      " Properties defined on a service command line take precedence over the runtime.properties",
      " file which takes precedence over properties stored in zookeeper."
  };

  // every line is newline-terminated; println then adds the final platform line separator
  final StringBuilder usage = new StringBuilder(2048);
  for (String line : usageLines) {
    usage.append(line).append("\n");
  }
  System.out.println(usage.toString());
}
}

View File

@ -21,11 +21,13 @@ package com.metamx.druid.utils;
import com.metamx.druid.db.DbConnector; import com.metamx.druid.db.DbConnector;
import com.metamx.druid.db.DbConnectorConfig; import com.metamx.druid.db.DbConnectorConfig;
import com.metamx.druid.initialization.Initialization;
import com.metamx.druid.zk.StringZkSerializer; import com.metamx.druid.zk.StringZkSerializer;
import org.I0Itec.zkclient.ZkClient; import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection; import org.I0Itec.zkclient.ZkConnection;
/** /**
* @deprecated see DruidSetup
*/ */
public class ZkSetup public class ZkSetup
{ {
@ -33,13 +35,12 @@ public class ZkSetup
{ {
if (args.length != 5) { if (args.length != 5) {
System.out.println("Usage: <java invocation> zkConnect baseZkPath dbConnectionUrl dbUsername:password tableName"); System.out.println("Usage: <java invocation> zkConnect baseZkPath dbConnectionUrl dbUsername:password tableName");
System.out.println("This utility is deprecated, see DruidSetup instead.");
System.exit(1); System.exit(1);
} }
String path = args[1]; String path = args[1];
String[] subPaths = new String[]{"announcements", "servedSegments", "loadQueue", "master"};
final ZkClient zkClient = new ZkClient( final ZkClient zkClient = new ZkClient(
new ZkConnection(args[0]), new ZkConnection(args[0]),
Integer.MAX_VALUE, Integer.MAX_VALUE,
@ -47,7 +48,7 @@ public class ZkSetup
); );
zkClient.createPersistent(path, true); zkClient.createPersistent(path, true);
for (String subPath : subPaths) { for (String subPath : Initialization.SUB_PATHS) {
final String thePath = String.format("%s/%s", path, subPath); final String thePath = String.format("%s/%s", path, subPath);
if (zkClient.exists(thePath)) { if (zkClient.exists(thePath)) {
System.out.printf("Path[%s] exists already%n", thePath); System.out.printf("Path[%s] exists already%n", thePath);