no param for scaling provision and minor fix for initialization

This commit is contained in:
Fangjin Yang 2012-11-08 10:08:40 -08:00
parent cdd9cdb2bd
commit 7083821fbf
6 changed files with 54 additions and 36 deletions

View File

@ -69,7 +69,8 @@ public class Initialization
"druid.zk.paths.announcementsPath",
"druid.zk.paths.servedSegmentsPath",
"druid.zk.paths.loadQueuePath",
"druid.zk.paths.masterPath"};
"druid.zk.paths.masterPath"
};
public static final String DEFAULT_ZPATH = "/druid";
public static ZkClient makeZkClient(ZkClientConfig config, Lifecycle lifecycle)
@ -119,10 +120,12 @@ public class Initialization
}
/** Load properties.
/**
* Load properties.
* Properties are layered, high to low precedence: cmdLine -D, runtime.properties file, stored in zookeeper.
* Idempotent. Thread-safe. Properties are only loaded once.
* If property druid.zk.service.host=none then do not load properties from zookeeper.
*
* @return Properties ready to use.
*/
public synchronized static Properties loadProperties()
@ -139,7 +142,9 @@ public class Initialization
final InputStream stream = ClassLoader.getSystemResourceAsStream("runtime.properties");
if (stream == null) {
log.info("runtime.properties not found as a resource in classpath, relying only on system properties, and zookeeper now.");
log.info(
"runtime.properties not found as a resource in classpath, relying only on system properties, and zookeeper now."
);
} else {
log.info("Loading properties from runtime.properties");
try {
@ -202,7 +207,7 @@ public class Initialization
log.warn("property druid.zk.service.host is not set, so no way to contact zookeeper for coordination.");
}
// validate properties now that all levels of precedence are loaded
if (! validateResolveProps(tmp_props)) {
if (!validateResolveProps(tmp_props)) {
log.error("Properties failed to validate, cannot continue");
throw new RuntimeException("Properties failed to validate");
}
@ -358,12 +363,15 @@ public class Initialization
return String.format("%s/%s", basePath, PROP_SUBPATH);
}
/** Validate and Resolve Properties.
/**
* Validate and Resolve Properties.
* Resolve zpaths with props like druid.zk.paths.*Path using druid.zk.paths.base value.
* Check validity so that if druid.zk.paths.*Path props are set, all are set,
* if none set, then construct defaults relative to druid.zk.paths.base and add these
* to the properties chain.
*
* @param props
*
* @return true if valid zpath properties.
*/
public static boolean validateResolveProps(Properties props)
@ -379,7 +387,9 @@ public class Initialization
final String propertiesZpathOverride = props.getProperty("druid.zk.paths.propertiesPath");
if (!zpathValidateFailed) System.out.println("Effective zpath prefix=" + zpathEffective);
if (!zpathValidateFailed) {
System.out.println("Effective zpath prefix=" + zpathEffective);
}
// validate druid.zk.paths.*Path properties
//
@ -408,22 +418,25 @@ public class Initialization
}
}
if (zpathOverridesNotAbs) {
System.err.println("When overriding zk zpaths, with properties like druid.zk.paths.*Path " +
"the znode path must start with '/' (slash) ; problem overrides:");
System.err.println(
"When overriding zk zpaths, with properties like druid.zk.paths.*Path " +
"the znode path must start with '/' (slash) ; problem overrides:"
);
System.err.print(sbErrors.toString());
}
if (zpathOverrideCount > 0) {
if (zpathOverrideCount < SUB_PATH_PROPS.length + 1) {
if (zpathOverrideCount < SUB_PATH_PROPS.length) {
zpathValidateFailed = true;
System.err.println("When overriding zk zpaths, with properties of form druid.zk.paths.*Path " +
"all must be overridden together; missing overrides:");
System.err.println(
"When overriding zk zpaths, with properties of form druid.zk.paths.*Path " +
"all must be overridden together; missing overrides:"
);
for (int i = 0; i < SUB_PATH_PROPS.length; i++) {
String val = props.getProperty(SUB_PATH_PROPS[i]);
if (val == null) {
System.err.println(" " + SUB_PATH_PROPS[i]);
}
}
if (propertiesZpathOverride == null) System.err.println(" " + "druid.zk.paths.propertiesPath");
} else { // proper overrides
// do not prefix with property druid.zk.paths.base
; // fallthru
@ -440,13 +453,16 @@ public class Initialization
}
props.setProperty("druid.zk.paths.propertiesPath", zpathEffective + "/properties");
}
return ! zpathValidateFailed;
return !zpathValidateFailed;
}
/** Check znode zpath base for proper slash, no trailing slash.
/**
* Check znode zpath base for proper slash, no trailing slash.
*
* @param zpathBase znode base path, if null then this method does nothing.
* @param errorMsgPrefix error context to use if errors are emitted, should indicate
* where the zpathBase value came from.
*
* @return true if validate failed.
*/
public static boolean zpathBaseCheck(String zpathBase, String errorMsgPrefix)

View File

@ -42,7 +42,6 @@ import com.metamx.druid.merger.coordinator.scaling.ScalingStrategy;
import com.metamx.druid.merger.worker.Worker;
import com.metamx.emitter.EmittingLogger;
import com.netflix.curator.framework.CuratorFramework;
import com.netflix.curator.framework.recipes.cache.ChildData;
import com.netflix.curator.framework.recipes.cache.PathChildrenCache;
import com.netflix.curator.framework.recipes.cache.PathChildrenCacheEvent;
import com.netflix.curator.framework.recipes.cache.PathChildrenCacheListener;
@ -53,7 +52,6 @@ import org.joda.time.Duration;
import org.joda.time.Period;
import javax.annotation.Nullable;
import java.io.IOException;
import java.util.Comparator;
import java.util.List;
import java.util.Set;
@ -373,19 +371,7 @@ public class RemoteTaskRunner implements TaskRunner
final WorkerWrapper workerWrapper = new WorkerWrapper(
worker,
statusCache,
new Function<ChildData, String>()
{
@Override
public String apply(@Nullable ChildData input)
{
try {
return jsonMapper.readValue(input.getData(), TaskStatus.class).getId();
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
}
jsonMapper
);
// Add status listener to the watcher for status changes
@ -520,7 +506,7 @@ public class RemoteTaskRunner implements TaskRunner
log.info("Worker nodes do not have capacity to run any more tasks!");
if (currentlyProvisioning.isEmpty()) {
AutoScalingData provisioned = strategy.provision(currentlyProvisioning.size());
AutoScalingData provisioned = strategy.provision();
if (provisioned != null) {
currentlyProvisioning.addAll(provisioned.getNodeIds());
lastProvisionTime = new DateTime();

View File

@ -20,13 +20,17 @@
package com.metamx.druid.merger.coordinator;
import com.google.common.base.Function;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.metamx.druid.merger.common.TaskStatus;
import com.metamx.druid.merger.worker.Worker;
import com.netflix.curator.framework.recipes.cache.ChildData;
import com.netflix.curator.framework.recipes.cache.PathChildrenCache;
import org.codehaus.jackson.map.ObjectMapper;
import org.joda.time.DateTime;
import javax.annotation.Nullable;
import java.io.Closeable;
import java.io.IOException;
import java.util.Set;
@ -41,11 +45,23 @@ public class WorkerWrapper implements Closeable
private volatile DateTime lastCompletedTaskTime = new DateTime();
public WorkerWrapper(Worker worker, PathChildrenCache statusCache, Function<ChildData, String> cacheConverter)
public WorkerWrapper(Worker worker, PathChildrenCache statusCache, final ObjectMapper jsonMapper)
{
this.worker = worker;
this.statusCache = statusCache;
this.cacheConverter = cacheConverter;
this.cacheConverter = new Function<ChildData, String>()
{
@Override
public String apply(@Nullable ChildData input)
{
try {
return jsonMapper.readValue(input.getData(), TaskStatus.class).getId();
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
};
}
public Worker getWorker()

View File

@ -56,7 +56,7 @@ public class EC2AutoScalingStrategy implements ScalingStrategy<Instance>
}
@Override
public AutoScalingData<Instance> provision(long numUnassignedTasks)
public AutoScalingData<Instance> provision()
{
try {
log.info("Creating new instance(s)...");

View File

@ -12,7 +12,7 @@ public class NoopScalingStrategy implements ScalingStrategy<String>
private static final EmittingLogger log = new EmittingLogger(NoopScalingStrategy.class);
@Override
public AutoScalingData<String> provision(long numUnassignedTasks)
public AutoScalingData<String> provision()
{
log.info("If I were a real strategy I'd create something now");
return null;

View File

@ -25,7 +25,7 @@ import java.util.List;
*/
public interface ScalingStrategy<T>
{
public AutoScalingData<T> provision(long numUnassignedTasks);
public AutoScalingData<T> provision();
public AutoScalingData<T> terminate(List<String> nodeIds);
}