diff --git a/README b/README index e39398352b4..cb1ec84a8ab 100644 --- a/README +++ b/README @@ -1 +1,6 @@ -See the "Wiki":https://github.com/metamx/druid/wiki \ No newline at end of file +See the "Wiki" https://github.com/metamx/druid/wiki + +Build with build.sh + +See examples/rand +See examples/twitter diff --git a/client/src/main/java/com/metamx/druid/initialization/Initialization.java b/client/src/main/java/com/metamx/druid/initialization/Initialization.java index 8724c287b22..addf3ac6ebb 100644 --- a/client/src/main/java/com/metamx/druid/initialization/Initialization.java +++ b/client/src/main/java/com/metamx/druid/initialization/Initialization.java @@ -31,6 +31,7 @@ import com.metamx.druid.http.FileRequestLogger; import com.metamx.druid.http.RequestLogger; import com.metamx.druid.utils.PropUtils; import com.metamx.druid.zk.StringZkSerializer; +import com.metamx.druid.zk.PropertiesZkSerializer; import com.netflix.curator.framework.CuratorFramework; import com.netflix.curator.framework.CuratorFrameworkFactory; import com.netflix.curator.retry.ExponentialBackoffRetry; @@ -59,7 +60,17 @@ public class Initialization { private static final Logger log = new Logger(Initialization.class); - private static volatile Properties props = null; + private static final Properties zkProps = new Properties(); + private static final Properties fileProps = new Properties(zkProps); + private static Properties props = null; + public final static String PROP_SUBPATH = "properties"; + public final static String[] SUB_PATHS = {"announcements", "servedSegments", "loadQueue", "master"}; + public final static String[] SUB_PATH_PROPS = { + "druid.zk.paths.announcementsPath", + "druid.zk.paths.servedSegmentsPath", + "druid.zk.paths.loadQueuePath", + "druid.zk.paths.masterPath"}; + public static final String DEFAULT_ZPATH = "/druid"; public static ZkClient makeZkClient(ZkClientConfig config, Lifecycle lifecycle) { @@ -107,23 +118,33 @@ public class Initialization ); } - public static Properties loadProperties() + + /** Load properties. + * Properties are layered, high to low precedence: cmdLine -D, runtime.properties file, stored in zookeeper. + * Idempotent. Thread-safe. Properties are only loaded once. + * If property druid.zk.service.host=none then do not load properties from zookeeper. + * @return Properties ready to use. 
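+ *
+ * A minimal sketch of the layering used below, relying on java.util.Properties defaults
+ * chaining (variable names here are illustrative, not the actual fields):
+ *   Properties zk = new Properties();                  // lowest precedence, filled from zookeeper
+ *   Properties file = new Properties(zk);              // runtime.properties level, falls back to zk
+ *   Properties effective = new Properties(file);       // head of the chain
+ *   effective.putAll(System.getProperties());          // command line -D properties win
+ *   String port = effective.getProperty("druid.port"); // checks cmdline, then file, then zk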
+ */ + public synchronized static Properties loadProperties() { if (props != null) { return props; } - Properties loadedProps = null; + // Note that zookeeper coordinates must be either in cmdLine or in runtime.properties + Properties sp = System.getProperties(); + + Properties tmp_props = new Properties(fileProps); // the head of the 3 level Properties chain + tmp_props.putAll(sp); + final InputStream stream = ClassLoader.getSystemResourceAsStream("runtime.properties"); if (stream == null) { - log.info("runtime.properties didn't exist as a resource, loading system properties instead."); - loadedProps = System.getProperties(); + log.info("runtime.properties not found as a resource in classpath, relying only on system properties, and zookeeper now."); } else { - log.info("Loading properties from runtime.properties."); + log.info("Loading properties from runtime.properties"); try { - loadedProps = new Properties(); try { - loadedProps.load(stream); + fileProps.load(stream); } catch (IOException e) { throw Throwables.propagate(e); @@ -134,13 +155,60 @@ public class Initialization } } - for (String prop : loadedProps.stringPropertyNames()) { - log.info("Loaded Property[%s] as [%s]", prop, loadedProps.getProperty(prop)); + // log properties from file; note stringPropertyNames() will follow Properties.defaults but + // next level is empty at this point. + for (String prop : fileProps.stringPropertyNames()) { + log.info("Loaded(runtime.properties) Property[%s] as [%s]", prop, fileProps.getProperty(prop)); } - props = loadedProps; + final String zk_hosts = tmp_props.getProperty("druid.zk.service.host"); - return loadedProps; + if (zk_hosts != null) { + if (!zk_hosts.equals("none")) { // get props from zk + final ZkClient zkPropLoadingClient; + final ZkClientConfig clientConfig = new ZkClientConfig() + { + @Override + public String getZkHosts() + { + return zk_hosts; + } + }; + + zkPropLoadingClient = new ZkClient( + new ZkConnection(clientConfig.getZkHosts()), + clientConfig.getConnectionTimeout(), + new PropertiesZkSerializer() + ); + zkPropLoadingClient.waitUntilConnected(); + String propertiesZNodePath = tmp_props.getProperty("druid.zk.paths.propertiesPath"); + if (propertiesZNodePath == null) { + String zpathBase = tmp_props.getProperty("druid.zk.paths.base", DEFAULT_ZPATH); + propertiesZNodePath = makePropPath(zpathBase); + } + // get properties stored by zookeeper (lowest precedence) + if (zkPropLoadingClient.exists(propertiesZNodePath)) { + Properties p = zkPropLoadingClient.readData(propertiesZNodePath, true); + if (p != null) { + zkProps.putAll(p); + } + } + // log properties from zk + for (String prop : zkProps.stringPropertyNames()) { + log.info("Loaded(properties stored in zk) Property[%s] as [%s]", prop, zkProps.getProperty(prop)); + } + } // get props from zk + } else { // ToDo: should this be an error? + log.warn("property druid.zk.service.host is not set, so no way to contact zookeeper for coordination."); + } + // validate properties now that all levels of precedence are loaded + if (! 
validateResolveProps(tmp_props)) { + log.error("Properties failed to validate, cannot continue"); + throw new RuntimeException("Properties failed to validate"); + } + props = tmp_props; // publish + + return props; } public static Server makeJettyServer(ServerConfig config) @@ -284,4 +352,116 @@ public class Initialization new File(PropUtils.getProperty(props, "druid.request.logging.dir")) ); } + + public static String makePropPath(String basePath) + { + return String.format("%s/%s", basePath, PROP_SUBPATH); + } + + /** Validate and Resolve Properties. + * Resolve zpaths with props like druid.zk.paths.*Path using druid.zk.paths.base value. + * Check validity so that if druid.zk.paths.*Path props are set, all are set, + * if none set, then construct defaults relative to druid.zk.paths.base and add these + * to the properties chain. + * @param props + * @return true if valid zpath properties. + */ + public static boolean validateResolveProps(Properties props) + { + boolean zpathValidateFailed;// validate druid.zk.paths.base + String propertyZpath = props.getProperty("druid.zk.paths.base"); + zpathValidateFailed = zpathBaseCheck(propertyZpath, "property druid.zk.paths.base"); + + String zpathEffective = DEFAULT_ZPATH; + if (propertyZpath != null) { + zpathEffective = propertyZpath; + } + + final String propertiesZpathOverride = props.getProperty("druid.zk.paths.propertiesPath"); + + if (!zpathValidateFailed) System.out.println("Effective zpath prefix=" + zpathEffective); + + // validate druid.zk.paths.*Path properties + // + // if any zpath overrides are set in properties, all must be set, and they must start with / + int zpathOverrideCount = 0; + boolean zpathOverridesNotAbs = false; + StringBuilder sbErrors = new StringBuilder(100); + for (int i = 0; i < SUB_PATH_PROPS.length; i++) { + String val = props.getProperty(SUB_PATH_PROPS[i]); + if (val != null) { + zpathOverrideCount++; + if (!val.startsWith("/")) { + zpathOverridesNotAbs = true; + sbErrors.append(SUB_PATH_PROPS[i]).append("=").append(val).append("\n"); + zpathValidateFailed = true; + } + } + } + // separately check druid.zk.paths.propertiesPath (not in SUB_PATH_PROPS since it is not a "dir") + if (propertiesZpathOverride != null) { + zpathOverrideCount++; + if (!propertiesZpathOverride.startsWith("/")) { + zpathOverridesNotAbs = true; + sbErrors.append("druid.zk.paths.propertiesPath").append("=").append(propertiesZpathOverride).append("\n"); + zpathValidateFailed = true; + } + } + if (zpathOverridesNotAbs) { + System.err.println("When overriding zk zpaths, with properties like druid.zk.paths.*Path " + + "the znode path must start with '/' (slash) ; problem overrides:"); + System.err.print(sbErrors.toString()); + } + if (zpathOverrideCount > 0) { + if (zpathOverrideCount < SUB_PATH_PROPS.length + 1) { + zpathValidateFailed = true; + System.err.println("When overriding zk zpaths, with properties of form druid.zk.paths.*Path " + + "all must be overridden together; missing overrides:"); + for (int i = 0; i < SUB_PATH_PROPS.length; i++) { + String val = props.getProperty(SUB_PATH_PROPS[i]); + if (val == null) { + System.err.println(" " + SUB_PATH_PROPS[i]); + } + } + if (propertiesZpathOverride == null) System.err.println(" " + "druid.zk.paths.propertiesPath"); + } else { // proper overrides + // do not prefix with property druid.zk.paths.base + ; // fallthru + } + } else { // no overrides + if (propertyZpath == null) { // if default base is used, store it as documentation + props.setProperty("druid.zk.paths.base", zpathEffective); + 
} + // + // Resolve default zpaths using zpathEffective as base + // + for (int i = 0; i < SUB_PATH_PROPS.length; i++) { + props.setProperty(SUB_PATH_PROPS[i], zpathEffective + "/" + SUB_PATHS[i]); + } + props.setProperty("druid.zk.paths.propertiesPath", zpathEffective + "/properties"); + } + return ! zpathValidateFailed; + } + + /** Check znode zpath base for proper slash, no trailing slash. + * @param zpathBase znode base path, if null then this method does nothing. + * @param errorMsgPrefix error context to use if errors are emitted, should indicate + * where the zpathBase value came from. + * @return true if validate failed. + */ + public static boolean zpathBaseCheck(String zpathBase, String errorMsgPrefix) + { + boolean zpathValidateFailed = false; + if (zpathBase != null) { + if (!zpathBase.startsWith("/")) { + zpathValidateFailed = true; + System.err.println(errorMsgPrefix + " must start with '/' (slash); found=" + zpathBase); + } + if (zpathBase.endsWith("/")) { + zpathValidateFailed = true; + System.err.println(errorMsgPrefix + " must NOT end with '/' (slash); found=" + zpathBase); + } + } + return zpathValidateFailed; + } } diff --git a/client/src/main/java/com/metamx/druid/zk/PropertiesZkSerializer.java b/client/src/main/java/com/metamx/druid/zk/PropertiesZkSerializer.java new file mode 100644 index 00000000000..036a8dc5055 --- /dev/null +++ b/client/src/main/java/com/metamx/druid/zk/PropertiesZkSerializer.java @@ -0,0 +1,69 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. 
+ */ + +package com.metamx.druid.zk; + +import com.metamx.common.IAE; +import org.I0Itec.zkclient.exception.ZkMarshallingError; +import org.I0Itec.zkclient.serialize.ZkSerializer; + +import java.io.*; +import java.nio.charset.Charset; +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.Properties; +import java.util.TimeZone; + +/** +*/ +public class PropertiesZkSerializer implements ZkSerializer +{ + private static final SimpleDateFormat df = new SimpleDateFormat("yyyyMMdd'T'HHmmss'z'"); + static { + df.setTimeZone(TimeZone.getTimeZone("UTC")); + } + public final static String META_PROP = "__MODIFIED"; + + @Override + public byte[] serialize(Object data) throws ZkMarshallingError + { + if (data instanceof Properties) { + final Properties props = (Properties) data; + ByteArrayOutputStream bos = new ByteArrayOutputStream(props.size()*60 + 30); + try { + final String ts = df.format(new Date()); + props.setProperty("__MODIFIED", ts); + props.store(bos, "Druid"); + } catch (IOException ignored) { } + return bos.toByteArray(); + } + throw new IAE("Can only serialize Properties into ZK"); + } + + @Override + public Object deserialize(byte[] bytes) throws ZkMarshallingError + { + final Properties props = new Properties(); + try { + props.load(new ByteArrayInputStream(bytes)); + } catch (IOException ignored) { + } + return props; + } +} diff --git a/common/src/main/java/com/metamx/druid/aggregation/Histogram.java b/common/src/main/java/com/metamx/druid/aggregation/Histogram.java index 8f081ca7ade..bb27e317eb4 100644 --- a/common/src/main/java/com/metamx/druid/aggregation/Histogram.java +++ b/common/src/main/java/com/metamx/druid/aggregation/Histogram.java @@ -132,10 +132,15 @@ public class Histogram return buf.array(); } + /** + * Returns a visual representation of a histogram object. + * Initially returns an array of just the min. and max. values + * but can also support the addition of quantiles. + */ public HistogramVisual asVisual() { float[] visualCounts = new float[bins.length - 2]; for(int i = 0; i < visualCounts.length; ++i) visualCounts[i] = (float)bins[i + 1]; - return new HistogramVisual(breaks, visualCounts, min, max); + return new HistogramVisual(breaks, visualCounts, new float[]{min, max}); } public static Histogram fromBytes(byte[] bytes) { diff --git a/common/src/main/java/com/metamx/druid/aggregation/HistogramVisual.java b/common/src/main/java/com/metamx/druid/aggregation/HistogramVisual.java index 5ce5217e1b2..ab4e8836fe1 100644 --- a/common/src/main/java/com/metamx/druid/aggregation/HistogramVisual.java +++ b/common/src/main/java/com/metamx/druid/aggregation/HistogramVisual.java @@ -29,15 +29,14 @@ public class HistogramVisual { @JsonProperty final public double[] breaks; @JsonProperty final public double[] counts; - @JsonProperty final double min; - @JsonProperty final double max; + // an array of the quantiles including the min. and max. 
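+  // For example (illustrative, matching the current asVisual() output and the expectation in
+  // HistogramTest below), the serialized form is {"breaks":[...],"counts":[...],"quantiles":[min,max]}.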
+ @JsonProperty final public double[] quantiles; @JsonCreator public HistogramVisual( @JsonProperty double[] breaks, @JsonProperty double[] counts, - @JsonProperty double min, - @JsonProperty double max + @JsonProperty double[] quantiles ) { Preconditions.checkArgument(breaks != null, "breaks must not be null"); @@ -46,15 +45,13 @@ public class HistogramVisual this.breaks = breaks; this.counts = counts; - this.min = min; - this.max = max; + this.quantiles = quantiles; } public HistogramVisual( float[] breaks, float[] counts, - float min, - float max + float[] quantiles ) { Preconditions.checkArgument(breaks != null, "breaks must not be null"); @@ -63,10 +60,10 @@ public class HistogramVisual this.breaks = new double[breaks.length]; this.counts = new double[counts.length]; + this.quantiles = new double[quantiles.length]; for(int i = 0; i < breaks.length; ++i) this.breaks[i] = breaks[i]; for(int i = 0; i < counts.length; ++i) this.counts[i] = counts[i]; - this.min = min; - this.max = max; + for(int i = 0; i < quantiles.length; ++i) this.quantiles[i] = quantiles[i]; } @Override @@ -75,8 +72,7 @@ public class HistogramVisual return "HistogramVisual{" + "counts=" + Arrays.toString(counts) + ", breaks=" + Arrays.toString(breaks) + - ", min=" + min + - ", max=" + max + + ", quantiles=" + Arrays.toString(quantiles) + '}'; } } diff --git a/common/src/test/java/com/metamx/druid/histogram/HistogramTest.java b/common/src/test/java/com/metamx/druid/histogram/HistogramTest.java index e40cecaa203..c280507e207 100644 --- a/common/src/test/java/com/metamx/druid/histogram/HistogramTest.java +++ b/common/src/test/java/com/metamx/druid/histogram/HistogramTest.java @@ -68,8 +68,7 @@ public class HistogramTest Map expectedObj = Maps.newLinkedHashMap(); expectedObj.put("breaks", Arrays.asList(visualBreaks)); expectedObj.put("counts", Arrays.asList(visualCounts)); - expectedObj.put("min", -1.0); - expectedObj.put("max", 1.0); + expectedObj.put("quantiles", Arrays.asList(new Double[]{-1.0, 1.0})); Map obj = (Map)objectMapper.readValue(json, Object.class); Assert.assertEquals(expectedObj, obj); diff --git a/druid-services/pom.xml b/druid-services/pom.xml new file mode 100644 index 00000000000..75d476bf748 --- /dev/null +++ b/druid-services/pom.xml @@ -0,0 +1,71 @@ + + + + 4.0.0 + + com.metamx.druid + druid-services + druid-services + druid-services + 0.1.0-SNAPSHOT + + com.metamx + druid + 0.1.0-SNAPSHOT + + + + + com.metamx.druid + druid-realtime + ${project.parent.version} + + + com.metamx.druid + druid-server + ${project.parent.version} + + + + + + + org.apache.maven.plugins + maven-shade-plugin + 1.6 + + + package + + shade + + + + ${project.build.directory}/${project.artifactId}-${project.version}-selfcontained.jar + + + + + + + + \ No newline at end of file diff --git a/examples/rand/src/main/resources/runtime.properties b/examples/rand/src/main/resources/runtime.properties index 3aafdda906a..03197ad6751 100644 --- a/examples/rand/src/main/resources/runtime.properties +++ b/examples/rand/src/main/resources/runtime.properties @@ -1,6 +1,6 @@ # Properties for demo of Realtime Node in standalone mode. -# To Use This: copy this file to runtime.properties and put directory containing it in classpath. 
# +comment.origin=druid/examples/rand/src/main/resources/runtime.properties # S3 access com.metamx.aws.accessKey= @@ -15,7 +15,10 @@ druid.database.user=user druid.database.password=password # time between polling for metadata database druid.database.poll.duration=PT1M + +# table for segment metadata coordination, no default druid.database.segmentTable=prod_segments + #in progress 20121010 #druid.database.taskTable= druid.emitter.period=PT60S @@ -42,23 +45,52 @@ druid.server.maxSize=300000000000 # =realtime or =historical (default) druid.server.type=realtime -# ZK path for service discovery within the cluster -druid.zk.paths.announcementsPath=/druid/announcementsPath +# +# zookeeper (zk) znode paths (zpaths) +# -# Legacy path, must be set, but can be ignored -druid.zk.paths.indexesPath=/druid/indexesPath +# base znode which establishes a unique namespace for a Druid ensemble. +# Default is /druid if not set +# This can also be set via parameter baseZkPath of the DruidSetup commandline +# druid.zk.paths.base= -druid.zk.paths.indexer.tasksPath=/druid/tasksPath -druid.zk.paths.indexer.statusPath=/druid/statusPath +# If these zpath properties like druid.zk.paths.*Path are overridden, then all must be +# overridden together for upgrade safety reasons. +# The commandline utility DruidSetup, which is used to set up properties on zookeeper, +# will validate this. Also, these zpaths must start with / because they are not relative. + +# ZK znode path for service discovery within the cluster. +# Default is value of druid.zk.paths.base + /announcements +# druid.zk.paths.announcementsPath=/druid/announcements + +# Legacy znode path, must be set, but can be ignored +#druid.zk.paths.indexesPath=/druid/indexes + +# Default is value of druid.zk.paths.base + /tasks +##druid.zk.paths.indexer.tasksPath=/druid/tasks + +# Default is value of druid.zk.paths.base + /status +#druid.zk.paths.indexer.statusPath=/druid/status # ZK path for load/drop protocol between Master/Compute -druid.zk.paths.loadQueuePath=/druid/loadQueuePath +# Default is value of druid.zk.paths.base + /loadQueue +#druid.zk.paths.loadQueuePath=/druid/loadQueue # ZK path for Master leadership election -druid.zk.paths.masterPath=/druid/masterPath +# Default is value of druid.zk.paths.base + /master +#druid.zk.paths.masterPath=/druid/master # ZK path for publishing served segments -druid.zk.paths.servedSegmentsPath=/druid/servedSegmentsPath +# Default is value of druid.zk.paths.base + /servedSegments +#druid.zk.paths.servedSegmentsPath=/druid/servedSegments + +# Default is value of druid.zk.paths.base + /leaderLatch +#druid.zk.paths.indexer.leaderLatchPath=/druid/leaderLatch + +# ZK path for properties stored in zookeeper +# Default is value of druid.zk.paths.base + /properties +#druid.zk.paths.propertiesPath=/druid/properties + druid.host=127.0.0.1 druid.port=8080 @@ -72,8 +104,10 @@ com.metamx.emitter.logging=true com.metamx.emitter.logging.level=info com.metamx.metrics.emitter.period=PT60S -# ZK quorum IPs; if empty, the use demo mode -druid.zk.service.host= +# ZK quorum IPs; ZK coordinates in the form host1:port1[,host2:port2[, ...]] +# if =none then do not contact zookeeper (only for RealtimeStandaloneMain examples) +druid.zk.service.host=none + # msec; high value means tolerate slow zk nodes, default is to wait about 3 weeks druid.zk.service.connectionTimeout=1000000 @@ -85,7 +119,7 @@ druid.processing.numThreads=3 # other properties found # druid.computation.buffer.size=10000000 -druid.zk.paths.indexer.leaderLatchPath=/druid/leaderLatchPath 
+ druid.merger.threads=1 druid.merger.runner=remote druid.merger.whitelist.enabled=false diff --git a/examples/twitter/src/main/java/druid/examples/RealtimeStandaloneMain.java b/examples/twitter/src/main/java/druid/examples/RealtimeStandaloneMain.java index 6fabdf04452..32f355c756b 100644 --- a/examples/twitter/src/main/java/druid/examples/RealtimeStandaloneMain.java +++ b/examples/twitter/src/main/java/druid/examples/RealtimeStandaloneMain.java @@ -32,7 +32,7 @@ public class RealtimeStandaloneMain { LogLevelAdjuster.register(); - Lifecycle lifecycle = new Lifecycle(); + final Lifecycle lifecycle = new Lifecycle(); RealtimeNode rn = RealtimeNode.builder().build(); lifecycle.addManagedInstance(rn); @@ -86,6 +86,20 @@ public class RealtimeStandaloneMain } ); + Runtime.getRuntime().addShutdownHook( + new Thread( + new Runnable() + { + @Override + public void run() + { + log.info("Running shutdown hook"); + lifecycle.stop(); + } + } + ) + ); + try { lifecycle.start(); } diff --git a/examples/twitter/src/main/resources/runtime.properties b/examples/twitter/src/main/resources/runtime.properties index 3f7fdeaec50..b504f2513de 100644 --- a/examples/twitter/src/main/resources/runtime.properties +++ b/examples/twitter/src/main/resources/runtime.properties @@ -1,6 +1,7 @@ # Properties for demo of Realtime Node in standalone mode. # To Use This: copy this file to runtime.properties and put directory containing it in classpath. # +comment.origin=druid/examples/twitter/src/main/resources/runtime.properties # S3 access com.metamx.aws.accessKey= @@ -15,7 +16,10 @@ druid.database.user=user druid.database.password=password # time between polling for metadata database druid.database.poll.duration=PT1M + +# table for segment metadata coordination, no default druid.database.segmentTable=prod_segments + #in progress 20121010 #druid.database.taskTable= druid.emitter.period=PT60S @@ -47,23 +51,51 @@ druid.server.maxSize=300000000000 # =realtime or =historical (default) druid.server.type=realtime -# ZK path for service discovery within the cluster -druid.zk.paths.announcementsPath=/druid/announcementsPath +# +# zookeeper (zk) znode paths (zpaths) +# -# Legacy path, must be set, but can be ignored -druid.zk.paths.indexesPath=/druid/indexesPath +# base znode which establishes a unique namespace for a Druid ensemble. +# Default is /druid if not set +# This can also be set via parameter baseZkPath of the DruidSetup commandline +# druid.zk.paths.base= -druid.zk.paths.indexer.tasksPath=/druid/tasksPath -druid.zk.paths.indexer.statusPath=/druid/statusPath +# If these zpath properties like druid.zk.paths.*Path are overridden, then all must be +# overridden together for upgrade safety reasons. +# The commandline utility DruidSetup, which is used to set up properties on zookeeper, +# will validate this. Also, these zpaths must start with / because they are not relative. + +# ZK znode path for service discovery within the cluster. 
+# Default is value of druid.zk.paths.base + /announcements +# druid.zk.paths.announcementsPath=/druid/announcements + +# Legacy znode path, must be set, but can be ignored +#druid.zk.paths.indexesPath=/druid/indexes + +# Default is value of druid.zk.paths.base + /tasks +##druid.zk.paths.indexer.tasksPath=/druid/tasks + +# Default is value of druid.zk.paths.base + /status +#druid.zk.paths.indexer.statusPath=/druid/status # ZK path for load/drop protocol between Master/Compute -druid.zk.paths.loadQueuePath=/druid/loadQueuePath +# Default is value of druid.zk.paths.base + /loadQueue +#druid.zk.paths.loadQueuePath=/druid/loadQueue # ZK path for Master leadership election -druid.zk.paths.masterPath=/druid/masterPath +# Default is value of druid.zk.paths.base + /master +#druid.zk.paths.masterPath=/druid/master # ZK path for publishing served segments -druid.zk.paths.servedSegmentsPath=/druid/servedSegmentsPath +# Default is value of druid.zk.paths.base + /servedSegments +#druid.zk.paths.servedSegmentsPath=/druid/servedSegments + +# Default is value of druid.zk.paths.base + /leaderLatch +#druid.zk.paths.indexer.leaderLatchPath=/druid/leaderLatch + +# ZK path for properties stored in zookeeper +# Default is value of druid.zk.paths.base + /properties +#druid.zk.paths.propertiesPath=/druid/properties druid.host=127.0.0.1 druid.port=8080 @@ -79,8 +111,10 @@ com.metamx.emitter.http=true # unknown # com.metamx.emitter.logging.level=info # unknown # com.metamx.metrics.emitter.period=PT60S -# ZK quorum IPs; if empty, the use demo mode -druid.zk.service.host= +# ZK quorum IPs; ZK coordinates in the form host1:port1[,host2:port2[, ...]] +# if =none then do not contact zookeeper (only for RealtimeStandaloneMain examples) +druid.zk.service.host=none + # msec; high value means tolerate slow zk nodes, default is to wait about 3 weeks druid.zk.service.connectionTimeout=1000000 @@ -92,7 +126,6 @@ druid.processing.numThreads=3 # other properties found # druid.computation.buffer.size=10000000 -druid.zk.paths.indexer.leaderLatchPath=/druid/leaderLatchPath druid.merger.threads=1 druid.merger.runner=remote druid.merger.whitelist.enabled=false diff --git a/examples/twitter/twitter_realtime.spec b/examples/twitter/twitter_realtime.spec index 8ffe014822b..3381b075060 100644 --- a/examples/twitter/twitter_realtime.spec +++ b/examples/twitter/twitter_realtime.spec @@ -3,7 +3,7 @@ "aggregators":[ {"type":"count", "name":"tweets"}, {"type":"doubleSum","fieldName":"follower_count","name":"total_follower_count"}, - {"type":"doubleSum","fieldName":"retweet_count","name":"tota_retweet_count"}, + {"type":"doubleSum","fieldName":"retweet_count","name":"total_retweet_count"}, {"type":"doubleSum","fieldName":"friends_count","name":"total_friends_count"}, {"type":"doubleSum","fieldName":"statuses_count","name":"total_statuses_count"}, diff --git a/install/druid_setup.sh b/install/druid_setup.sh new file mode 100755 index 00000000000..2ab0067eb43 --- /dev/null +++ b/install/druid_setup.sh @@ -0,0 +1,25 @@ +#!/usr/bin/env bash +# Script to run util DruidSetup which will initialize zookeeper locations, properties, and metadata store (MySQL or similar). +# The dump cmd of DruidSetup will dump properties stored at and zpaths of zookeeper. +# Run with no args to get usage. + +which java >/dev/null +WJ=$? 
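+# Example invocations (hypothetical ZK coordinates and file paths):
+#   install/druid_setup.sh put  localhost:2181 config/runtime.properties
+#   install/druid_setup.sh dump localhost:2181 /druid
+#   install/druid_setup.sh dbprep config/runtime.properties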
+if [ "${JAVA_HOME}" ]; then + RUN_JAVA=$JAVA_HOME/bin/java +elif [ $WJ -eq 0 ]; then + RUN_JAVA=java +fi + +[ -z "${RUN_JAVA}" ] && echo "env var JAVA_HOME is not defined and java not in path" && exit 1 + +DRUID_DIR=$(cd $(dirname $0)/.. ; pwd) + +DRUID_SERVER_JAR="$(ls -1 $(find $DRUID_DIR -name 'druid-server*selfcontained.jar') |head -1)" +[ -z "${DRUID_SERVER_JAR}" ] && echo "unable to find druid server jar" && exit 2 +echo "using ${DRUID_SERVER_JAR}" +echo + +$RUN_JAVA -cp "${DRUID_SERVER_JAR}" -Dlog4j.configuration=file://${DRUID_DIR}/install/log4j.xml -Djava.net.preferIPv4Stack=true -Duser.timezone=UTC -Dfile.encoding=UTF-8 com.metamx.druid.utils.DruidSetup $* + +[ -e ${DRUID_DIR}/install/druid_setup.log ] && egrep "WARN|ERROR|FATAL" ${DRUID_DIR}/install/druid_setup.log diff --git a/install/log4j.xml b/install/log4j.xml new file mode 100644 index 00000000000..aa6eb8cc13e --- /dev/null +++ b/install/log4j.xml @@ -0,0 +1,18 @@ + + + + + + + + + + + + + + + + + + diff --git a/pom.xml b/pom.xml index 8a4b8b3d650..a00e9ffbf81 100644 --- a/pom.xml +++ b/pom.xml @@ -50,6 +50,7 @@ merger realtime examples + druid-services diff --git a/server/src/main/java/com/metamx/druid/utils/DruidSetup.java b/server/src/main/java/com/metamx/druid/utils/DruidSetup.java new file mode 100644 index 00000000000..6ef2432e5a7 --- /dev/null +++ b/server/src/main/java/com/metamx/druid/utils/DruidSetup.java @@ -0,0 +1,412 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package com.metamx.druid.utils; + +import com.google.common.io.Closeables; +import com.metamx.common.logger.Logger; +import com.metamx.druid.db.DbConnector; +import com.metamx.druid.db.DbConnectorConfig; +import com.metamx.druid.initialization.Initialization; +import com.metamx.druid.zk.PropertiesZkSerializer; +import org.I0Itec.zkclient.ZkClient; +import org.I0Itec.zkclient.ZkConnection; + +import java.io.*; +import java.util.Properties; + +/** + * Set up the shared Druid ensemble space. + * This affects the Zookeeper which holds common properties, and znode paths for coordination, + * and also performs metadata table creation in the database (MySQL). + * By storing ensemble-wide properties in zookeeper, cluster administration is simplified. + * Each service instance can also have local property overrides in the file runtime.properties + * located in the classpath. + *
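+ * Commands (see printUsage below for exact arguments): "put" stores the given property file in
+ * zookeeper and creates the coordination zpaths, "dump" prints what is currently stored, and
+ * "dbprep" creates the segment metadata table in the database.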
+ * The design rules are noted here with rationale.
+ *
+ * Design Rule Notes:
+ * (a) Properties set on the command line of services take precedence over runtime.properties,
+ *       which takes precedence over properties stored in zookeeper.
+ *
+ *       Rationale: organizing principle.
+ *
+ * (b) Services load properties on startup only.
+ *
+ *       Rationale: stepwise changes are safer and easier to manage.
+ *
+ * (c) Only DruidSetup creates properties and znode paths (zpaths) on zookeeper; no other tool or
+ *       service will make ensemble-wide settings automatically.
+ *
+ *       Rationale: one place for this logic, under manual control, avoiding accidental
+ *       namespace/partition creation.
+ *
+ * (d) DruidSetup creates reasonable zpaths but supports overrides to enable tactical
+ *       version transitions (just in case). If zpaths are overridden, then they must all be
+ *       overridden together since they are not independent.
+ *
+ *       Rationale: convention beats configuration most of the time; sometimes configuration is
+ *       needed to negotiate unusual cases.
+ *
+ * (e) Property settings stored on zookeeper are not cumulative; previous properties are removed
+ *       before new ones are stored.
+ *
+ *       Rationale: keep the operations at the granularity of a file of properties, avoid
+ *       dependence on order of setup operations, and enable dumping of current settings.
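+ *
+ * Worked zpath example (illustrative values): with druid.zk.paths.base=/mycluster and no
+ * druid.zk.paths.*Path overrides, the resolved zpaths are /mycluster/announcements,
+ * /mycluster/servedSegments, /mycluster/loadQueue, /mycluster/master and /mycluster/properties.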
+ * 
+ * + * @author pbaclace + */ +public class DruidSetup +{ + private static final Logger log = new Logger(DruidSetup.class); + + public static void main(final String[] args) + { + ZkClient zkClient = null; + + if (args.length < 2 || args.length > 3) { + printUsage(); + System.exit(1); + } + String cmd = args[0]; + if ("dump".equals(cmd) && args.length == 3) { + final String zkConnect = args[1]; + zkClient = connectToZK(zkConnect); + String zpathBase = args[2]; + dumpFromZk(zkClient, zpathBase, zkConnect, System.out); + } else if ("put".equals(cmd) && args.length == 3) { + final String zkConnect = args[1]; + zkClient = connectToZK(zkConnect); + final String pfile = args[2]; + putToZk(zkClient, pfile); + } else if ("dbprep".equals(cmd) && args.length == 2) { + final String pfile = args[1]; + prepDB(pfile); + } else { + printUsage(); + System.exit(1); + } + + if (zkClient != null) { + zkClient.close(); + } + } + + /** + * Load properties from local file, validate and tweak. + *
+ * This can only be used for setup, not service run time because of some assembly here. + * + * @param pfile path to runtime.properties file to be read. + * @param props Properties object to fill, props like druid.zk.paths.*Path will always be set after + * this method either because the input file has them set (overrides) or because prop + * druid.zk.paths.base was used as a prefix to construct the default zpaths; + * druid.zk.paths.base will be set iff there is a single base for all zpaths + */ + private static void loadProperties(String pfile, Properties props) + { + InputStream is = null; + try { + is = new FileInputStream(pfile); + } catch (FileNotFoundException e) { + System.err.println("File not found: " + pfile); + System.err.println("No changes made."); + System.exit(4); + } catch (IOException ioe) { + reportErrorAndExit(pfile, ioe); + } + try { + props.load(is); + } catch (IOException e) { + reportErrorAndExit(pfile, e); + } finally { + Closeables.closeQuietly(is); + } + + if (! Initialization.validateResolveProps(props)) { // bail, errors have been emitted + System.exit(9); + } + + // emit effective zpaths to be used + System.out.println("Effective zpath properties:"); + for (String pname : Initialization.SUB_PATH_PROPS) { + System.out.println(" " + pname + "=" + props.getProperty(pname)); + } + System.out.println(" " + "druid.zk.paths.propertiesPath" + "=" + + props.getProperty("druid.zk.paths.propertiesPath")); + + } + + /** + * @param zkClient zookeeper client. + * @param zpathBase znode base path. + * @param zkConnect ZK coordinates in the form host1:port1[,host2:port2[, ...]] + * @param out + */ + private static void dumpFromZk(ZkClient zkClient, String zpathBase, String zkConnect, PrintStream out) + { + final String propPath = Initialization.makePropPath(zpathBase); + if (zkClient.exists(propPath)) { + Properties currProps = zkClient.readData(propPath, true); + if (currProps != null) { + out.println("# Begin Properties Listing for zkConnect=" + zkConnect + " zpath=" + propPath); + try { + currProps.store(out, "Druid"); + } catch (IOException ignored) { + } + out.println("# End Properties Listing for zkConnect=" + zkConnect + " zpath=" + propPath); + out.println("# NOTE: properties like druid.zk.paths.*Path are always stored in zookeeper in absolute form."); + out.println(); + } + } + //out.println("Zookeeper znodes and zpaths for " + zkConnect + " (showing all zpaths)"); + // list all znodes + // (not ideal since recursive listing starts at / instead of at baseZkPath) + //zkClient.showFolders(out); + } + + /** + * @param zkClient zookeeper client. + * @param pfile + */ + private static void putToZk(ZkClient zkClient, String pfile) + { + Properties props = new Properties(); + loadProperties(pfile, props); + String zpathBase = props.getProperty("druid.zk.paths.base"); + + // create znodes first + // + createZNodes(zkClient, zpathBase, System.out); + + // put props + // + updatePropertiesZK(zkClient, zpathBase, props, System.out); + } + + /** + * @param zkClient zookeeper client. + * @param zpathBase znode base path. + * @param props the properties to store. + * @param out the PrintStream for human readable update summary (usually System.out). 
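+   *             An illustrative summary line: "Current properties differ: 2 new, 1 different values,
+   *             0 removed, 37 unchanged, 20121015T120000z previously updated".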
+ */ + private static void updatePropertiesZK(ZkClient zkClient, String zpathBase, Properties props, PrintStream out) + { + final String propPathOverride = props.getProperty("druid.zk.paths.propertiesPath"); + final String propPathConstructed = Initialization.makePropPath(zpathBase); + final String propPath = (propPathOverride != null) ? propPathOverride : propPathConstructed; + Properties currProps = null; + if (zkClient.exists(propPath)) { + currProps = zkClient.readData(propPath, true); + } + boolean propsDiffer = false; + if (currProps == null) { + out.println("No properties currently stored in zk"); + propsDiffer = true; + } else { // determine whether anything is different + int countNew = 0; + int countDiffer = 0; + int countRemoved = 0; + int countNoChange = 0; + String currMetaPropVal = ""; + StringBuilder changes = new StringBuilder(1024); + for (String pname : props.stringPropertyNames()) { + if (pname.equals(PropertiesZkSerializer.META_PROP)) continue; // ignore meta prop datestamp, if any + final String pvalue = props.getProperty(pname); + final String pvalueCurr = currProps.getProperty(pname); + if (pvalueCurr == null) { + countNew++; + } else { + if (pvalueCurr.equals(pvalue)) { + countNoChange++; + } else { + countDiffer++; + changes.append("CHANGED: ").append(pname).append("= PREV=").append(pvalueCurr) + .append(" NOW=").append(pvalue).append("\n"); + } + } + } + for (String pname : currProps.stringPropertyNames()) { + if (pname.equals(PropertiesZkSerializer.META_PROP)) { + currMetaPropVal = currProps.getProperty(pname); + continue; // ignore meta prop datestamp + } + if (props.getProperty(pname) == null) { + countRemoved++; + changes.append("REMOVED: ").append(pname).append("=").append(currProps.getProperty(pname)).append("\n"); + } + } + if (countNew + countRemoved + countDiffer > 0) { + out.println("Current properties differ: " + + countNew + " new, " + + countDiffer + " different values, " + + countRemoved + " removed, " + + countNoChange + " unchanged, " + + currMetaPropVal + " previously updated" + ); + out.println(changes); + propsDiffer = true; + } else { + out.println("Current properties identical to file given, entry count=" + countNoChange); + } + } + if (propsDiffer) { + if (currProps != null) { + zkClient.delete(propPath); + } + // update zookeeper + zkClient.createPersistent(propPath, props); + out.println("Properties updated, entry count=" + props.size()); + } + } + + /** + * @param zkClient zookeeper client. + * @param zpathBase znode base path. + * @param out the PrintStream for human readable update summary. + */ + private static void createZNodes(ZkClient zkClient, String zpathBase, PrintStream out) + { + zkClient.createPersistent(zpathBase, true); + for (String subPath : Initialization.SUB_PATHS) { + final String thePath = String.format("%s/%s", zpathBase, subPath); + if (zkClient.exists(thePath)) { + out.printf("Path[%s] exists already%n", thePath); + } else { + out.printf("Creating ZK path[%s]%n", thePath); + zkClient.createPersistent(thePath, true); + } + } + } + + private static void reportErrorAndExit(String pfile, IOException ioe) + { + System.err.println("Could not read file: " + pfile); + System.err.println(" because of: " + ioe); + System.err.println("No changes made."); + System.exit(4); + } + + private static ZkClient connectToZK(String zkConnect) + { + return new ZkClient( + new ZkConnection(zkConnect), + Integer.MAX_VALUE, + new PropertiesZkSerializer() + ); + } + + /** + * Connect to db and create table, if it does not exist. 
+ * NOTE: Connection failure only shows up in log output. + * + * @param pfile path to properties file to use. + */ + private static void prepDB(final String pfile) + { + Properties tmp_props = new Properties(); + loadProperties(pfile, tmp_props); + final String tableName = tmp_props.getProperty("druid.database.segmentTable", "prod_segments"); + + final String dbConnectionUrl = tmp_props.getProperty("druid.database.connectURI"); + final String username = tmp_props.getProperty("druid.database.user"); + final String password = tmp_props.getProperty("druid.database.password"); + + // + // validation + // + if (tableName.length() == 0 || !Character.isLetter(tableName.charAt(0))) { + throw new RuntimeException("poorly formed property druid.database.segmentTable=" + tableName); + } + if (username == null || username.length() == 0) { + throw new RuntimeException("poorly formed property druid.database.user=" + username); + } + if (password == null || password.length() == 0) { + throw new RuntimeException("poorly formed property druid.database.password=" + password); + } + if (dbConnectionUrl == null || dbConnectionUrl.length() == 0) { + throw new RuntimeException("poorly formed property druid.database.connectURI=" + dbConnectionUrl); + } + + final DbConnectorConfig config = new DbConnectorConfig() + { + { + } + + @Override + public String getDatabaseConnectURI() + { + return dbConnectionUrl; + } + + @Override + public String getDatabaseUser() + { + return username; + } + + @Override + public String getDatabasePassword() + { + return password; + } + + @Override + public String getSegmentTable() + { + return tableName; + } + }; + + DbConnector dbConnector = new DbConnector(config); + + DbConnector.createSegmentTable(dbConnector.getDBI(), config); + + } + + /** + * Print usage to stdout. + */ + private static void printUsage() + { + System.out.println("Usage: CMD [args]\n" + + " Where CMD is a particular command:\n" + + " CMD choices:\n" + + " dump zkConnect baseZkPath # dump info from zk at given coordinates\n" + + " dbprep propfile # create metadata table in db\n" + + " put zkConnect propfile # store paths and propfile into zk at given coordinates\n" + + " args:\n" + + " zkConnect: ZK coordinates in the form host1:port1[,host2:port2[, ...]]\n" + + " baseZkPath: like /druid or /mydruid etc. 
to uniquely identify a Druid ensemble\n" + + " and should be equal to property druid.zk.paths.base\n" + + " propfile: Java properties file with common properties for all services in ensemble\n" + + " Notes:\n" + + " dump command makes no modifications and shows zk properties at baseZkPath.\n" + + " put command can safely be invoked more than once, will not disturb existing queues,\n" + + " and properties are not cumulative.\n" + + " A zookeeper can service more than one Druid ensemble if baseZkPath is distinct.\n" + + " Druid services only load properties during process startup.\n" + + " Properties defined on a service command line take precedence over the runtime.properties\n" + + " file which takes precedence over properties stored in zookeeper.\n" + + "" + ); + } +} diff --git a/server/src/main/java/com/metamx/druid/utils/ZkSetup.java b/server/src/main/java/com/metamx/druid/utils/ZkSetup.java index 2c4b8f768af..b2c7f27aae2 100644 --- a/server/src/main/java/com/metamx/druid/utils/ZkSetup.java +++ b/server/src/main/java/com/metamx/druid/utils/ZkSetup.java @@ -21,11 +21,13 @@ package com.metamx.druid.utils; import com.metamx.druid.db.DbConnector; import com.metamx.druid.db.DbConnectorConfig; +import com.metamx.druid.initialization.Initialization; import com.metamx.druid.zk.StringZkSerializer; import org.I0Itec.zkclient.ZkClient; import org.I0Itec.zkclient.ZkConnection; /** + * @deprecated see DruidSetup */ public class ZkSetup { @@ -33,13 +35,12 @@ public class ZkSetup { if (args.length != 5) { System.out.println("Usage: zkConnect baseZkPath dbConnectionUrl dbUsername:password tableName"); + System.out.println("This utility is deprecated, see DruidSetup instead."); System.exit(1); } String path = args[1]; - String[] subPaths = new String[]{"announcements", "servedSegments", "loadQueue", "master"}; - final ZkClient zkClient = new ZkClient( new ZkConnection(args[0]), Integer.MAX_VALUE, @@ -47,7 +48,7 @@ public class ZkSetup ); zkClient.createPersistent(path, true); - for (String subPath : subPaths) { + for (String subPath : Initialization.SUB_PATHS) { final String thePath = String.format("%s/%s", path, subPath); if (zkClient.exists(thePath)) { System.out.printf("Path[%s] exists already%n", thePath);