From 7083821fbff153c61c17c999d6330023f20301d2 Mon Sep 17 00:00:00 2001 From: Fangjin Yang Date: Thu, 8 Nov 2012 10:08:40 -0800 Subject: [PATCH] no param for scaling provision and minor fix for initialization --- .../druid/initialization/Initialization.java | 46 +++++++++++++------ .../merger/coordinator/RemoteTaskRunner.java | 18 +------- .../merger/coordinator/WorkerWrapper.java | 20 +++++++- .../scaling/EC2AutoScalingStrategy.java | 2 +- .../scaling/NoopScalingStrategy.java | 2 +- .../coordinator/scaling/ScalingStrategy.java | 2 +- 6 files changed, 54 insertions(+), 36 deletions(-) diff --git a/client/src/main/java/com/metamx/druid/initialization/Initialization.java b/client/src/main/java/com/metamx/druid/initialization/Initialization.java index addf3ac6ebb..1d2c5c12a11 100644 --- a/client/src/main/java/com/metamx/druid/initialization/Initialization.java +++ b/client/src/main/java/com/metamx/druid/initialization/Initialization.java @@ -69,7 +69,8 @@ public class Initialization "druid.zk.paths.announcementsPath", "druid.zk.paths.servedSegmentsPath", "druid.zk.paths.loadQueuePath", - "druid.zk.paths.masterPath"}; + "druid.zk.paths.masterPath" + }; public static final String DEFAULT_ZPATH = "/druid"; public static ZkClient makeZkClient(ZkClientConfig config, Lifecycle lifecycle) @@ -119,10 +120,12 @@ public class Initialization } - /** Load properties. + /** + * Load properties. * Properties are layered, high to low precedence: cmdLine -D, runtime.properties file, stored in zookeeper. * Idempotent. Thread-safe. Properties are only loaded once. * If property druid.zk.service.host=none then do not load properties from zookeeper. + * * @return Properties ready to use. */ public synchronized static Properties loadProperties() @@ -139,7 +142,9 @@ public class Initialization final InputStream stream = ClassLoader.getSystemResourceAsStream("runtime.properties"); if (stream == null) { - log.info("runtime.properties not found as a resource in classpath, relying only on system properties, and zookeeper now."); + log.info( + "runtime.properties not found as a resource in classpath, relying only on system properties, and zookeeper now." + ); } else { log.info("Loading properties from runtime.properties"); try { @@ -202,7 +207,7 @@ public class Initialization log.warn("property druid.zk.service.host is not set, so no way to contact zookeeper for coordination."); } // validate properties now that all levels of precedence are loaded - if (! validateResolveProps(tmp_props)) { + if (!validateResolveProps(tmp_props)) { log.error("Properties failed to validate, cannot continue"); throw new RuntimeException("Properties failed to validate"); } @@ -358,12 +363,15 @@ public class Initialization return String.format("%s/%s", basePath, PROP_SUBPATH); } - /** Validate and Resolve Properties. + /** + * Validate and Resolve Properties. * Resolve zpaths with props like druid.zk.paths.*Path using druid.zk.paths.base value. * Check validity so that if druid.zk.paths.*Path props are set, all are set, * if none set, then construct defaults relative to druid.zk.paths.base and add these * to the properties chain. + * * @param props + * * @return true if valid zpath properties. */ public static boolean validateResolveProps(Properties props) @@ -379,7 +387,9 @@ public class Initialization final String propertiesZpathOverride = props.getProperty("druid.zk.paths.propertiesPath"); - if (!zpathValidateFailed) System.out.println("Effective zpath prefix=" + zpathEffective); + if (!zpathValidateFailed) { + System.out.println("Effective zpath prefix=" + zpathEffective); + } // validate druid.zk.paths.*Path properties // @@ -408,22 +418,25 @@ public class Initialization } } if (zpathOverridesNotAbs) { - System.err.println("When overriding zk zpaths, with properties like druid.zk.paths.*Path " + - "the znode path must start with '/' (slash) ; problem overrides:"); + System.err.println( + "When overriding zk zpaths, with properties like druid.zk.paths.*Path " + + "the znode path must start with '/' (slash) ; problem overrides:" + ); System.err.print(sbErrors.toString()); } if (zpathOverrideCount > 0) { - if (zpathOverrideCount < SUB_PATH_PROPS.length + 1) { + if (zpathOverrideCount < SUB_PATH_PROPS.length) { zpathValidateFailed = true; - System.err.println("When overriding zk zpaths, with properties of form druid.zk.paths.*Path " + - "all must be overridden together; missing overrides:"); + System.err.println( + "When overriding zk zpaths, with properties of form druid.zk.paths.*Path " + + "all must be overridden together; missing overrides:" + ); for (int i = 0; i < SUB_PATH_PROPS.length; i++) { String val = props.getProperty(SUB_PATH_PROPS[i]); if (val == null) { System.err.println(" " + SUB_PATH_PROPS[i]); } } - if (propertiesZpathOverride == null) System.err.println(" " + "druid.zk.paths.propertiesPath"); } else { // proper overrides // do not prefix with property druid.zk.paths.base ; // fallthru @@ -440,13 +453,16 @@ public class Initialization } props.setProperty("druid.zk.paths.propertiesPath", zpathEffective + "/properties"); } - return ! zpathValidateFailed; + return !zpathValidateFailed; } - /** Check znode zpath base for proper slash, no trailing slash. - * @param zpathBase znode base path, if null then this method does nothing. + /** + * Check znode zpath base for proper slash, no trailing slash. + * + * @param zpathBase znode base path, if null then this method does nothing. * @param errorMsgPrefix error context to use if errors are emitted, should indicate * where the zpathBase value came from. + * * @return true if validate failed. */ public static boolean zpathBaseCheck(String zpathBase, String errorMsgPrefix) diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/RemoteTaskRunner.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/RemoteTaskRunner.java index a2bc0c3688e..fce83b8618a 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/RemoteTaskRunner.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/RemoteTaskRunner.java @@ -42,7 +42,6 @@ import com.metamx.druid.merger.coordinator.scaling.ScalingStrategy; import com.metamx.druid.merger.worker.Worker; import com.metamx.emitter.EmittingLogger; import com.netflix.curator.framework.CuratorFramework; -import com.netflix.curator.framework.recipes.cache.ChildData; import com.netflix.curator.framework.recipes.cache.PathChildrenCache; import com.netflix.curator.framework.recipes.cache.PathChildrenCacheEvent; import com.netflix.curator.framework.recipes.cache.PathChildrenCacheListener; @@ -53,7 +52,6 @@ import org.joda.time.Duration; import org.joda.time.Period; import javax.annotation.Nullable; -import java.io.IOException; import java.util.Comparator; import java.util.List; import java.util.Set; @@ -373,19 +371,7 @@ public class RemoteTaskRunner implements TaskRunner final WorkerWrapper workerWrapper = new WorkerWrapper( worker, statusCache, - new Function() - { - @Override - public String apply(@Nullable ChildData input) - { - try { - return jsonMapper.readValue(input.getData(), TaskStatus.class).getId(); - } - catch (Exception e) { - throw Throwables.propagate(e); - } - } - } + jsonMapper ); // Add status listener to the watcher for status changes @@ -520,7 +506,7 @@ public class RemoteTaskRunner implements TaskRunner log.info("Worker nodes do not have capacity to run any more tasks!"); if (currentlyProvisioning.isEmpty()) { - AutoScalingData provisioned = strategy.provision(currentlyProvisioning.size()); + AutoScalingData provisioned = strategy.provision(); if (provisioned != null) { currentlyProvisioning.addAll(provisioned.getNodeIds()); lastProvisionTime = new DateTime(); diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/WorkerWrapper.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/WorkerWrapper.java index 68d4f0a128c..c6353698cbd 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/WorkerWrapper.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/WorkerWrapper.java @@ -20,13 +20,17 @@ package com.metamx.druid.merger.coordinator; import com.google.common.base.Function; +import com.google.common.base.Throwables; import com.google.common.collect.Lists; import com.google.common.collect.Sets; +import com.metamx.druid.merger.common.TaskStatus; import com.metamx.druid.merger.worker.Worker; import com.netflix.curator.framework.recipes.cache.ChildData; import com.netflix.curator.framework.recipes.cache.PathChildrenCache; +import org.codehaus.jackson.map.ObjectMapper; import org.joda.time.DateTime; +import javax.annotation.Nullable; import java.io.Closeable; import java.io.IOException; import java.util.Set; @@ -41,11 +45,23 @@ public class WorkerWrapper implements Closeable private volatile DateTime lastCompletedTaskTime = new DateTime(); - public WorkerWrapper(Worker worker, PathChildrenCache statusCache, Function cacheConverter) + public WorkerWrapper(Worker worker, PathChildrenCache statusCache, final ObjectMapper jsonMapper) { this.worker = worker; this.statusCache = statusCache; - this.cacheConverter = cacheConverter; + this.cacheConverter = new Function() + { + @Override + public String apply(@Nullable ChildData input) + { + try { + return jsonMapper.readValue(input.getData(), TaskStatus.class).getId(); + } + catch (Exception e) { + throw Throwables.propagate(e); + } + } + }; } public Worker getWorker() diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/EC2AutoScalingStrategy.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/EC2AutoScalingStrategy.java index a85c3ade8fd..cd94b70d3ce 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/EC2AutoScalingStrategy.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/EC2AutoScalingStrategy.java @@ -56,7 +56,7 @@ public class EC2AutoScalingStrategy implements ScalingStrategy } @Override - public AutoScalingData provision(long numUnassignedTasks) + public AutoScalingData provision() { try { log.info("Creating new instance(s)..."); diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/NoopScalingStrategy.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/NoopScalingStrategy.java index 923de463870..67eb99293e4 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/NoopScalingStrategy.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/NoopScalingStrategy.java @@ -12,7 +12,7 @@ public class NoopScalingStrategy implements ScalingStrategy private static final EmittingLogger log = new EmittingLogger(NoopScalingStrategy.class); @Override - public AutoScalingData provision(long numUnassignedTasks) + public AutoScalingData provision() { log.info("If I were a real strategy I'd create something now"); return null; diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/ScalingStrategy.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/ScalingStrategy.java index ec71d856301..9b7da8fb3a4 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/ScalingStrategy.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/ScalingStrategy.java @@ -25,7 +25,7 @@ import java.util.List; */ public interface ScalingStrategy { - public AutoScalingData provision(long numUnassignedTasks); + public AutoScalingData provision(); public AutoScalingData terminate(List nodeIds); }