mirror of https://github.com/apache/druid.git
Merge branch 'master' into misc
Conflicts: merger/src/main/java/com/metamx/druid/merger/coordinator/RemoteTaskRunner.java
commit 4c98b176e4

README
@@ -1 +1,6 @@
-See the "Wiki":https://github.com/metamx/druid/wiki
+See the "Wiki" https://github.com/metamx/druid/wiki
+
+Build with build.sh
+
+See examples/rand
+See examples/twitter
|
@@ -31,6 +31,7 @@ import com.metamx.druid.http.FileRequestLogger;
 import com.metamx.druid.http.RequestLogger;
 import com.metamx.druid.utils.PropUtils;
 import com.metamx.druid.zk.StringZkSerializer;
+import com.metamx.druid.zk.PropertiesZkSerializer;
 import com.netflix.curator.framework.CuratorFramework;
 import com.netflix.curator.framework.CuratorFrameworkFactory;
 import com.netflix.curator.retry.ExponentialBackoffRetry;
|
@ -59,7 +60,18 @@ public class Initialization
|
|||
{
|
||||
private static final Logger log = new Logger(Initialization.class);
|
||||
|
||||
private static volatile Properties props = null;
|
||||
private static final Properties zkProps = new Properties();
|
||||
private static final Properties fileProps = new Properties(zkProps);
|
||||
private static Properties props = null;
|
||||
public final static String PROP_SUBPATH = "properties";
|
||||
public final static String[] SUB_PATHS = {"announcements", "servedSegments", "loadQueue", "master"};
|
||||
public final static String[] SUB_PATH_PROPS = {
|
||||
"druid.zk.paths.announcementsPath",
|
||||
"druid.zk.paths.servedSegmentsPath",
|
||||
"druid.zk.paths.loadQueuePath",
|
||||
"druid.zk.paths.masterPath"
|
||||
};
|
||||
public static final String DEFAULT_ZPATH = "/druid";
|
||||
|
||||
public static ZkClient makeZkClient(ZkClientConfig config, Lifecycle lifecycle)
|
||||
{
|
||||
|
@ -107,23 +119,37 @@ public class Initialization
|
|||
);
|
||||
}
|
||||
|
||||
public static Properties loadProperties()
|
||||
|
||||
/**
|
||||
* Load properties.
|
||||
* Properties are layered, high to low precedence: cmdLine -D, runtime.properties file, stored in zookeeper.
|
||||
* Idempotent. Thread-safe. Properties are only loaded once.
|
||||
* If property druid.zk.service.host=none then do not load properties from zookeeper.
|
||||
*
|
||||
* @return Properties ready to use.
|
||||
*/
|
||||
public synchronized static Properties loadProperties()
|
||||
{
|
||||
if (props != null) {
|
||||
return props;
|
||||
}
|
||||
|
||||
Properties loadedProps = null;
|
||||
// Note that zookeeper coordinates must be either in cmdLine or in runtime.properties
|
||||
Properties sp = System.getProperties();
|
||||
|
||||
Properties tmp_props = new Properties(fileProps); // the head of the 3 level Properties chain
|
||||
tmp_props.putAll(sp);
|
||||
|
||||
final InputStream stream = ClassLoader.getSystemResourceAsStream("runtime.properties");
|
||||
if (stream == null) {
|
||||
log.info("runtime.properties didn't exist as a resource, loading system properties instead.");
|
||||
loadedProps = System.getProperties();
|
||||
log.info(
|
||||
"runtime.properties not found as a resource in classpath, relying only on system properties, and zookeeper now."
|
||||
);
|
||||
} else {
|
||||
log.info("Loading properties from runtime.properties.");
|
||||
log.info("Loading properties from runtime.properties");
|
||||
try {
|
||||
loadedProps = new Properties();
|
||||
try {
|
||||
loadedProps.load(stream);
|
||||
fileProps.load(stream);
|
||||
}
|
||||
catch (IOException e) {
|
||||
throw Throwables.propagate(e);
|
||||
|
@ -134,13 +160,60 @@ public class Initialization
|
|||
}
|
||||
}
|
||||
|
||||
for (String prop : loadedProps.stringPropertyNames()) {
|
||||
log.info("Loaded Property[%s] as [%s]", prop, loadedProps.getProperty(prop));
|
||||
// log properties from file; note stringPropertyNames() will follow Properties.defaults but
|
||||
// next level is empty at this point.
|
||||
for (String prop : fileProps.stringPropertyNames()) {
|
||||
log.info("Loaded(runtime.properties) Property[%s] as [%s]", prop, fileProps.getProperty(prop));
|
||||
}
|
||||
|
||||
props = loadedProps;
|
||||
final String zk_hosts = tmp_props.getProperty("druid.zk.service.host");
|
||||
|
||||
return loadedProps;
|
||||
if (zk_hosts != null) {
|
||||
if (!zk_hosts.equals("none")) { // get props from zk
|
||||
final ZkClient zkPropLoadingClient;
|
||||
final ZkClientConfig clientConfig = new ZkClientConfig()
|
||||
{
|
||||
@Override
|
||||
public String getZkHosts()
|
||||
{
|
||||
return zk_hosts;
|
||||
}
|
||||
};
|
||||
|
||||
zkPropLoadingClient = new ZkClient(
|
||||
new ZkConnection(clientConfig.getZkHosts()),
|
||||
clientConfig.getConnectionTimeout(),
|
||||
new PropertiesZkSerializer()
|
||||
);
|
||||
zkPropLoadingClient.waitUntilConnected();
|
||||
String propertiesZNodePath = tmp_props.getProperty("druid.zk.paths.propertiesPath");
|
||||
if (propertiesZNodePath == null) {
|
||||
String zpathBase = tmp_props.getProperty("druid.zk.paths.base", DEFAULT_ZPATH);
|
||||
propertiesZNodePath = makePropPath(zpathBase);
|
||||
}
|
||||
// get properties stored by zookeeper (lowest precedence)
|
||||
if (zkPropLoadingClient.exists(propertiesZNodePath)) {
|
||||
Properties p = zkPropLoadingClient.readData(propertiesZNodePath, true);
|
||||
if (p != null) {
|
||||
zkProps.putAll(p);
|
||||
}
|
||||
}
|
||||
// log properties from zk
|
||||
for (String prop : zkProps.stringPropertyNames()) {
|
||||
log.info("Loaded(properties stored in zk) Property[%s] as [%s]", prop, zkProps.getProperty(prop));
|
||||
}
|
||||
} // get props from zk
|
||||
} else { // ToDo: should this be an error?
|
||||
log.warn("property druid.zk.service.host is not set, so no way to contact zookeeper for coordination.");
|
||||
}
|
||||
// validate properties now that all levels of precedence are loaded
|
||||
if (!validateResolveProps(tmp_props)) {
|
||||
log.error("Properties failed to validate, cannot continue");
|
||||
throw new RuntimeException("Properties failed to validate");
|
||||
}
|
||||
props = tmp_props; // publish
|
||||
|
||||
return props;
|
||||
}
|
||||
|
||||
public static Server makeJettyServer(ServerConfig config)
|
||||
|
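Aside (not part of the commit): the loadProperties() change above layers three java.util.Properties objects, with command-line -D properties on top, runtime.properties in the middle, and the zookeeper-stored properties at the bottom, each level falling through to the next via Properties defaults. A minimal standalone sketch of that precedence chain, using made-up property values:

import java.util.Properties;

public class LayeredPropertiesSketch
{
  public static void main(String[] args)
  {
    // Lowest precedence: properties stored in zookeeper (zkProps in Initialization).
    Properties zkProps = new Properties();
    zkProps.setProperty("druid.port", "8080");
    zkProps.setProperty("druid.host", "10.0.0.1");

    // Middle: runtime.properties, falling back to zkProps (fileProps in Initialization).
    Properties fileProps = new Properties(zkProps);
    fileProps.setProperty("druid.host", "127.0.0.1");

    // Highest: command-line -D system properties (tmp_props in Initialization).
    Properties props = new Properties(fileProps);
    props.setProperty("druid.host", "localhost");

    System.out.println(props.getProperty("druid.host")); // localhost, from the -D level
    System.out.println(props.getProperty("druid.port")); // 8080, falls through to the zookeeper level
  }
}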
@ -284,4 +357,127 @@ public class Initialization
|
|||
new File(PropUtils.getProperty(props, "druid.request.logging.dir"))
|
||||
);
|
||||
}
|
||||
|
||||
public static String makePropPath(String basePath)
|
||||
{
|
||||
return String.format("%s/%s", basePath, PROP_SUBPATH);
|
||||
}
|
||||
|
||||
/**
|
||||
* Validate and Resolve Properties.
|
||||
* Resolve zpaths with props like druid.zk.paths.*Path using druid.zk.paths.base value.
|
||||
* Check validity: if any druid.zk.paths.*Path props are set, then all must be set;
|
||||
* if none are set, construct defaults relative to druid.zk.paths.base and add them
|
||||
* to the properties chain.
|
||||
*
|
||||
* @param props
|
||||
*
|
||||
* @return true if valid zpath properties.
|
||||
*/
|
||||
public static boolean validateResolveProps(Properties props)
|
||||
{
|
||||
boolean zpathValidateFailed;// validate druid.zk.paths.base
|
||||
String propertyZpath = props.getProperty("druid.zk.paths.base");
|
||||
zpathValidateFailed = zpathBaseCheck(propertyZpath, "property druid.zk.paths.base");
|
||||
|
||||
String zpathEffective = DEFAULT_ZPATH;
|
||||
if (propertyZpath != null) {
|
||||
zpathEffective = propertyZpath;
|
||||
}
|
||||
|
||||
final String propertiesZpathOverride = props.getProperty("druid.zk.paths.propertiesPath");
|
||||
|
||||
if (!zpathValidateFailed) {
|
||||
System.out.println("Effective zpath prefix=" + zpathEffective);
|
||||
}
|
||||
|
||||
// validate druid.zk.paths.*Path properties
|
||||
//
|
||||
// if any zpath overrides are set in properties, all must be set, and they must start with /
|
||||
int zpathOverrideCount = 0;
|
||||
boolean zpathOverridesNotAbs = false;
|
||||
StringBuilder sbErrors = new StringBuilder(100);
|
||||
for (int i = 0; i < SUB_PATH_PROPS.length; i++) {
|
||||
String val = props.getProperty(SUB_PATH_PROPS[i]);
|
||||
if (val != null) {
|
||||
zpathOverrideCount++;
|
||||
if (!val.startsWith("/")) {
|
||||
zpathOverridesNotAbs = true;
|
||||
sbErrors.append(SUB_PATH_PROPS[i]).append("=").append(val).append("\n");
|
||||
zpathValidateFailed = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
// separately check druid.zk.paths.propertiesPath (not in SUB_PATH_PROPS since it is not a "dir")
|
||||
if (propertiesZpathOverride != null) {
|
||||
zpathOverrideCount++;
|
||||
if (!propertiesZpathOverride.startsWith("/")) {
|
||||
zpathOverridesNotAbs = true;
|
||||
sbErrors.append("druid.zk.paths.propertiesPath").append("=").append(propertiesZpathOverride).append("\n");
|
||||
zpathValidateFailed = true;
|
||||
}
|
||||
}
|
||||
if (zpathOverridesNotAbs) {
|
||||
System.err.println(
|
||||
"When overriding zk zpaths, with properties like druid.zk.paths.*Path " +
|
||||
"the znode path must start with '/' (slash) ; problem overrides:"
|
||||
);
|
||||
System.err.print(sbErrors.toString());
|
||||
}
|
||||
if (zpathOverrideCount > 0) {
|
||||
if (zpathOverrideCount < SUB_PATH_PROPS.length) {
|
||||
zpathValidateFailed = true;
|
||||
System.err.println(
|
||||
"When overriding zk zpaths, with properties of form druid.zk.paths.*Path " +
|
||||
"all must be overridden together; missing overrides:"
|
||||
);
|
||||
for (int i = 0; i < SUB_PATH_PROPS.length; i++) {
|
||||
String val = props.getProperty(SUB_PATH_PROPS[i]);
|
||||
if (val == null) {
|
||||
System.err.println(" " + SUB_PATH_PROPS[i]);
|
||||
}
|
||||
}
|
||||
} else { // proper overrides
|
||||
// do not prefix with property druid.zk.paths.base
|
||||
; // fallthru
|
||||
}
|
||||
} else { // no overrides
|
||||
if (propertyZpath == null) { // if default base is used, store it as documentation
|
||||
props.setProperty("druid.zk.paths.base", zpathEffective);
|
||||
}
|
||||
//
|
||||
// Resolve default zpaths using zpathEffective as base
|
||||
//
|
||||
for (int i = 0; i < SUB_PATH_PROPS.length; i++) {
|
||||
props.setProperty(SUB_PATH_PROPS[i], zpathEffective + "/" + SUB_PATHS[i]);
|
||||
}
|
||||
props.setProperty("druid.zk.paths.propertiesPath", zpathEffective + "/properties");
|
||||
}
|
||||
return !zpathValidateFailed;
|
||||
}
|
||||
|
||||
/**
|
||||
* Check znode zpath base for proper slash, no trailing slash.
|
||||
*
|
||||
* @param zpathBase znode base path, if null then this method does nothing.
|
||||
* @param errorMsgPrefix error context to use if errors are emitted, should indicate
|
||||
* where the zpathBase value came from.
|
||||
*
|
||||
* @return true if validate failed.
|
||||
*/
|
||||
public static boolean zpathBaseCheck(String zpathBase, String errorMsgPrefix)
|
||||
{
|
||||
boolean zpathValidateFailed = false;
|
||||
if (zpathBase != null) {
|
||||
if (!zpathBase.startsWith("/")) {
|
||||
zpathValidateFailed = true;
|
||||
System.err.println(errorMsgPrefix + " must start with '/' (slash); found=" + zpathBase);
|
||||
}
|
||||
if (zpathBase.endsWith("/")) {
|
||||
zpathValidateFailed = true;
|
||||
System.err.println(errorMsgPrefix + " must NOT end with '/' (slash); found=" + zpathBase);
|
||||
}
|
||||
}
|
||||
return zpathValidateFailed;
|
||||
}
|
||||
}
|
||||
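Aside (not part of the commit): when no druid.zk.paths.*Path overrides are present, validateResolveProps() above resolves every sub-path against druid.zk.paths.base. A rough usage sketch; the Initialization import is assumed from the repo layout and the base value is made up:

import java.util.Properties;
import com.metamx.druid.initialization.Initialization; // package assumed, not shown in this diff

public class ZpathDefaultsSketch
{
  public static void main(String[] args)
  {
    Properties p = new Properties();
    p.setProperty("druid.zk.paths.base", "/prod/druid");

    if (Initialization.validateResolveProps(p)) {
      // With only the base set, the defaults are filled in relative to it:
      //   druid.zk.paths.announcementsPath  -> /prod/druid/announcements
      //   druid.zk.paths.servedSegmentsPath -> /prod/druid/servedSegments
      //   druid.zk.paths.loadQueuePath      -> /prod/druid/loadQueue
      //   druid.zk.paths.masterPath         -> /prod/druid/master
      //   druid.zk.paths.propertiesPath     -> /prod/druid/properties
      System.out.println(p.getProperty("druid.zk.paths.masterPath"));
    }
  }
}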
|
|
|
@ -0,0 +1,69 @@
|
|||
/*
|
||||
* Druid - a distributed column store.
|
||||
* Copyright (C) 2012 Metamarkets Group Inc.
|
||||
*
|
||||
* This program is free software; you can redistribute it and/or
|
||||
* modify it under the terms of the GNU General Public License
|
||||
* as published by the Free Software Foundation; either version 2
|
||||
* of the License, or (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with this program; if not, write to the Free Software
|
||||
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||
*/
|
||||
|
||||
package com.metamx.druid.zk;
|
||||
|
||||
import com.metamx.common.IAE;
|
||||
import org.I0Itec.zkclient.exception.ZkMarshallingError;
|
||||
import org.I0Itec.zkclient.serialize.ZkSerializer;
|
||||
|
||||
import java.io.*;
|
||||
import java.nio.charset.Charset;
|
||||
import java.text.SimpleDateFormat;
|
||||
import java.util.Date;
|
||||
import java.util.Properties;
|
||||
import java.util.TimeZone;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class PropertiesZkSerializer implements ZkSerializer
|
||||
{
|
||||
private static final SimpleDateFormat df = new SimpleDateFormat("yyyyMMdd'T'HHmmss'z'");
|
||||
static {
|
||||
df.setTimeZone(TimeZone.getTimeZone("UTC"));
|
||||
}
|
||||
public final static String META_PROP = "__MODIFIED";
|
||||
|
||||
@Override
|
||||
public byte[] serialize(Object data) throws ZkMarshallingError
|
||||
{
|
||||
if (data instanceof Properties) {
|
||||
final Properties props = (Properties) data;
|
||||
ByteArrayOutputStream bos = new ByteArrayOutputStream(props.size()*60 + 30);
|
||||
try {
|
||||
final String ts = df.format(new Date());
|
||||
props.setProperty("__MODIFIED", ts);
|
||||
props.store(bos, "Druid");
|
||||
} catch (IOException ignored) { }
|
||||
return bos.toByteArray();
|
||||
}
|
||||
throw new IAE("Can only serialize Properties into ZK");
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object deserialize(byte[] bytes) throws ZkMarshallingError
|
||||
{
|
||||
final Properties props = new Properties();
|
||||
try {
|
||||
props.load(new ByteArrayInputStream(bytes));
|
||||
} catch (IOException ignored) {
|
||||
}
|
||||
return props;
|
||||
}
|
||||
}
|
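Aside (not part of the commit): a small round-trip sketch of the new serializer. serialize() writes the Properties in java.util.Properties text format and stamps a __MODIFIED timestamp; deserialize() loads it back. The demo class name and property value are made up:

import java.util.Properties;
import com.metamx.druid.zk.PropertiesZkSerializer;

public class PropertiesZkSerializerSketch
{
  public static void main(String[] args)
  {
    PropertiesZkSerializer serializer = new PropertiesZkSerializer();

    Properties props = new Properties();
    props.setProperty("druid.zk.paths.base", "/druid");

    byte[] bytes = serializer.serialize(props);                      // what gets written to the znode
    Properties roundTripped = (Properties) serializer.deserialize(bytes);

    System.out.println(roundTripped.getProperty("druid.zk.paths.base"));            // /druid
    System.out.println(roundTripped.getProperty(PropertiesZkSerializer.META_PROP)); // e.g. 20121010T120000z
  }
}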
|
@ -132,10 +132,15 @@ public class Histogram
|
|||
return buf.array();
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a visual representation of a histogram object.
|
||||
* Initially returns an array of just the min. and max. values
|
||||
* but can also support the addition of quantiles.
|
||||
*/
|
||||
public HistogramVisual asVisual() {
|
||||
float[] visualCounts = new float[bins.length - 2];
|
||||
for(int i = 0; i < visualCounts.length; ++i) visualCounts[i] = (float)bins[i + 1];
|
||||
return new HistogramVisual(breaks, visualCounts, min, max);
|
||||
return new HistogramVisual(breaks, visualCounts, new float[]{min, max});
|
||||
}
|
||||
|
||||
public static Histogram fromBytes(byte[] bytes) {
|
||||
|
|
|
@ -29,15 +29,14 @@ public class HistogramVisual
|
|||
{
|
||||
@JsonProperty final public double[] breaks;
|
||||
@JsonProperty final public double[] counts;
|
||||
@JsonProperty final double min;
|
||||
@JsonProperty final double max;
|
||||
// an array of the quantiles including the min. and max.
|
||||
@JsonProperty final public double[] quantiles;
|
||||
|
||||
@JsonCreator
|
||||
public HistogramVisual(
|
||||
@JsonProperty double[] breaks,
|
||||
@JsonProperty double[] counts,
|
||||
@JsonProperty double min,
|
||||
@JsonProperty double max
|
||||
@JsonProperty double[] quantiles
|
||||
)
|
||||
{
|
||||
Preconditions.checkArgument(breaks != null, "breaks must not be null");
|
||||
|
@ -46,15 +45,13 @@ public class HistogramVisual
|
|||
|
||||
this.breaks = breaks;
|
||||
this.counts = counts;
|
||||
this.min = min;
|
||||
this.max = max;
|
||||
this.quantiles = quantiles;
|
||||
}
|
||||
|
||||
public HistogramVisual(
|
||||
float[] breaks,
|
||||
float[] counts,
|
||||
float min,
|
||||
float max
|
||||
float[] quantiles
|
||||
)
|
||||
{
|
||||
Preconditions.checkArgument(breaks != null, "breaks must not be null");
|
||||
|
@ -63,10 +60,10 @@ public class HistogramVisual
|
|||
|
||||
this.breaks = new double[breaks.length];
|
||||
this.counts = new double[counts.length];
|
||||
this.quantiles = new double[quantiles.length];
|
||||
for(int i = 0; i < breaks.length; ++i) this.breaks[i] = breaks[i];
|
||||
for(int i = 0; i < counts.length; ++i) this.counts[i] = counts[i];
|
||||
this.min = min;
|
||||
this.max = max;
|
||||
for(int i = 0; i < quantiles.length; ++i) this.quantiles[i] = quantiles[i];
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -75,8 +72,7 @@ public class HistogramVisual
|
|||
return "HistogramVisual{" +
|
||||
"counts=" + Arrays.toString(counts) +
|
||||
", breaks=" + Arrays.toString(breaks) +
|
||||
", min=" + min +
|
||||
", max=" + max +
|
||||
", quantiles=" + Arrays.toString(quantiles) +
|
||||
'}';
|
||||
}
|
||||
}
|
||||
|
|
|
@ -68,8 +68,7 @@ public class HistogramTest
|
|||
Map<String,Object> expectedObj = Maps.newLinkedHashMap();
|
||||
expectedObj.put("breaks", Arrays.asList(visualBreaks));
|
||||
expectedObj.put("counts", Arrays.asList(visualCounts));
|
||||
expectedObj.put("min", -1.0);
|
||||
expectedObj.put("max", 1.0);
|
||||
expectedObj.put("quantiles", Arrays.asList(new Double[]{-1.0, 1.0}));
|
||||
|
||||
Map<String,Object> obj = (Map<String, Object>)objectMapper.readValue(json, Object.class);
|
||||
Assert.assertEquals(expectedObj, obj);
|
||||
|
|
|
@ -0,0 +1,71 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<!--
|
||||
~ Druid - a distributed column store.
|
||||
~ Copyright (C) 2012 Metamarkets Group Inc.
|
||||
~
|
||||
~ This program is free software; you can redistribute it and/or
|
||||
~ modify it under the terms of the GNU General Public License
|
||||
~ as published by the Free Software Foundation; either version 2
|
||||
~ of the License, or (at your option) any later version.
|
||||
~
|
||||
~ This program is distributed in the hope that it will be useful,
|
||||
~ but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
~ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
~ GNU General Public License for more details.
|
||||
~
|
||||
~ You should have received a copy of the GNU General Public License
|
||||
~ along with this program; if not, write to the Free Software
|
||||
~ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||
-->
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<groupId>com.metamx.druid</groupId>
|
||||
<artifactId>druid-services</artifactId>
|
||||
<name>druid-services</name>
|
||||
<description>druid-services</description>
|
||||
<version>0.1.0-SNAPSHOT</version>
|
||||
<parent>
|
||||
<groupId>com.metamx</groupId>
|
||||
<artifactId>druid</artifactId>
|
||||
<version>0.1.0-SNAPSHOT</version>
|
||||
</parent>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>com.metamx.druid</groupId>
|
||||
<artifactId>druid-realtime</artifactId>
|
||||
<version>${project.parent.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.metamx.druid</groupId>
|
||||
<artifactId>druid-server</artifactId>
|
||||
<version>${project.parent.version}</version>
|
||||
</dependency>
|
||||
|
||||
</dependencies>
|
||||
<build>
|
||||
<plugins>
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-shade-plugin</artifactId>
|
||||
<version>1.6</version>
|
||||
<executions>
|
||||
<execution>
|
||||
<phase>package</phase>
|
||||
<goals>
|
||||
<goal>shade</goal>
|
||||
</goals>
|
||||
<configuration>
|
||||
<outputFile>
|
||||
${project.build.directory}/${project.artifactId}-${project.version}-selfcontained.jar
|
||||
</outputFile>
|
||||
</configuration>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
</plugins>
|
||||
</build>
|
||||
</project>
|
|
@ -1,6 +1,6 @@
|
|||
# Properties for demo of Realtime Node in standalone mode.
|
||||
# To Use This: copy this file to runtime.properties and put directory containing it in classpath.
|
||||
#
|
||||
comment.origin=druid/examples/rand/src/main/resources/runtime.properties
|
||||
|
||||
# S3 access
|
||||
com.metamx.aws.accessKey=<S3 access key>
|
||||
|
@ -15,7 +15,10 @@ druid.database.user=user
|
|||
druid.database.password=password
|
||||
# time between polling for metadata database
|
||||
druid.database.poll.duration=PT1M
|
||||
|
||||
# table for segment metadata coordination, no default
|
||||
druid.database.segmentTable=prod_segments
|
||||
|
||||
#in progress 20121010 #druid.database.taskTable=
|
||||
|
||||
druid.emitter.period=PT60S
|
||||
|
@ -42,23 +45,52 @@ druid.server.maxSize=300000000000
|
|||
# =realtime or =historical (default)
|
||||
druid.server.type=realtime
|
||||
|
||||
# ZK path for service discovery within the cluster
|
||||
druid.zk.paths.announcementsPath=/druid/announcementsPath
|
||||
#
|
||||
# zookeeper (zk) znode paths (zpaths)
|
||||
#
|
||||
|
||||
# Legacy path, must be set, but can be ignored
|
||||
druid.zk.paths.indexesPath=/druid/indexesPath
|
||||
# base znode which establishes a unique namespace for a Druid ensemble.
|
||||
# Default is /druid if not set
|
||||
# This can also be set via parameter baseZkPath of the DruidSetup commandline
|
||||
# druid.zk.paths.base=
|
||||
|
||||
druid.zk.paths.indexer.tasksPath=/druid/tasksPath
|
||||
druid.zk.paths.indexer.statusPath=/druid/statusPath
|
||||
# If these zpath properties like druid.zk.paths.*Path are overridden, then all must be
|
||||
# overridden together for upgrade safety reasons.
|
||||
# The commandline utility DruidSetup, which is used to set up properties on zookeeper,
|
||||
# will validate this. Also, these zpaths must start with / because they are not relative.
|
||||
|
||||
# ZK znode path for service discovery within the cluster.
|
||||
# Default is value of druid.zk.paths.base + /announcements
|
||||
# druid.zk.paths.announcementsPath=/druid/announcements
|
||||
|
||||
# Legacy znode path, must be set, but can be ignored
|
||||
#druid.zk.paths.indexesPath=/druid/indexes
|
||||
|
||||
# Default is value of druid.zk.paths.base + /tasks
|
||||
##druid.zk.paths.indexer.tasksPath=/druid/tasks
|
||||
|
||||
# Default is value of druid.zk.paths.base + /status
|
||||
#druid.zk.paths.indexer.statusPath=/druid/status
|
||||
|
||||
# ZK path for load/drop protocol between Master/Compute
|
||||
druid.zk.paths.loadQueuePath=/druid/loadQueuePath
|
||||
# Default is value of druid.zk.paths.base + /loadQueue
|
||||
#druid.zk.paths.loadQueuePath=/druid/loadQueue
|
||||
|
||||
# ZK path for Master leadership election
|
||||
druid.zk.paths.masterPath=/druid/masterPath
|
||||
# Default is value of druid.zk.paths.base + /master
|
||||
#druid.zk.paths.masterPath=/druid/master
|
||||
|
||||
# ZK path for publishing served segments
|
||||
druid.zk.paths.servedSegmentsPath=/druid/servedSegmentsPath
|
||||
# Default is value of druid.zk.paths.base + /servedSegments
|
||||
#druid.zk.paths.servedSegmentsPath=/druid/servedSegments
|
||||
|
||||
# Default is value of druid.zk.paths.base + /leaderLatch
|
||||
#druid.zk.paths.indexer.leaderLatchPath=/druid/leaderLatch
|
||||
|
||||
# ZK path for properties stored in zookeeper
|
||||
# Default is value of druid.zk.paths.base + /properties
|
||||
#druid.zk.paths.propertiesPath=/druid/properties
|
||||
|
||||
|
||||
druid.host=127.0.0.1
|
||||
druid.port=8080
|
||||
|
@ -72,8 +104,10 @@ com.metamx.emitter.logging=true
|
|||
com.metamx.emitter.logging.level=info
|
||||
com.metamx.metrics.emitter.period=PT60S
|
||||
|
||||
# ZK quorum IPs; if empty, the use demo mode
|
||||
druid.zk.service.host=
|
||||
# ZK quorum IPs; ZK coordinates in the form host1:port1[,host2:port2[, ...]]
|
||||
# if =none then do not contact zookeeper (only for RealtimeStandaloneMain examples)
|
||||
druid.zk.service.host=none
|
||||
|
||||
# msec; high value means tolerate slow zk nodes, default is to wait about 3 weeks
|
||||
druid.zk.service.connectionTimeout=1000000
|
||||
|
||||
|
@ -85,7 +119,7 @@ druid.processing.numThreads=3
|
|||
# other properties found
|
||||
#
|
||||
druid.computation.buffer.size=10000000
|
||||
druid.zk.paths.indexer.leaderLatchPath=/druid/leaderLatchPath
|
||||
|
||||
druid.merger.threads=1
|
||||
druid.merger.runner=remote
|
||||
druid.merger.whitelist.enabled=false
|
||||
|
|
|
@ -32,7 +32,7 @@ public class RealtimeStandaloneMain
|
|||
{
|
||||
LogLevelAdjuster.register();
|
||||
|
||||
Lifecycle lifecycle = new Lifecycle();
|
||||
final Lifecycle lifecycle = new Lifecycle();
|
||||
|
||||
RealtimeNode rn = RealtimeNode.builder().build();
|
||||
lifecycle.addManagedInstance(rn);
|
||||
|
@ -86,6 +86,20 @@ public class RealtimeStandaloneMain
|
|||
}
|
||||
);
|
||||
|
||||
Runtime.getRuntime().addShutdownHook(
|
||||
new Thread(
|
||||
new Runnable()
|
||||
{
|
||||
@Override
|
||||
public void run()
|
||||
{
|
||||
log.info("Running shutdown hook");
|
||||
lifecycle.stop();
|
||||
}
|
||||
}
|
||||
)
|
||||
);
|
||||
|
||||
try {
|
||||
lifecycle.start();
|
||||
}
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
# Properties for demo of Realtime Node in standalone mode.
|
||||
# To Use This: copy this file to runtime.properties and put directory containing it in classpath.
|
||||
#
|
||||
comment.origin=druid/examples/twitter/src/main/resources/runtime.properties
|
||||
|
||||
# S3 access
|
||||
com.metamx.aws.accessKey=<S3 access key>
|
||||
|
@ -15,7 +16,10 @@ druid.database.user=user
|
|||
druid.database.password=password
|
||||
# time between polling for metadata database
|
||||
druid.database.poll.duration=PT1M
|
||||
|
||||
# table for segment metadata coordination, no default
|
||||
druid.database.segmentTable=prod_segments
|
||||
|
||||
#in progress 20121010 #druid.database.taskTable=
|
||||
|
||||
druid.emitter.period=PT60S
|
||||
|
@ -47,23 +51,51 @@ druid.server.maxSize=300000000000
|
|||
# =realtime or =historical (default)
|
||||
druid.server.type=realtime
|
||||
|
||||
# ZK path for service discovery within the cluster
|
||||
druid.zk.paths.announcementsPath=/druid/announcementsPath
|
||||
#
|
||||
# zookeeper (zk) znode paths (zpaths)
|
||||
#
|
||||
|
||||
# Legacy path, must be set, but can be ignored
|
||||
druid.zk.paths.indexesPath=/druid/indexesPath
|
||||
# base znode which establishes a unique namespace for a Druid ensemble.
|
||||
# Default is /druid if not set
|
||||
# This can also be set via parameter baseZkPath of the DruidSetup commandline
|
||||
# druid.zk.paths.base=
|
||||
|
||||
druid.zk.paths.indexer.tasksPath=/druid/tasksPath
|
||||
druid.zk.paths.indexer.statusPath=/druid/statusPath
|
||||
# If these zpath properties like druid.zk.paths.*Path are overridden, then all must be
|
||||
# overridden together for upgrade safety reasons.
|
||||
# The commandline utility DruidSetup, which is used to set up properties on zookeeper,
|
||||
# will validate this. Also, these zpaths must start with / because they are not relative.
|
||||
|
||||
# ZK znode path for service discovery within the cluster.
|
||||
# Default is value of druid.zk.paths.base + /announcements
|
||||
# druid.zk.paths.announcementsPath=/druid/announcements
|
||||
|
||||
# Legacy znode path, must be set, but can be ignored
|
||||
#druid.zk.paths.indexesPath=/druid/indexes
|
||||
|
||||
# Default is value of druid.zk.paths.base + /tasks
|
||||
##druid.zk.paths.indexer.tasksPath=/druid/tasks
|
||||
|
||||
# Default is value of druid.zk.paths.base + /status
|
||||
#druid.zk.paths.indexer.statusPath=/druid/status
|
||||
|
||||
# ZK path for load/drop protocol between Master/Compute
|
||||
druid.zk.paths.loadQueuePath=/druid/loadQueuePath
|
||||
# Default is value of druid.zk.paths.base + /loadQueue
|
||||
#druid.zk.paths.loadQueuePath=/druid/loadQueue
|
||||
|
||||
# ZK path for Master leadership election
|
||||
druid.zk.paths.masterPath=/druid/masterPath
|
||||
# Default is value of druid.zk.paths.base + /master
|
||||
#druid.zk.paths.masterPath=/druid/master
|
||||
|
||||
# ZK path for publishing served segments
|
||||
druid.zk.paths.servedSegmentsPath=/druid/servedSegmentsPath
|
||||
# Default is value of druid.zk.paths.base + /servedSegments
|
||||
#druid.zk.paths.servedSegmentsPath=/druid/servedSegments
|
||||
|
||||
# Default is value of druid.zk.paths.base + /leaderLatch
|
||||
#druid.zk.paths.indexer.leaderLatchPath=/druid/leaderLatch
|
||||
|
||||
# ZK path for properties stored in zookeeper
|
||||
# Default is value of druid.zk.paths.base + /properties
|
||||
#druid.zk.paths.propertiesPath=/druid/properties
|
||||
|
||||
druid.host=127.0.0.1
|
||||
druid.port=8080
|
||||
|
@ -79,8 +111,10 @@ com.metamx.emitter.http=true
|
|||
# unknown # com.metamx.emitter.logging.level=info
|
||||
# unknown # com.metamx.metrics.emitter.period=PT60S
|
||||
|
||||
# ZK quorum IPs; if empty, the use demo mode
|
||||
druid.zk.service.host=
|
||||
# ZK quorum IPs; ZK coordinates in the form host1:port1[,host2:port2[, ...]]
|
||||
# if =none then do not contact zookeeper (only for RealtimeStandaloneMain examples)
|
||||
druid.zk.service.host=none
|
||||
|
||||
# msec; high value means tolerate slow zk nodes, default is to wait about 3 weeks
|
||||
druid.zk.service.connectionTimeout=1000000
|
||||
|
||||
|
@ -92,7 +126,6 @@ druid.processing.numThreads=3
|
|||
# other properties found
|
||||
#
|
||||
druid.computation.buffer.size=10000000
|
||||
druid.zk.paths.indexer.leaderLatchPath=/druid/leaderLatchPath
|
||||
druid.merger.threads=1
|
||||
druid.merger.runner=remote
|
||||
druid.merger.whitelist.enabled=false
|
||||
|
|
|
@ -3,7 +3,7 @@
|
|||
"aggregators":[
|
||||
{"type":"count", "name":"tweets"},
|
||||
{"type":"doubleSum","fieldName":"follower_count","name":"total_follower_count"},
|
||||
{"type":"doubleSum","fieldName":"retweet_count","name":"tota_retweet_count"},
|
||||
{"type":"doubleSum","fieldName":"retweet_count","name":"total_retweet_count"},
|
||||
{"type":"doubleSum","fieldName":"friends_count","name":"total_friends_count"},
|
||||
{"type":"doubleSum","fieldName":"statuses_count","name":"total_statuses_count"},
|
||||
|
||||
|
|
|
@ -0,0 +1,25 @@
|
|||
#!/usr/bin/env bash
|
||||
# Script to run the DruidSetup utility, which initializes zookeeper locations, properties, and the metadata store (MySQL or similar).
|
||||
# The dump cmd of DruidSetup will dump the properties stored in zookeeper and the zpaths in use.
|
||||
# Run with no args to get usage.
|
||||
|
||||
which java >/dev/null
|
||||
WJ=$?
|
||||
if [ "${JAVA_HOME}" ]; then
|
||||
RUN_JAVA=$JAVA_HOME/bin/java
|
||||
elif [ $WJ -eq 0 ]; then
|
||||
RUN_JAVA=java
|
||||
fi
|
||||
|
||||
[ -z "${RUN_JAVA}" ] && echo "env var JAVA_HOME is not defined and java not in path" && exit 1
|
||||
|
||||
DRUID_DIR=$(cd $(dirname $0)/.. ; pwd)
|
||||
|
||||
DRUID_SERVER_JAR="$(ls -1 $(find $DRUID_DIR -name 'druid-server*selfcontained.jar') |head -1)"
|
||||
[ -z "${DRUID_SERVER_JAR}" ] && echo "unable to find druid server jar" && exit 2
|
||||
echo "using ${DRUID_SERVER_JAR}"
|
||||
echo
|
||||
|
||||
$RUN_JAVA -cp "${DRUID_SERVER_JAR}" -Dlog4j.configuration=file://${DRUID_DIR}/install/log4j.xml -Djava.net.preferIPv4Stack=true -Duser.timezone=UTC -Dfile.encoding=UTF-8 com.metamx.druid.utils.DruidSetup $*
|
||||
|
||||
[ -e ${DRUID_DIR}/install/druid_setup.log ] && egrep "WARN|ERROR|FATAL" ${DRUID_DIR}/install/druid_setup.log
|
|
@ -0,0 +1,18 @@
|
|||
<?xml version="1.0" encoding="UTF-8" ?>
|
||||
<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
|
||||
|
||||
<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
|
||||
|
||||
<appender name="FileAppender" class="org.apache.log4j.FileAppender">
|
||||
<param name="File" value="druid_setup.log" />
|
||||
<param name="Append" value="false" />
|
||||
<layout class="org.apache.log4j.PatternLayout">
|
||||
<param name="ConversionPattern" value="%d{ISO8601} %p [%t] %c - %m%n"/>
|
||||
</layout>
|
||||
</appender>
|
||||
|
||||
<root>
|
||||
<priority value ="warn" />
|
||||
<appender-ref ref="FileAppender"/>
|
||||
</root>
|
||||
</log4j:configuration>
|
|
@ -19,7 +19,7 @@
|
|||
-->
|
||||
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<groupId>com.metamx.druid</groupId>
|
||||
<artifactId>druid-merger</artifactId>
|
||||
|
@ -178,6 +178,10 @@
|
|||
<artifactId>easymock</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.netflix.curator</groupId>
|
||||
<artifactId>curator-test</artifactId>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<build>
|
||||
|
|
|
@ -88,7 +88,7 @@ public abstract class MergeTask extends AbstractTask
|
|||
@Override
|
||||
public boolean apply(@Nullable DataSegment segment)
|
||||
{
|
||||
return segment == null || !segment.getDataSource().equals(dataSource);
|
||||
return segment == null || !segment.getDataSource().equalsIgnoreCase(dataSource);
|
||||
}
|
||||
}
|
||||
)
|
||||
|
|
|
@ -25,7 +25,9 @@ import com.google.common.base.Predicate;
|
|||
import com.google.common.base.Throwables;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.MinMaxPriorityQueue;
|
||||
import com.google.common.collect.Sets;
|
||||
import com.google.common.primitives.Ints;
|
||||
import com.metamx.common.ISE;
|
||||
import com.metamx.common.concurrent.ScheduledExecutors;
|
||||
import com.metamx.common.guava.FunctionalIterable;
|
||||
import com.metamx.common.lifecycle.LifecycleStart;
|
||||
|
@ -40,7 +42,6 @@ import com.metamx.druid.merger.coordinator.scaling.ScalingStrategy;
|
|||
import com.metamx.druid.merger.worker.Worker;
|
||||
import com.metamx.emitter.EmittingLogger;
|
||||
import com.netflix.curator.framework.CuratorFramework;
|
||||
import com.netflix.curator.framework.recipes.cache.ChildData;
|
||||
import com.netflix.curator.framework.recipes.cache.PathChildrenCache;
|
||||
import com.netflix.curator.framework.recipes.cache.PathChildrenCacheEvent;
|
||||
import com.netflix.curator.framework.recipes.cache.PathChildrenCacheListener;
|
||||
|
@ -51,9 +52,9 @@ import org.joda.time.Duration;
|
|||
import org.joda.time.Period;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.io.IOException;
|
||||
import java.util.Comparator;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentSkipListSet;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
|
@ -95,6 +96,8 @@ public class RemoteTaskRunner implements TaskRunner
|
|||
private final ConcurrentSkipListSet<String> currentlyTerminating = new ConcurrentSkipListSet<String>();
|
||||
private final Object statusLock = new Object();
|
||||
|
||||
private volatile DateTime lastProvisionTime = new DateTime();
|
||||
private volatile DateTime lastTerminateTime = new DateTime();
|
||||
private volatile boolean started = false;
|
||||
|
||||
public RemoteTaskRunner(
|
||||
|
@ -120,27 +123,31 @@ public class RemoteTaskRunner implements TaskRunner
|
|||
public void start()
|
||||
{
|
||||
try {
|
||||
workerPathCache.start();
|
||||
workerPathCache.getListenable().addListener(
|
||||
new PathChildrenCacheListener()
|
||||
{
|
||||
@Override
|
||||
public void childEvent(CuratorFramework client, final PathChildrenCacheEvent event) throws Exception
|
||||
{
|
||||
final Worker worker = jsonMapper.readValue(
|
||||
event.getData().getData(),
|
||||
Worker.class
|
||||
);
|
||||
if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED)) {
|
||||
final Worker worker = jsonMapper.readValue(
|
||||
event.getData().getData(),
|
||||
Worker.class
|
||||
);
|
||||
log.info("New worker[%s] found!", worker.getHost());
|
||||
addWorker(worker);
|
||||
} else if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)) {
|
||||
final Worker worker = jsonMapper.readValue(
|
||||
event.getData().getData(),
|
||||
Worker.class
|
||||
);
|
||||
log.info("Worker[%s] removed!", worker.getHost());
|
||||
removeWorker(worker.getHost());
|
||||
}
|
||||
}
|
||||
}
|
||||
);
|
||||
workerPathCache.start();
|
||||
|
||||
// Schedule termination of worker nodes periodically
|
||||
Period period = new Period(config.getTerminateResourcesDuration());
|
||||
|
@ -175,7 +182,7 @@ public class RemoteTaskRunner implements TaskRunner
|
|||
{
|
||||
return input.getRunningTasks().isEmpty()
|
||||
&& System.currentTimeMillis() - input.getLastCompletedTaskTime().getMillis()
|
||||
> config.getmaxWorkerIdleTimeMillisBeforeDeletion();
|
||||
> config.getMaxWorkerIdleTimeMillisBeforeDeletion();
|
||||
}
|
||||
}
|
||||
)
|
||||
|
@ -197,8 +204,17 @@ public class RemoteTaskRunner implements TaskRunner
|
|||
|
||||
if (terminated != null) {
|
||||
currentlyTerminating.addAll(terminated.getNodeIds());
|
||||
lastTerminateTime = new DateTime();
|
||||
}
|
||||
} else {
|
||||
Duration durSinceLastTerminate = new Duration(new DateTime(), lastTerminateTime);
|
||||
if (durSinceLastTerminate.isLongerThan(config.getMaxScalingDuration())) {
|
||||
log.makeAlert(
|
||||
"It has been %d millis since last scheduled termination but nodes remain",
|
||||
durSinceLastTerminate.getMillis()
|
||||
).emit();
|
||||
}
|
||||
|
||||
log.info(
|
||||
"[%s] still terminating. Wait for all nodes to terminate before trying again.",
|
||||
currentlyTerminating
|
||||
|
@ -235,36 +251,38 @@ public class RemoteTaskRunner implements TaskRunner
|
|||
return started;
|
||||
}
|
||||
|
||||
public int getNumWorkers()
|
||||
{
|
||||
return zkWorkers.size();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run(Task task, TaskContext context, TaskCallback callback)
|
||||
{
|
||||
assignTask(
|
||||
new TaskWrapper(
|
||||
task, context, callback, retryPolicyFactory.makeRetryPolicy()
|
||||
)
|
||||
if (tasks.contains(task.getId())) {
|
||||
throw new ISE("Assigned a task[%s] that already exists, WTF is happening?!", task.getId());
|
||||
}
|
||||
TaskWrapper taskWrapper = new TaskWrapper(
|
||||
task, context, callback, retryPolicyFactory.makeRetryPolicy()
|
||||
);
|
||||
tasks.put(taskWrapper.getTask().getId(), taskWrapper);
|
||||
assignTask(taskWrapper);
|
||||
}
|
||||
|
||||
private void assignTask(TaskWrapper taskWrapper)
|
||||
{
|
||||
tasks.putIfAbsent(taskWrapper.getTask().getId(), taskWrapper);
|
||||
WorkerWrapper workerWrapper = findWorkerRunningTask(taskWrapper);
|
||||
|
||||
// If the task already exists, we don't need to announce it
|
||||
if (workerWrapper != null) {
|
||||
final Worker worker = workerWrapper.getWorker();
|
||||
try {
|
||||
|
||||
log.info("Worker[%s] is already running task[%s].", worker.getHost(), taskWrapper.getTask().getId());
|
||||
|
||||
TaskStatus taskStatus = jsonMapper.readValue(
|
||||
workerWrapper.getStatusCache()
|
||||
.getCurrentData(
|
||||
JOINER.join(
|
||||
config.getStatusPath(),
|
||||
worker.getHost(),
|
||||
taskWrapper.getTask().getId()
|
||||
)
|
||||
JOINER.join(config.getStatusPath(), worker.getHost(), taskWrapper.getTask().getId())
|
||||
)
|
||||
.getData(),
|
||||
TaskStatus.class
|
||||
|
@ -282,8 +300,9 @@ public class RemoteTaskRunner implements TaskRunner
|
|||
log.error(e, "Task exists, but hit exception!");
|
||||
retryTask(new CleanupPaths(worker.getHost(), taskWrapper.getTask().getId()), taskWrapper);
|
||||
}
|
||||
} else { // Announce the task
|
||||
workerWrapper = getWorkerForTask();
|
||||
} else {
|
||||
// Announce the task or retry if there is not enough capacity
|
||||
workerWrapper = findWorkerForTask();
|
||||
if (workerWrapper != null) {
|
||||
announceTask(workerWrapper.getWorker(), taskWrapper);
|
||||
} else {
|
||||
|
@ -330,7 +349,7 @@ public class RemoteTaskRunner implements TaskRunner
|
|||
}
|
||||
}
|
||||
},
|
||||
retryPolicy.getAndIncrementRetryDelay(),
|
||||
retryPolicy.getAndIncrementRetryDelay().getMillis(),
|
||||
TimeUnit.MILLISECONDS
|
||||
);
|
||||
}
|
||||
|
@ -349,28 +368,10 @@ public class RemoteTaskRunner implements TaskRunner
|
|||
|
||||
final String workerStatusPath = JOINER.join(config.getStatusPath(), worker.getHost());
|
||||
final PathChildrenCache statusCache = new PathChildrenCache(cf, workerStatusPath, true);
|
||||
final ConcurrentSkipListSet<String> runningTasks = new ConcurrentSkipListSet<String>(
|
||||
Lists.transform(
|
||||
statusCache.getCurrentData(),
|
||||
new Function<ChildData, String>()
|
||||
{
|
||||
@Override
|
||||
public String apply(@Nullable ChildData input)
|
||||
{
|
||||
try {
|
||||
return jsonMapper.readValue(input.getData(), TaskStatus.class).getId();
|
||||
}
|
||||
catch (Exception e) {
|
||||
throw Throwables.propagate(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
)
|
||||
);
|
||||
final WorkerWrapper workerWrapper = new WorkerWrapper(
|
||||
worker,
|
||||
runningTasks,
|
||||
statusCache
|
||||
statusCache,
|
||||
jsonMapper
|
||||
);
|
||||
|
||||
// Add status listener to the watcher for status changes
|
||||
|
@ -387,10 +388,7 @@ public class RemoteTaskRunner implements TaskRunner
|
|||
TaskStatus taskStatus = jsonMapper.readValue(
|
||||
event.getData().getData(), TaskStatus.class
|
||||
);
|
||||
taskId = taskStatus.getId();
|
||||
|
||||
log.info("New status[%s] appeared!", taskId);
|
||||
runningTasks.add(taskId);
|
||||
log.info("New status[%s] appeared!", taskStatus.getId());
|
||||
statusLock.notify();
|
||||
} else if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)) {
|
||||
String statusPath = event.getData().getPath();
|
||||
|
@ -415,13 +413,13 @@ public class RemoteTaskRunner implements TaskRunner
|
|||
callback.notify(taskStatus);
|
||||
}
|
||||
tasks.remove(taskId);
|
||||
runningTasks.remove(taskId);
|
||||
cf.delete().guaranteed().forPath(statusPath);
|
||||
cf.delete().guaranteed().inBackground().forPath(statusPath);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
catch (Exception e) {
|
||||
log.error(e, "Exception in status listener");
|
||||
retryTask(new CleanupPaths(worker.getHost(), taskId), tasks.get(taskId));
|
||||
}
|
||||
}
|
||||
|
@ -458,25 +456,27 @@ public class RemoteTaskRunner implements TaskRunner
|
|||
|
||||
WorkerWrapper workerWrapper = zkWorkers.get(workerId);
|
||||
if (workerWrapper != null) {
|
||||
for (String taskId : workerWrapper.getRunningTasks()) {
|
||||
TaskWrapper taskWrapper = tasks.get(taskId);
|
||||
if (taskWrapper != null) {
|
||||
retryTask(new CleanupPaths(workerId, taskId), tasks.get(taskId));
|
||||
}
|
||||
workerWrapper.removeTask(taskId);
|
||||
}
|
||||
|
||||
try {
|
||||
Set<String> tasksToRetry = Sets.newHashSet(workerWrapper.getRunningTasks());
|
||||
tasksToRetry.addAll(cf.getChildren().forPath(JOINER.join(config.getTaskPath(), workerId)));
|
||||
|
||||
for (String taskId : tasksToRetry) {
|
||||
TaskWrapper taskWrapper = tasks.get(taskId);
|
||||
if (taskWrapper != null) {
|
||||
retryTask(new CleanupPaths(workerId, taskId), tasks.get(taskId));
|
||||
}
|
||||
}
|
||||
|
||||
workerWrapper.getStatusCache().close();
|
||||
}
|
||||
catch (IOException e) {
|
||||
log.error("Failed to close watcher associated with worker[%s]", workerWrapper.getWorker().getHost());
|
||||
catch (Exception e) {
|
||||
log.error(e, "Failed to cleanly remove worker[%s]");
|
||||
}
|
||||
}
|
||||
zkWorkers.remove(workerId);
|
||||
}
|
||||
|
||||
private WorkerWrapper getWorkerForTask()
|
||||
private WorkerWrapper findWorkerForTask()
|
||||
{
|
||||
try {
|
||||
final MinMaxPriorityQueue<WorkerWrapper> workerQueue = MinMaxPriorityQueue.<WorkerWrapper>orderedBy(
|
||||
|
@ -509,8 +509,17 @@ public class RemoteTaskRunner implements TaskRunner
|
|||
AutoScalingData provisioned = strategy.provision();
|
||||
if (provisioned != null) {
|
||||
currentlyProvisioning.addAll(provisioned.getNodeIds());
|
||||
lastProvisionTime = new DateTime();
|
||||
}
|
||||
} else {
|
||||
Duration durSinceLastProvision = new Duration(new DateTime(), lastProvisionTime);
|
||||
if (durSinceLastProvision.isLongerThan(config.getMaxScalingDuration())) {
|
||||
log.makeAlert(
|
||||
"It has been %d millis since last scheduled provision but nodes remain",
|
||||
durSinceLastProvision.getMillis()
|
||||
).emit();
|
||||
}
|
||||
|
||||
log.info(
|
||||
"[%s] still provisioning. Wait for all provisioned nodes to complete before requesting new worker.",
|
||||
currentlyProvisioning
|
||||
|
@ -554,8 +563,9 @@ public class RemoteTaskRunner implements TaskRunner
|
|||
jsonMapper.writeValueAsBytes(new TaskHolder(task, taskContext))
|
||||
);
|
||||
|
||||
// Syncing state with Zookeeper
|
||||
while (findWorkerRunningTask(taskWrapper) == null) {
|
||||
statusLock.wait();
|
||||
statusLock.wait(config.getTaskAssignmentTimeoutDuration().getMillis());
|
||||
}
|
||||
}
|
||||
catch (Exception e) {
|
||||
|
|
|
@ -21,6 +21,7 @@ package com.metamx.druid.merger.coordinator;
|
|||
|
||||
import com.metamx.druid.merger.coordinator.config.RetryPolicyConfig;
|
||||
import com.metamx.emitter.EmittingLogger;
|
||||
import org.joda.time.Duration;
|
||||
|
||||
/**
|
||||
*/
|
||||
|
@ -29,29 +30,25 @@ public class RetryPolicy
|
|||
private static final EmittingLogger log = new EmittingLogger(RetryPolicy.class);
|
||||
|
||||
private final long MAX_NUM_RETRIES;
|
||||
private final long MAX_RETRY_DELAY_MILLIS;
|
||||
private final Duration MAX_RETRY_DURATION;
|
||||
|
||||
private volatile long currRetryDelay;
|
||||
private volatile Duration currRetryDelay;
|
||||
private volatile int retryCount;
|
||||
|
||||
public RetryPolicy(RetryPolicyConfig config)
|
||||
{
|
||||
this.MAX_NUM_RETRIES = config.getMaxRetryCount();
|
||||
this.MAX_RETRY_DELAY_MILLIS = config.getRetryMaxMillis();
|
||||
this.MAX_RETRY_DURATION = config.getRetryMaxDuration();
|
||||
|
||||
this.currRetryDelay = config.getRetryMinMillis();
|
||||
this.currRetryDelay = config.getRetryMinDuration();
|
||||
this.retryCount = 0;
|
||||
}
|
||||
|
||||
public long getAndIncrementRetryDelay()
|
||||
public Duration getAndIncrementRetryDelay()
|
||||
{
|
||||
long retVal = currRetryDelay;
|
||||
if (currRetryDelay < MAX_RETRY_DELAY_MILLIS) {
|
||||
currRetryDelay *= 2;
|
||||
}
|
||||
|
||||
Duration retVal = new Duration(currRetryDelay);
|
||||
currRetryDelay = new Duration(Math.min(currRetryDelay.getMillis() * 2, MAX_RETRY_DURATION.getMillis()));
|
||||
retryCount++;
|
||||
|
||||
return retVal;
|
||||
}
|
||||
|
||||
|
|
|
@ -19,29 +19,49 @@
|
|||
|
||||
package com.metamx.druid.merger.coordinator;
|
||||
|
||||
import com.google.common.base.Function;
|
||||
import com.google.common.base.Throwables;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Sets;
|
||||
import com.metamx.druid.merger.common.TaskStatus;
|
||||
import com.metamx.druid.merger.worker.Worker;
|
||||
import com.netflix.curator.framework.recipes.cache.ChildData;
|
||||
import com.netflix.curator.framework.recipes.cache.PathChildrenCache;
|
||||
import org.codehaus.jackson.map.ObjectMapper;
|
||||
import org.joda.time.DateTime;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentSkipListSet;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class WorkerWrapper
|
||||
public class WorkerWrapper implements Closeable
|
||||
{
|
||||
private final Worker worker;
|
||||
private final ConcurrentSkipListSet<String> runningTasks;
|
||||
private final PathChildrenCache statusCache;
|
||||
private final Function<ChildData, String> cacheConverter;
|
||||
|
||||
private volatile DateTime lastCompletedTaskTime = new DateTime();
|
||||
|
||||
public WorkerWrapper(Worker worker, ConcurrentSkipListSet<String> runningTasks, PathChildrenCache statusCache)
|
||||
public WorkerWrapper(Worker worker, PathChildrenCache statusCache, final ObjectMapper jsonMapper)
|
||||
{
|
||||
this.worker = worker;
|
||||
this.runningTasks = runningTasks;
|
||||
this.statusCache = statusCache;
|
||||
this.cacheConverter = new Function<ChildData, String>()
|
||||
{
|
||||
@Override
|
||||
public String apply(@Nullable ChildData input)
|
||||
{
|
||||
try {
|
||||
return jsonMapper.readValue(input.getData(), TaskStatus.class).getId();
|
||||
}
|
||||
catch (Exception e) {
|
||||
throw Throwables.propagate(e);
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
public Worker getWorker()
|
||||
|
@ -51,7 +71,12 @@ public class WorkerWrapper
|
|||
|
||||
public Set<String> getRunningTasks()
|
||||
{
|
||||
return runningTasks;
|
||||
return Sets.newHashSet(
|
||||
Lists.transform(
|
||||
statusCache.getCurrentData(),
|
||||
cacheConverter
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
public PathChildrenCache getStatusCache()
|
||||
|
@ -66,7 +91,7 @@ public class WorkerWrapper
|
|||
|
||||
public boolean isAtCapacity()
|
||||
{
|
||||
return runningTasks.size() >= worker.getCapacity();
|
||||
return statusCache.getCurrentData().size() >= worker.getCapacity();
|
||||
}
|
||||
|
||||
public void setLastCompletedTaskTime(DateTime completedTaskTime)
|
||||
|
@ -74,11 +99,7 @@ public class WorkerWrapper
|
|||
lastCompletedTaskTime = completedTaskTime;
|
||||
}
|
||||
|
||||
public void removeTask(String taskId)
|
||||
{
|
||||
runningTasks.remove(taskId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException
|
||||
{
|
||||
statusCache.close();
|
||||
|
|
|
@ -83,4 +83,8 @@ public abstract class IndexerCoordinatorConfig
|
|||
@Config("druid.merger.rowFlushBoundary")
|
||||
@Default("500000")
|
||||
public abstract long getRowFlushBoundary();
|
||||
|
||||
@Config("druid.indexer.strategy")
|
||||
@Default("noop")
|
||||
public abstract String getStrategyImpl();
|
||||
}
|
||||
|
|
|
@ -46,5 +46,13 @@ public abstract class RemoteTaskRunnerConfig extends IndexerZkConfig
|
|||
|
||||
@Config("druid.indexer.maxWorkerIdleTimeMillisBeforeDeletion")
|
||||
@Default("1")
|
||||
public abstract int getmaxWorkerIdleTimeMillisBeforeDeletion();
|
||||
public abstract int getMaxWorkerIdleTimeMillisBeforeDeletion();
|
||||
|
||||
@Config("druid.indexer.maxScalingDuration")
|
||||
@Default("PT1H")
|
||||
public abstract Duration getMaxScalingDuration();
|
||||
|
||||
@Config("druid.indexer.taskAssignmentTimeoutDuration")
|
||||
@Default("PT5M")
|
||||
public abstract Duration getTaskAssignmentTimeoutDuration();
|
||||
}
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
|
||||
package com.metamx.druid.merger.coordinator.config;
|
||||
|
||||
import org.joda.time.Duration;
|
||||
import org.skife.config.Config;
|
||||
import org.skife.config.Default;
|
||||
|
||||
|
@ -27,12 +28,12 @@ import org.skife.config.Default;
|
|||
public abstract class RetryPolicyConfig
|
||||
{
|
||||
@Config("druid.indexer.retry.minWaitMillis")
|
||||
@Default("60000") // 1 minute
|
||||
public abstract long getRetryMinMillis();
|
||||
@Default("PT1M") // 1 minute
|
||||
public abstract Duration getRetryMinDuration();
|
||||
|
||||
@Config("druid.indexer.retry.maxWaitMillis")
|
||||
@Default("600000") // 10 minutes
|
||||
public abstract long getRetryMaxMillis();
|
||||
@Default("PT10M") // 10 minutes
|
||||
public abstract Duration getRetryMaxDuration();
|
||||
|
||||
@Config("druid.indexer.retry.maxRetryCount")
|
||||
@Default("10")
|
||||
|
|
|
@ -28,6 +28,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
|||
import com.google.inject.Guice;
|
||||
import com.google.inject.Injector;
|
||||
import com.google.inject.servlet.GuiceFilter;
|
||||
import com.metamx.common.ISE;
|
||||
import com.metamx.common.concurrent.ScheduledExecutorFactory;
|
||||
import com.metamx.common.concurrent.ScheduledExecutors;
|
||||
import com.metamx.common.config.Config;
|
||||
|
@ -384,8 +385,8 @@ public class IndexerCoordinatorNode
|
|||
if (taskToolbox == null) {
|
||||
final RestS3Service s3Client = new RestS3Service(
|
||||
new AWSCredentials(
|
||||
props.getProperty("com.metamx.aws.accessKey"),
|
||||
props.getProperty("com.metamx.aws.secretKey")
|
||||
PropUtils.getProperty(props, "com.metamx.aws.accessKey"),
|
||||
PropUtils.getProperty(props, "com.metamx.aws.secretKey")
|
||||
)
|
||||
);
|
||||
final SegmentPusher segmentPusher = new S3SegmentPusher(
|
||||
|
@ -444,7 +445,7 @@ public class IndexerCoordinatorNode
|
|||
final IndexerDbConnectorConfig dbConnectorConfig = configFactory.build(IndexerDbConnectorConfig.class);
|
||||
taskStorage = new DbTaskStorage(jsonMapper, dbConnectorConfig, new DbConnector(dbConnectorConfig).getDBI());
|
||||
} else {
|
||||
throw new IllegalStateException(String.format("Invalid storage implementation: %s", config.getStorageImpl()));
|
||||
throw new ISE("Invalid storage implementation: %s", config.getStorageImpl());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -468,17 +469,22 @@ public class IndexerCoordinatorNode
|
|||
.build()
|
||||
);
|
||||
|
||||
ScalingStrategy strategy = new EC2AutoScalingStrategy(
|
||||
new AmazonEC2Client(
|
||||
new BasicAWSCredentials(
|
||||
props.getProperty("com.metamx.aws.accessKey"),
|
||||
props.getProperty("com.metamx.aws.secretKey")
|
||||
)
|
||||
),
|
||||
configFactory.build(EC2AutoScalingStrategyConfig.class)
|
||||
);
|
||||
// TODO: use real strategy before actual deployment
|
||||
strategy = new NoopScalingStrategy();
|
||||
ScalingStrategy strategy;
|
||||
if (config.getStrategyImpl().equalsIgnoreCase("ec2")) {
|
||||
strategy = new EC2AutoScalingStrategy(
|
||||
new AmazonEC2Client(
|
||||
new BasicAWSCredentials(
|
||||
PropUtils.getProperty(props, "com.metamx.aws.accessKey"),
|
||||
PropUtils.getProperty(props, "com.metamx.aws.secretKey")
|
||||
)
|
||||
),
|
||||
configFactory.build(EC2AutoScalingStrategyConfig.class)
|
||||
);
|
||||
} else if (config.getStrategyImpl().equalsIgnoreCase("noop")) {
|
||||
strategy = new NoopScalingStrategy();
|
||||
} else {
|
||||
throw new ISE("Invalid strategy implementation: %s",config.getStrategyImpl());
|
||||
}
|
||||
|
||||
return new RemoteTaskRunner(
|
||||
jsonMapper,
|
||||
|
@ -503,7 +509,7 @@ public class IndexerCoordinatorNode
|
|||
}
|
||||
};
|
||||
} else {
|
||||
throw new IllegalStateException(String.format("Invalid runner implementation: %s", config.getRunnerImpl()));
|
||||
throw new ISE("Invalid runner implementation: %s", config.getRunnerImpl());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@@ -1,15 +1,8 @@
package com.metamx.druid.merger.coordinator.scaling;

import com.amazonaws.services.ec2.model.Instance;
import com.google.common.collect.MinMaxPriorityQueue;
import com.metamx.druid.merger.coordinator.WorkerWrapper;
import com.metamx.druid.merger.coordinator.config.EC2AutoScalingStrategyConfig;
import com.metamx.emitter.EmittingLogger;
import org.joda.time.DateTime;

import java.util.Comparator;
import java.util.List;
import java.util.Map;

/**
* This class just logs when scaling should occur.

@@ -47,7 +47,6 @@ public class TaskMonitor

private final PathChildrenCache pathChildrenCache;
private final CuratorFramework cf;
private final ObjectMapper jsonMapper;
private final WorkerCuratorCoordinator workerCuratorCoordinator;
private final TaskToolbox toolbox;
private final ExecutorService exec;

@@ -55,7 +54,6 @@ public class TaskMonitor
public TaskMonitor(
PathChildrenCache pathChildrenCache,
CuratorFramework cf,
ObjectMapper jsonMapper,
WorkerCuratorCoordinator workerCuratorCoordinator,
TaskToolbox toolbox,
ExecutorService exec

@@ -63,7 +61,6 @@ public class TaskMonitor
{
this.pathChildrenCache = pathChildrenCache;
this.cf = cf;
this.jsonMapper = jsonMapper;
this.workerCuratorCoordinator = workerCuratorCoordinator;
this.toolbox = toolbox;
this.exec = exec;

@@ -87,7 +84,7 @@ public class TaskMonitor
throws Exception
{
if (pathChildrenCacheEvent.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED)) {
final TaskHolder taskHolder = jsonMapper.readValue(
final TaskHolder taskHolder = toolbox.getObjectMapper().readValue(
cf.getData().forPath(pathChildrenCacheEvent.getData().getPath()),
TaskHolder.class
);

@@ -287,8 +287,8 @@ public class WorkerNode
if (taskToolbox == null) {
final RestS3Service s3Client = new RestS3Service(
new AWSCredentials(
props.getProperty("com.metamx.aws.accessKey"),
props.getProperty("com.metamx.aws.secretKey")
PropUtils.getProperty(props, "com.metamx.aws.accessKey"),
PropUtils.getProperty(props, "com.metamx.aws.secretKey")
)
);
final SegmentPusher segmentPusher = new S3SegmentPusher(

@@ -334,7 +334,6 @@ public class WorkerNode
taskMonitor = new TaskMonitor(
pathChildrenCache,
curatorFramework,
jsonMapper,
workerCuratorCoordinator,
taskToolbox,
workerExec
@@ -0,0 +1,440 @@
package com.metamx.druid.merger.coordinator;

import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.metamx.druid.aggregation.AggregatorFactory;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.jackson.DefaultObjectMapper;
import com.metamx.druid.merger.common.TaskStatus;
import com.metamx.druid.merger.common.TaskToolbox;
import com.metamx.druid.merger.common.config.IndexerZkConfig;
import com.metamx.druid.merger.common.task.DefaultMergeTask;
import com.metamx.druid.merger.common.task.Task;
import com.metamx.druid.merger.coordinator.config.IndexerCoordinatorConfig;
import com.metamx.druid.merger.coordinator.config.RemoteTaskRunnerConfig;
import com.metamx.druid.merger.coordinator.config.RetryPolicyConfig;
import com.metamx.druid.merger.coordinator.scaling.AutoScalingData;
import com.metamx.druid.merger.coordinator.scaling.ScalingStrategy;
import com.metamx.druid.merger.worker.TaskMonitor;
import com.metamx.druid.merger.worker.Worker;
import com.metamx.druid.merger.worker.WorkerCuratorCoordinator;
import com.netflix.curator.framework.CuratorFramework;
import com.netflix.curator.framework.CuratorFrameworkFactory;
import com.netflix.curator.framework.recipes.cache.PathChildrenCache;
import com.netflix.curator.retry.ExponentialBackoffRetry;
import com.netflix.curator.test.TestingCluster;
import org.apache.commons.lang.mutable.MutableBoolean;
import org.apache.zookeeper.CreateMode;
import org.codehaus.jackson.annotate.JsonProperty;
import org.codehaus.jackson.annotate.JsonTypeName;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.map.jsontype.NamedType;
import org.easymock.EasyMock;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.joda.time.Interval;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import java.io.File;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

/**
*/
public class RemoteTaskRunnerTest
{
private static final ObjectMapper jsonMapper = new DefaultObjectMapper();
private static final String basePath = "/test/druid/indexer";
private static final String announcementsPath = String.format("%s/announcements", basePath);
private static final String tasksPath = String.format("%s/tasks", basePath);
private static final String statusPath = String.format("%s/status", basePath);

private TestingCluster testingCluster;
private CuratorFramework cf;
private PathChildrenCache pathChildrenCache;
private RemoteTaskRunner remoteTaskRunner;
private TaskMonitor taskMonitor;

private ScheduledExecutorService scheduledExec;

private Task task1;

private Worker worker1;

@Before
public void setUp() throws Exception
{
testingCluster = new TestingCluster(1);
testingCluster.start();

cf = CuratorFrameworkFactory.builder()
.connectString(testingCluster.getConnectString())
.retryPolicy(new ExponentialBackoffRetry(1, 10))
.build();
cf.start();

cf.create().creatingParentsIfNeeded().forPath(announcementsPath);
cf.create().forPath(tasksPath);
cf.create().forPath(String.format("%s/worker1", tasksPath));
cf.create().forPath(statusPath);
cf.create().forPath(String.format("%s/worker1", statusPath));

pathChildrenCache = new PathChildrenCache(cf, announcementsPath, true);

worker1 = new Worker(
"worker1",
"localhost",
3,
"0"
);

makeRemoteTaskRunner();
makeTaskMonitor();
}

@After
public void tearDown() throws Exception
{
testingCluster.stop();
remoteTaskRunner.stop();
taskMonitor.stop();
}

@Test
public void testRunNoExistingTask() throws Exception
{
remoteTaskRunner.run(
task1,
new TaskContext(new DateTime().toString(), Sets.<DataSegment>newHashSet()),
null
);
}

@Test
public void testRunWithExistingCompletedTask() throws Exception
{
cf.create().creatingParentsIfNeeded().forPath(
String.format("%s/worker1/task1", statusPath),
jsonMapper.writeValueAsBytes(
TaskStatus.success(
"task1",
Lists.<DataSegment>newArrayList()
)
)
);

// Really don't like this way of waiting for the task to appear
while (remoteTaskRunner.getNumWorkers() == 0) {
Thread.sleep(500);
}

final MutableBoolean callbackCalled = new MutableBoolean(false);
remoteTaskRunner.run(
task1,
null,
new TaskCallback()
{
@Override
public void notify(TaskStatus status)
{
callbackCalled.setValue(true);
}
}
);

Assert.assertTrue("TaskCallback was not called!", callbackCalled.booleanValue());
}

private void makeTaskMonitor() throws Exception
{
WorkerCuratorCoordinator workerCuratorCoordinator = new WorkerCuratorCoordinator(
jsonMapper,
new IndexerZkConfig()
{
@Override
public String getAnnouncementPath()
{
return announcementsPath;
}

@Override
public String getTaskPath()
{
return tasksPath;
}

@Override
public String getStatusPath()
{
return statusPath;
}
},
cf,
worker1
);
workerCuratorCoordinator.start();

taskMonitor = new TaskMonitor(
new PathChildrenCache(cf, String.format("%s/worker1", tasksPath), true),
cf,
workerCuratorCoordinator,
new TaskToolbox(
new IndexerCoordinatorConfig()
{
@Override
public String getServerName()
{
return "worker1";
}

@Override
public String getLeaderLatchPath()
{
return null;
}

@Override
public int getNumLocalThreads()
{
return 1;
}

@Override
public String getRunnerImpl()
{
return null;
}

@Override
public String getStorageImpl()
{
return null;
}

@Override
public File getBaseTaskDir()
{
try {
return File.createTempFile("billy", "yay");
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}

@Override
public boolean isWhitelistEnabled()
{
return false;
}

@Override
public String getWhitelistDatasourcesString()
{
return null;
}

@Override
public long getRowFlushBoundary()
{
return 0;
}

@Override
public String getStrategyImpl()
{
return null;
}
}, null, null, null, jsonMapper
),
Executors.newSingleThreadExecutor()
);
jsonMapper.registerSubtypes(new NamedType(TestTask.class, "test"));
taskMonitor.start();
}

private void makeRemoteTaskRunner() throws Exception
{
scheduledExec = EasyMock.createMock(ScheduledExecutorService.class);

remoteTaskRunner = new RemoteTaskRunner(
jsonMapper,
new TestRemoteTaskRunnerConfig(),
cf,
pathChildrenCache,
scheduledExec,
new RetryPolicyFactory(new TestRetryPolicyConfig()),
new TestScalingStrategy()
);

task1 = new TestTask(
"task1",
"dummyDs",
Lists.<DataSegment>newArrayList(
new DataSegment(
"dummyDs",
new Interval(new DateTime(), new DateTime()),
new DateTime().toString(),
null,
null,
null,
null,
0
)
), Lists.<AggregatorFactory>newArrayList()
);

// Create a single worker and wait for things to be ready
remoteTaskRunner.start();
cf.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(
String.format("%s/worker1", announcementsPath),
jsonMapper.writeValueAsBytes(worker1)
);
while (remoteTaskRunner.getNumWorkers() == 0) {
Thread.sleep(500);
}
}

private static class TestRetryPolicyConfig extends RetryPolicyConfig
{
@Override
public Duration getRetryMinDuration()
{
return null;
}

@Override
public Duration getRetryMaxDuration()
{
return null;
}

@Override
public long getMaxRetryCount()
{
return 0;
}
}

private static class TestScalingStrategy<T> implements ScalingStrategy<T>
{
@Override
public AutoScalingData provision()
{
return null;
}

@Override
public AutoScalingData terminate(List<String> nodeIds)
{
return null;
}
}

private static class TestRemoteTaskRunnerConfig extends RemoteTaskRunnerConfig
{
@Override
public Duration getTerminateResourcesDuration()
{
return null;
}

@Override
public DateTime getTerminateResourcesOriginDateTime()
{
return null;
}

@Override
public String getMinWorkerVersion()
{
return "0";
}

@Override
public int getMinNumWorkers()
{
return 0;
}

@Override
public int getMaxWorkerIdleTimeMillisBeforeDeletion()
{
return 0;
}

@Override
public Duration getMaxScalingDuration()
{
return null;
}

@Override
public String getAnnouncementPath()
{
return announcementsPath;
}

@Override
public String getTaskPath()
{
return tasksPath;
}

@Override
public String getStatusPath()
{
return statusPath;
}

@Override
public Duration getTaskAssignmentTimeoutDuration()
{
return new Duration(60000);
}
}

@JsonTypeName("test")
private static class TestTask extends DefaultMergeTask
{
private final String id;

public TestTask(
@JsonProperty("id") String id,
@JsonProperty("dataSource") String dataSource,
@JsonProperty("segments") List<DataSegment> segments,
@JsonProperty("aggregations") List<AggregatorFactory> aggregators
)
{
super(dataSource, segments, aggregators);

this.id = id;
}

@Override
@JsonProperty
public String getId()
{
return id;
}

@Override
public Type getType()
{
return Type.TEST;
}

@Override
public TaskStatus run(TaskContext context, TaskToolbox toolbox) throws Exception
{
return TaskStatus.success("task1", Lists.<DataSegment>newArrayList());
}
}
}
@@ -0,0 +1,45 @@
package com.metamx.druid.merger.coordinator;

import com.metamx.druid.merger.coordinator.config.RetryPolicyConfig;
import junit.framework.Assert;
import org.joda.time.Duration;
import org.junit.Test;

/**
*/
public class RetryPolicyTest
{
@Test
public void testGetAndIncrementRetryDelay() throws Exception
{
RetryPolicy retryPolicy = new RetryPolicy(
new RetryPolicyConfig()
{
@Override
public Duration getRetryMinDuration()
{
return new Duration("PT1S");
}

@Override
public Duration getRetryMaxDuration()
{
return new Duration("PT10S");
}

@Override
public long getMaxRetryCount()
{
return 10;
}
}
);

Assert.assertEquals(new Duration("PT1S"), retryPolicy.getAndIncrementRetryDelay());
Assert.assertEquals(new Duration("PT2S"), retryPolicy.getAndIncrementRetryDelay());
Assert.assertEquals(new Duration("PT4S"), retryPolicy.getAndIncrementRetryDelay());
Assert.assertEquals(new Duration("PT8S"), retryPolicy.getAndIncrementRetryDelay());
Assert.assertEquals(new Duration("PT10S"), retryPolicy.getAndIncrementRetryDelay());
Assert.assertEquals(new Duration("PT10S"), retryPolicy.getAndIncrementRetryDelay());
}
}
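The test above shows the delay doubling from the configured minimum until it is capped at the maximum. For context, here is a minimal sketch (not part of this commit) of how a caller might consume those escalating delays; runTaskOnce() is a hypothetical stand-in for the real unit of work, and checking getMaxRetryCount() here is an assumption about how the runner applies the config:

// Retry a hypothetical unit of work, sleeping for the policy's escalating delay each time,
// until it succeeds or the configured maximum retry count is exhausted.
static boolean runWithRetries(RetryPolicyConfig config) throws InterruptedException
{
  RetryPolicy retryPolicy = new RetryPolicy(config);
  for (long attempt = 0; attempt < config.getMaxRetryCount(); attempt++) {
    if (runTaskOnce()) {
      return true; // succeeded, no further retries needed
    }
    // each call returns the current delay and doubles it, capped at getRetryMaxDuration()
    Thread.sleep(retryPolicy.getAndIncrementRetryDelay().getMillis());
  }
  return false; // retries exhausted
}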
@@ -27,12 +27,8 @@ import com.amazonaws.services.ec2.model.Reservation;
import com.amazonaws.services.ec2.model.RunInstancesRequest;
import com.amazonaws.services.ec2.model.RunInstancesResult;
import com.amazonaws.services.ec2.model.TerminateInstancesRequest;
import com.google.common.collect.Maps;
import com.metamx.druid.merger.coordinator.WorkerWrapper;
import com.metamx.druid.merger.coordinator.config.EC2AutoScalingStrategyConfig;
import com.metamx.druid.merger.worker.Worker;
import org.easymock.EasyMock;
import org.joda.time.DateTime;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;

@@ -40,8 +36,6 @@ import org.junit.Test;

import java.util.Arrays;
import java.util.Date;
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListSet;

/**
*/

@@ -56,7 +50,6 @@ public class EC2AutoScalingStrategyTest
private DescribeInstancesResult describeInstancesResult;
private Reservation reservation;
private Instance instance;
private WorkerWrapper worker;
private EC2AutoScalingStrategy strategy;

@Before

@@ -73,12 +66,6 @@ public class EC2AutoScalingStrategyTest
.withImageId(AMI_ID)
.withPrivateIpAddress(IP);

worker = new WorkerWrapper(
new Worker("dummyHost", IP, 2, "0"),
new ConcurrentSkipListSet<String>(),
null
);
worker.setLastCompletedTaskTime(new DateTime(0));
strategy = new EC2AutoScalingStrategy(
amazonEC2Client, new EC2AutoScalingStrategyConfig()
{

@@ -145,23 +132,12 @@ public class EC2AutoScalingStrategyTest
EasyMock.expect(reservation.getInstances()).andReturn(Arrays.asList(instance)).atLeastOnce();
EasyMock.replay(reservation);

worker.getRunningTasks().add("task1");

Assert.assertFalse(worker.isAtCapacity());

worker.getRunningTasks().add("task2");

Assert.assertTrue(worker.isAtCapacity());

AutoScalingData created = strategy.provision();

Assert.assertEquals(created.getNodeIds().size(), 1);
Assert.assertEquals(created.getNodes().size(), 1);
Assert.assertEquals(String.format("%s:8080", IP), created.getNodeIds().get(0));

worker.getRunningTasks().remove("task1");
worker.getRunningTasks().remove("task2");

AutoScalingData deleted = strategy.terminate(Arrays.asList("dummyHost"));

Assert.assertEquals(deleted.getNodeIds().size(), 1);
pom.xml

@@ -19,7 +19,7 @@
-->

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.metamx</groupId>
<artifactId>druid</artifactId>

@@ -50,6 +50,7 @@
<module>merger</module>
<module>realtime</module>
<module>examples</module>
<module>druid-services</module>
</modules>

<dependencyManagement>

@@ -131,6 +132,11 @@
<artifactId>curator-x-discovery</artifactId>
<version>1.2.2</version>
</dependency>
<dependency>
<groupId>com.netflix.curator</groupId>
<artifactId>curator-test</artifactId>
<version>1.2.2</version>
</dependency>
<dependency>
<groupId>it.uniroma3.mat</groupId>
<artifactId>extendedset</artifactId>
@@ -0,0 +1,412 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/

package com.metamx.druid.utils;

import com.google.common.io.Closeables;
import com.metamx.common.logger.Logger;
import com.metamx.druid.db.DbConnector;
import com.metamx.druid.db.DbConnectorConfig;
import com.metamx.druid.initialization.Initialization;
import com.metamx.druid.zk.PropertiesZkSerializer;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;

import java.io.*;
import java.util.Properties;

/**
* Set up the shared Druid ensemble space.
* This affects the Zookeeper which holds common properties, and znode paths for coordination,
* and also performs metadata table creation in the database (MySQL).
* By storing ensemble-wide properties in zookeeper, cluster administration is simplified.
* Each service instance can also have local property overrides in the file runtime.properties
* located in the classpath.
* <p>
* The design rules are noted here with rationale.
* </p>
* <p/>
* <pre>
* Design Rule Notes:
* (a) Properties set on the commandline of services take precedence over runtime.properties, which
* takes precedence over properties stored in zookeeper.
*
* Rationale: organizing principle.
*
* (b) Services load properties on startup only.
*
* Rationale: stepwise changes are safer and easier to manage.
*
* (c) Only DruidSetup creates properties and znode paths (zpaths) on zookeeper and no other tool or service
* will make ensemble-wide settings automatically.
*
* Rationale: one place for this logic, under manual control, and avoid accidental
* namespace/partition creation.
*
* (d) DruidSetup creates reasonable zpaths but supports overrides to enable tactical
* version transitions (just in case). If zpaths are overridden, then they must all be
* overridden together since they are not independent.
*
* Rationale: convention beats configuration most of the time; sometimes configuration is needed
* to negotiate unusual cases.
*
* (e) Properties settings stored on zookeeper are not cumulative; previous properties are removed before
* new ones are stored.
* Rationale: Keep the operations at the granularity of a file of properties, avoid
* dependence on order of setup operations, enable dumping of current settings.
* </pre>
*
* @author pbaclace
*/
public class DruidSetup
{
private static final Logger log = new Logger(DruidSetup.class);

public static void main(final String[] args)
{
ZkClient zkClient = null;

if (args.length < 2 || args.length > 3) {
printUsage();
System.exit(1);
}
String cmd = args[0];
if ("dump".equals(cmd) && args.length == 3) {
final String zkConnect = args[1];
zkClient = connectToZK(zkConnect);
String zpathBase = args[2];
dumpFromZk(zkClient, zpathBase, zkConnect, System.out);
} else if ("put".equals(cmd) && args.length == 3) {
final String zkConnect = args[1];
zkClient = connectToZK(zkConnect);
final String pfile = args[2];
putToZk(zkClient, pfile);
} else if ("dbprep".equals(cmd) && args.length == 2) {
final String pfile = args[1];
prepDB(pfile);
} else {
printUsage();
System.exit(1);
}

if (zkClient != null) {
zkClient.close();
}
}

/**
* Load properties from local file, validate and tweak.
* <p/>
* This can only be used for setup, not service run time, because of some assembly here.
*
* @param pfile path to runtime.properties file to be read.
* @param props Properties object to fill; props like druid.zk.paths.*Path will always be set after
* this method, either because the input file has them set (overrides) or because prop
* druid.zk.paths.base was used as a prefix to construct the default zpaths;
* druid.zk.paths.base will be set iff there is a single base for all zpaths
*/
private static void loadProperties(String pfile, Properties props)
{
InputStream is = null;
try {
is = new FileInputStream(pfile);
} catch (FileNotFoundException e) {
System.err.println("File not found: " + pfile);
System.err.println("No changes made.");
System.exit(4);
} catch (IOException ioe) {
reportErrorAndExit(pfile, ioe);
}
try {
props.load(is);
} catch (IOException e) {
reportErrorAndExit(pfile, e);
} finally {
Closeables.closeQuietly(is);
}

if (! Initialization.validateResolveProps(props)) { // bail, errors have been emitted
System.exit(9);
}

// emit effective zpaths to be used
System.out.println("Effective zpath properties:");
for (String pname : Initialization.SUB_PATH_PROPS) {
System.out.println(" " + pname + "=" + props.getProperty(pname));
}
System.out.println(" " + "druid.zk.paths.propertiesPath" + "=" +
props.getProperty("druid.zk.paths.propertiesPath"));

}

/**
* @param zkClient zookeeper client.
* @param zpathBase znode base path.
* @param zkConnect ZK coordinates in the form host1:port1[,host2:port2[, ...]]
* @param out
*/
private static void dumpFromZk(ZkClient zkClient, String zpathBase, String zkConnect, PrintStream out)
{
final String propPath = Initialization.makePropPath(zpathBase);
if (zkClient.exists(propPath)) {
Properties currProps = zkClient.readData(propPath, true);
if (currProps != null) {
out.println("# Begin Properties Listing for zkConnect=" + zkConnect + " zpath=" + propPath);
try {
currProps.store(out, "Druid");
} catch (IOException ignored) {
}
out.println("# End Properties Listing for zkConnect=" + zkConnect + " zpath=" + propPath);
out.println("# NOTE: properties like druid.zk.paths.*Path are always stored in zookeeper in absolute form.");
out.println();
}
}
//out.println("Zookeeper znodes and zpaths for " + zkConnect + " (showing all zpaths)");
// list all znodes
// (not ideal since recursive listing starts at / instead of at baseZkPath)
//zkClient.showFolders(out);
}

/**
* @param zkClient zookeeper client.
* @param pfile
*/
private static void putToZk(ZkClient zkClient, String pfile)
{
Properties props = new Properties();
loadProperties(pfile, props);
String zpathBase = props.getProperty("druid.zk.paths.base");

// create znodes first
//
createZNodes(zkClient, zpathBase, System.out);

// put props
//
updatePropertiesZK(zkClient, zpathBase, props, System.out);
}

/**
* @param zkClient zookeeper client.
* @param zpathBase znode base path.
* @param props the properties to store.
* @param out the PrintStream for human readable update summary (usually System.out).
*/
private static void updatePropertiesZK(ZkClient zkClient, String zpathBase, Properties props, PrintStream out)
{
final String propPathOverride = props.getProperty("druid.zk.paths.propertiesPath");
final String propPathConstructed = Initialization.makePropPath(zpathBase);
final String propPath = (propPathOverride != null) ? propPathOverride : propPathConstructed;
Properties currProps = null;
if (zkClient.exists(propPath)) {
currProps = zkClient.readData(propPath, true);
}
boolean propsDiffer = false;
if (currProps == null) {
out.println("No properties currently stored in zk");
propsDiffer = true;
} else { // determine whether anything is different
int countNew = 0;
int countDiffer = 0;
int countRemoved = 0;
int countNoChange = 0;
String currMetaPropVal = "";
StringBuilder changes = new StringBuilder(1024);
for (String pname : props.stringPropertyNames()) {
if (pname.equals(PropertiesZkSerializer.META_PROP)) continue; // ignore meta prop datestamp, if any
final String pvalue = props.getProperty(pname);
final String pvalueCurr = currProps.getProperty(pname);
if (pvalueCurr == null) {
countNew++;
} else {
if (pvalueCurr.equals(pvalue)) {
countNoChange++;
} else {
countDiffer++;
changes.append("CHANGED: ").append(pname).append("= PREV=").append(pvalueCurr)
.append(" NOW=").append(pvalue).append("\n");
}
}
}
for (String pname : currProps.stringPropertyNames()) {
if (pname.equals(PropertiesZkSerializer.META_PROP)) {
currMetaPropVal = currProps.getProperty(pname);
continue; // ignore meta prop datestamp
}
if (props.getProperty(pname) == null) {
countRemoved++;
changes.append("REMOVED: ").append(pname).append("=").append(currProps.getProperty(pname)).append("\n");
}
}
if (countNew + countRemoved + countDiffer > 0) {
out.println("Current properties differ: "
+ countNew + " new, "
+ countDiffer + " different values, "
+ countRemoved + " removed, "
+ countNoChange + " unchanged, "
+ currMetaPropVal + " previously updated"
);
out.println(changes);
propsDiffer = true;
} else {
out.println("Current properties identical to file given, entry count=" + countNoChange);
}
}
if (propsDiffer) {
if (currProps != null) {
zkClient.delete(propPath);
}
// update zookeeper
zkClient.createPersistent(propPath, props);
out.println("Properties updated, entry count=" + props.size());
}
}

/**
* @param zkClient zookeeper client.
* @param zpathBase znode base path.
* @param out the PrintStream for human readable update summary.
*/
private static void createZNodes(ZkClient zkClient, String zpathBase, PrintStream out)
{
zkClient.createPersistent(zpathBase, true);
for (String subPath : Initialization.SUB_PATHS) {
final String thePath = String.format("%s/%s", zpathBase, subPath);
if (zkClient.exists(thePath)) {
out.printf("Path[%s] exists already%n", thePath);
} else {
out.printf("Creating ZK path[%s]%n", thePath);
zkClient.createPersistent(thePath, true);
}
}
}

private static void reportErrorAndExit(String pfile, IOException ioe)
{
System.err.println("Could not read file: " + pfile);
System.err.println(" because of: " + ioe);
System.err.println("No changes made.");
System.exit(4);
}

private static ZkClient connectToZK(String zkConnect)
{
return new ZkClient(
new ZkConnection(zkConnect),
Integer.MAX_VALUE,
new PropertiesZkSerializer()
);
}

/**
* Connect to db and create table, if it does not exist.
* NOTE: Connection failure only shows up in log output.
*
* @param pfile path to properties file to use.
*/
private static void prepDB(final String pfile)
{
Properties tmp_props = new Properties();
loadProperties(pfile, tmp_props);
final String tableName = tmp_props.getProperty("druid.database.segmentTable", "prod_segments");

final String dbConnectionUrl = tmp_props.getProperty("druid.database.connectURI");
final String username = tmp_props.getProperty("druid.database.user");
final String password = tmp_props.getProperty("druid.database.password");

//
// validation
//
if (tableName.length() == 0 || !Character.isLetter(tableName.charAt(0))) {
throw new RuntimeException("poorly formed property druid.database.segmentTable=" + tableName);
}
if (username == null || username.length() == 0) {
throw new RuntimeException("poorly formed property druid.database.user=" + username);
}
if (password == null || password.length() == 0) {
throw new RuntimeException("poorly formed property druid.database.password=" + password);
}
if (dbConnectionUrl == null || dbConnectionUrl.length() == 0) {
throw new RuntimeException("poorly formed property druid.database.connectURI=" + dbConnectionUrl);
}

final DbConnectorConfig config = new DbConnectorConfig()
{
{
}

@Override
public String getDatabaseConnectURI()
{
return dbConnectionUrl;
}

@Override
public String getDatabaseUser()
{
return username;
}

@Override
public String getDatabasePassword()
{
return password;
}

@Override
public String getSegmentTable()
{
return tableName;
}
};

DbConnector dbConnector = new DbConnector(config);

DbConnector.createSegmentTable(dbConnector.getDBI(), config);

}

/**
* Print usage to stdout.
*/
private static void printUsage()
{
System.out.println("Usage: <java invocation> CMD [args]\n"
+ " Where CMD is a particular command:\n"
+ " CMD choices:\n"
+ " dump zkConnect baseZkPath # dump info from zk at given coordinates\n"
+ " dbprep propfile # create metadata table in db\n"
+ " put zkConnect propfile # store paths and propfile into zk at given coordinates\n"
+ " args:\n"
+ " zkConnect: ZK coordinates in the form host1:port1[,host2:port2[, ...]]\n"
+ " baseZkPath: like /druid or /mydruid etc. to uniquely identify a Druid ensemble\n"
+ " and should be equal to property druid.zk.paths.base\n"
+ " propfile: Java properties file with common properties for all services in ensemble\n"
+ " Notes:\n"
+ " dump command makes no modifications and shows zk properties at baseZkPath.\n"
+ " put command can safely be invoked more than once, will not disturb existing queues,\n"
+ " and properties are not cumulative.\n"
+ " A zookeeper can service more than one Druid ensemble if baseZkPath is distinct.\n"
+ " Druid services only load properties during process startup.\n"
+ " Properties defined on a service command line take precedence over the runtime.properties\n"
+ " file which takes precedence over properties stored in zookeeper.\n"
+ ""
);
}
}
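For orientation, here is a rough sketch (not part of this commit) of reading back the ensemble properties that the "put" command stores, using the same client and serializer DruidSetup uses above; the ZooKeeper address and the /druid base path are assumed example values:

// Read the properties znode that DruidSetup "put" writes and print its contents.
ZkClient zkClient = new ZkClient(new ZkConnection("localhost:2181"), Integer.MAX_VALUE, new PropertiesZkSerializer());
String propPath = Initialization.makePropPath("/druid"); // zpath where the ensemble properties live
Properties stored = zkClient.readData(propPath, true);   // null if nothing has been stored yet
if (stored != null) {
  stored.list(System.out);
}
zkClient.close();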
@@ -21,11 +21,13 @@ package com.metamx.druid.utils;

import com.metamx.druid.db.DbConnector;
import com.metamx.druid.db.DbConnectorConfig;
import com.metamx.druid.initialization.Initialization;
import com.metamx.druid.zk.StringZkSerializer;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;

/**
* @deprecated see DruidSetup
*/
public class ZkSetup
{

@@ -33,13 +35,12 @@ public class ZkSetup
{
if (args.length != 5) {
System.out.println("Usage: <java invocation> zkConnect baseZkPath dbConnectionUrl dbUsername:password tableName");
System.out.println("This utility is deprecated, see DruidSetup instead.");
System.exit(1);
}

String path = args[1];

String[] subPaths = new String[]{"announcements", "servedSegments", "loadQueue", "master"};

final ZkClient zkClient = new ZkClient(
new ZkConnection(args[0]),
Integer.MAX_VALUE,

@@ -47,7 +48,7 @@ public class ZkSetup
);

zkClient.createPersistent(path, true);
for (String subPath : subPaths) {
for (String subPath : Initialization.SUB_PATHS) {
final String thePath = String.format("%s/%s", path, subPath);
if (zkClient.exists(thePath)) {
System.out.printf("Path[%s] exists already%n", thePath);