From efd0dc0062e056f459e9c25dcc166d05868a01e0 Mon Sep 17 00:00:00 2001 From: Fangjin Yang Date: Wed, 6 Feb 2013 11:09:49 -0800 Subject: [PATCH] refactor scaling stuff --- .../query/search/FragmentSearchQuerySpec.java | 2 +- .../InsensitiveContainsSearchQuerySpec.java | 7 +- .../merger/coordinator/RemoteTaskRunner.java | 181 ++++---------- .../druid/merger/coordinator/TaskQueue.java | 7 + .../http/IndexerCoordinatorNode.java | 77 ++++-- .../coordinator/scaling/AutoScalingData.java | 9 + ...Strategy.java => AutoScalingStrategy.java} | 9 +- .../scaling/EC2AutoScalingStrategy.java | 4 +- ...tegy.java => NoopAutoScalingStrategy.java} | 6 +- .../ResourceManagementSchedulerConfig.java | 42 ++++ .../scaling/ResourceManagementStrategy.java | 38 +++ .../scaling/ResourceManagmentScheduler.java | 138 +++++++++++ .../coordinator/scaling/ScalingStats.java | 88 +++++++ .../SimpleResourceManagementStrategy.java | 222 ++++++++++++++++++ .../SimpleResourceManagmentConfig.java | 41 ++++ .../com/metamx/druid/merger/TestTask.java | 71 ++++++ .../coordinator/RemoteTaskRunnerTest.java | 65 +---- .../SimpleResourceManagementStrategyTest.java | 142 +++++++++++ .../scaling/TestAutoScalingStrategy.java | 45 ++++ 19 files changed, 959 insertions(+), 235 deletions(-) rename merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/{ScalingStrategy.java => AutoScalingStrategy.java} (81%) rename merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/{NoopScalingStrategy.java => NoopAutoScalingStrategy.java} (90%) create mode 100644 merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/ResourceManagementSchedulerConfig.java create mode 100644 merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/ResourceManagementStrategy.java create mode 100644 merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/ResourceManagmentScheduler.java create mode 100644 merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/ScalingStats.java create mode 100644 merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/SimpleResourceManagementStrategy.java create mode 100644 merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/SimpleResourceManagmentConfig.java create mode 100644 merger/src/test/java/com/metamx/druid/merger/TestTask.java create mode 100644 merger/src/test/java/com/metamx/druid/merger/coordinator/scaling/SimpleResourceManagementStrategyTest.java create mode 100644 merger/src/test/java/com/metamx/druid/merger/coordinator/scaling/TestAutoScalingStrategy.java diff --git a/client/src/main/java/com/metamx/druid/query/search/FragmentSearchQuerySpec.java b/client/src/main/java/com/metamx/druid/query/search/FragmentSearchQuerySpec.java index e0a73b4bc3c..7786d2c01bd 100644 --- a/client/src/main/java/com/metamx/druid/query/search/FragmentSearchQuerySpec.java +++ b/client/src/main/java/com/metamx/druid/query/search/FragmentSearchQuerySpec.java @@ -73,7 +73,7 @@ public class FragmentSearchQuerySpec implements SearchQuerySpec public boolean accept(String dimVal) { for (String value : values) { - if (!dimVal.toLowerCase().contains(value)) { + if (dimVal == null || !dimVal.toLowerCase().contains(value)) { return false; } } diff --git a/client/src/main/java/com/metamx/druid/query/search/InsensitiveContainsSearchQuerySpec.java b/client/src/main/java/com/metamx/druid/query/search/InsensitiveContainsSearchQuerySpec.java index 87a6246a5b2..1de1c7360fa 100644 --- a/client/src/main/java/com/metamx/druid/query/search/InsensitiveContainsSearchQuerySpec.java +++ b/client/src/main/java/com/metamx/druid/query/search/InsensitiveContainsSearchQuerySpec.java @@ -59,6 +59,9 @@ public class InsensitiveContainsSearchQuerySpec implements SearchQuerySpec @Override public boolean accept(String dimVal) { + if (dimVal == null) { + return false; + } return dimVal.toLowerCase().contains(value); } @@ -77,8 +80,8 @@ public class InsensitiveContainsSearchQuerySpec implements SearchQuerySpec public String toString() { return "InsensitiveContainsSearchQuerySpec{" + - "value=" + value + - ", sortSpec=" + sortSpec + + "value=" + value + + ", sortSpec=" + sortSpec + "}"; } } diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/RemoteTaskRunner.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/RemoteTaskRunner.java index 32acc66ae43..c2a201c9632 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/RemoteTaskRunner.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/RemoteTaskRunner.java @@ -19,27 +19,21 @@ package com.metamx.druid.merger.coordinator; -import com.google.common.base.Function; import com.google.common.base.Joiner; import com.google.common.base.Predicate; import com.google.common.base.Throwables; -import com.google.common.collect.Lists; import com.google.common.collect.MinMaxPriorityQueue; import com.google.common.collect.Sets; import com.google.common.primitives.Ints; import com.metamx.common.ISE; -import com.metamx.common.concurrent.ScheduledExecutors; import com.metamx.common.guava.FunctionalIterable; import com.metamx.common.lifecycle.LifecycleStart; import com.metamx.common.lifecycle.LifecycleStop; -import com.metamx.druid.PeriodGranularity; import com.metamx.druid.merger.common.TaskCallback; import com.metamx.druid.merger.common.TaskHolder; import com.metamx.druid.merger.common.TaskStatus; import com.metamx.druid.merger.common.task.Task; import com.metamx.druid.merger.coordinator.config.RemoteTaskRunnerConfig; -import com.metamx.druid.merger.coordinator.scaling.AutoScalingData; -import com.metamx.druid.merger.coordinator.scaling.ScalingStrategy; import com.metamx.druid.merger.coordinator.setup.WorkerSetupManager; import com.metamx.druid.merger.worker.Worker; import com.metamx.emitter.EmittingLogger; @@ -52,29 +46,25 @@ import com.netflix.curator.utils.ZKPaths; import org.apache.zookeeper.CreateMode; import org.codehaus.jackson.map.ObjectMapper; import org.joda.time.DateTime; -import org.joda.time.Duration; -import org.joda.time.Period; -import java.util.Arrays; +import java.util.Collection; import java.util.Comparator; -import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; /** - * The RemoteTaskRunner encapsulates all interactions with Zookeeper and keeps track of which workers - * are running which tasks. The RemoteTaskRunner is event driven and updates state according to ephemeral node - * changes in ZK. + * The RemoteTaskRunner's primary responsibility is to assign tasks to worker nodes and manage retries in failure + * scenarios. The RemoteTaskRunner keeps track of which workers are running which tasks and manages coordinator and + * worker interactions over Zookeeper. The RemoteTaskRunner is event driven and updates state according to ephemeral + * node changes in ZK. *

- * The RemoteTaskRunner will assign tasks to a node until the node hits capacity. RemoteTaskRunners have scaling - * strategies to help them decide when to create or delete new resources. When tasks are assigned to the remote - * task runner and no workers have capacity to handle the task, provisioning will be done according to the strategy. - * The remote task runner periodically runs a check to see if any worker nodes have not had any work for a - * specified period of time. If so, the worker node will be terminated. + * The RemoteTaskRunner will assign tasks to a node until the node hits capacity. At that point, task assignment will + * fail. The RemoteTaskRunner depends on another manager to create additional worker resources. + * For example, {@link com.metamx.druid.merger.coordinator.scaling.ResourceManagmentScheduler} can take care of these duties. + * *

* If a worker node becomes inexplicably disconnected from Zk, the RemoteTaskRunner will automatically retry any tasks * that were associated with the node. @@ -90,7 +80,6 @@ public class RemoteTaskRunner implements TaskRunner private final PathChildrenCache workerPathCache; private final ScheduledExecutorService scheduledExec; private final RetryPolicyFactory retryPolicyFactory; - private final ScalingStrategy strategy; private final WorkerSetupManager workerSetupManager; // all workers that exist in ZK @@ -98,12 +87,8 @@ public class RemoteTaskRunner implements TaskRunner // all tasks that are assigned or need to be assigned private final Map tasks = new ConcurrentHashMap(); - private final ConcurrentSkipListSet currentlyProvisioning = new ConcurrentSkipListSet(); - private final ConcurrentSkipListSet currentlyTerminating = new ConcurrentSkipListSet(); private final Object statusLock = new Object(); - private volatile DateTime lastProvisionTime = new DateTime(); - private volatile DateTime lastTerminateTime = new DateTime(); private volatile boolean started = false; public RemoteTaskRunner( @@ -113,7 +98,6 @@ public class RemoteTaskRunner implements TaskRunner PathChildrenCache workerPathCache, ScheduledExecutorService scheduledExec, RetryPolicyFactory retryPolicyFactory, - ScalingStrategy strategy, WorkerSetupManager workerSetupManager ) { @@ -123,7 +107,6 @@ public class RemoteTaskRunner implements TaskRunner this.workerPathCache = workerPathCache; this.scheduledExec = scheduledExec; this.retryPolicyFactory = retryPolicyFactory; - this.strategy = strategy; this.workerSetupManager = workerSetupManager; } @@ -131,6 +114,10 @@ public class RemoteTaskRunner implements TaskRunner public void start() { try { + if (started) { + return; + } + workerPathCache.getListenable().addListener( new PathChildrenCacheListener() { @@ -157,81 +144,6 @@ public class RemoteTaskRunner implements TaskRunner ); workerPathCache.start(); - // Schedule termination of worker nodes periodically - Period period = new Period(config.getTerminateResourcesDuration()); - PeriodGranularity granularity = new PeriodGranularity(period, config.getTerminateResourcesOriginDateTime(), null); - final long startTime = granularity.next(granularity.truncate(new DateTime().getMillis())); - - ScheduledExecutors.scheduleAtFixedRate( - scheduledExec, - new Duration( - System.currentTimeMillis(), - startTime - ), - config.getTerminateResourcesDuration(), - new Runnable() - { - @Override - public void run() - { - if (currentlyTerminating.isEmpty()) { - final int minNumWorkers = workerSetupManager.getWorkerSetupData().getMinNumWorkers(); - if (zkWorkers.size() <= minNumWorkers) { - return; - } - - List thoseLazyWorkers = Lists.newArrayList( - FunctionalIterable - .create(zkWorkers.values()) - .filter( - new Predicate() - { - @Override - public boolean apply(WorkerWrapper input) - { - return input.getRunningTasks().isEmpty() - && System.currentTimeMillis() - input.getLastCompletedTaskTime().getMillis() - > config.getMaxWorkerIdleTimeMillisBeforeDeletion(); - } - } - ) - ); - - AutoScalingData terminated = strategy.terminate( - Lists.transform( - thoseLazyWorkers.subList(minNumWorkers, thoseLazyWorkers.size() - 1), - new Function() - { - @Override - public String apply(WorkerWrapper input) - { - return input.getWorker().getIp(); - } - } - ) - ); - - if (terminated != null) { - currentlyTerminating.addAll(terminated.getNodeIds()); - lastTerminateTime = new DateTime(); - } - } else { - Duration durSinceLastTerminate = new Duration(new DateTime(), lastTerminateTime); - if (durSinceLastTerminate.isLongerThan(config.getMaxScalingDuration())) { - log.makeAlert("Worker node termination taking too long") - .addData("millisSinceLastTerminate", durSinceLastTerminate.getMillis()) - .addData("terminatingCount", currentlyTerminating.size()) - .emit(); - } - - log.info( - "%s still terminating. Wait for all nodes to terminate before trying again.", - currentlyTerminating - ); - } - } - } - ); started = true; } catch (Exception e) { @@ -243,6 +155,10 @@ public class RemoteTaskRunner implements TaskRunner public void stop() { try { + if (!started) { + return; + } + for (WorkerWrapper workerWrapper : zkWorkers.values()) { workerWrapper.close(); } @@ -255,16 +171,16 @@ public class RemoteTaskRunner implements TaskRunner } } - public boolean hasStarted() - { - return started; - } - - public int getNumWorkers() + public int getNumAvailableWorkers() { return zkWorkers.size(); } + public Collection getAvailableWorkers() + { + return zkWorkers.values(); + } + public boolean isTaskRunning(String taskId) { for (WorkerWrapper workerWrapper : zkWorkers.values()) { @@ -275,6 +191,13 @@ public class RemoteTaskRunner implements TaskRunner return false; } + /** + * A task will be run only if there is no current knowledge in the RemoteTaskRunner of the task. + * + * @param task task to run + * @param context task context to run under + * @param callback callback to be called exactly once + */ @Override public void run(Task task, TaskContext context, TaskCallback callback) { @@ -288,11 +211,18 @@ public class RemoteTaskRunner implements TaskRunner assignTask(taskWrapper); } + /** + * Ensures no workers are already running a task before assigning the task to a worker. + * It is possible that a worker is running a task the RTR has no knowledge of. This is common when the RTR + * needs to bootstrap after a restart. + * + * @param taskWrapper - a wrapper containing task metadata + */ private void assignTask(TaskWrapper taskWrapper) { WorkerWrapper workerWrapper = findWorkerRunningTask(taskWrapper); - // If the task already exists, we don't need to announce it + // If a worker is already running this task, we don't need to announce it if (workerWrapper != null) { final Worker worker = workerWrapper.getWorker(); try { @@ -395,8 +325,6 @@ public class RemoteTaskRunner implements TaskRunner private void addWorker(final Worker worker) { try { - currentlyProvisioning.removeAll(strategy.ipLookup(Arrays.asList(worker.getIp()))); - final String workerStatusPath = JOINER.join(config.getStatusPath(), worker.getHost()); final PathChildrenCache statusCache = new PathChildrenCache(cf, workerStatusPath, true); final WorkerWrapper workerWrapper = new WorkerWrapper( @@ -460,12 +388,12 @@ public class RemoteTaskRunner implements TaskRunner } else { final TaskCallback callback = taskWrapper.getCallback(); - // Cleanup - if (callback != null) { - callback.notify(taskStatus); - } - if (taskStatus.isComplete()) { + // Cleanup + if (callback != null) { + callback.notify(taskStatus); + } + // Worker is done with this task workerWrapper.setLastCompletedTaskTime(new DateTime()); tasks.remove(taskId); @@ -510,8 +438,6 @@ public class RemoteTaskRunner implements TaskRunner */ private void removeWorker(final Worker worker) { - currentlyTerminating.remove(worker.getHost()); - WorkerWrapper workerWrapper = zkWorkers.get(worker.getHost()); if (workerWrapper != null) { try { @@ -564,27 +490,6 @@ public class RemoteTaskRunner implements TaskRunner if (workerQueue.isEmpty()) { log.info("Worker nodes %s do not have capacity to run any more tasks!", zkWorkers.values()); - - if (currentlyProvisioning.isEmpty()) { - AutoScalingData provisioned = strategy.provision(); - if (provisioned != null) { - currentlyProvisioning.addAll(provisioned.getNodeIds()); - lastProvisionTime = new DateTime(); - } - } else { - Duration durSinceLastProvision = new Duration(new DateTime(), lastProvisionTime); - if (durSinceLastProvision.isLongerThan(config.getMaxScalingDuration())) { - log.makeAlert("Worker node provisioning taking too long") - .addData("millisSinceLastProvision", durSinceLastProvision.getMillis()) - .addData("provisioningCount", currentlyProvisioning.size()) - .emit(); - } - - log.info( - "%s still provisioning. Wait for all provisioned nodes to complete before requesting new worker.", - currentlyProvisioning - ); - } return null; } diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/TaskQueue.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/TaskQueue.java index e228b401025..90f19000e60 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/TaskQueue.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/TaskQueue.java @@ -42,6 +42,7 @@ import org.joda.time.DateTime; import org.joda.time.Interval; import javax.annotation.Nullable; +import java.util.Collection; import java.util.Collections; import java.util.Iterator; import java.util.List; @@ -376,6 +377,12 @@ public class TaskQueue } } + public Collection getAvailableTasks() + { + // TODO: actually implement this + return Lists.newArrayList(); + } + /** * Unlock some work. Does not update the task storage facility. Throws an exception if this work is not currently * running. diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorNode.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorNode.java index 3d084d60712..242c7873b6d 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorNode.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorNode.java @@ -75,8 +75,12 @@ import com.metamx.druid.merger.coordinator.config.RemoteTaskRunnerConfig; import com.metamx.druid.merger.coordinator.config.RetryPolicyConfig; import com.metamx.druid.merger.coordinator.config.WorkerSetupManagerConfig; import com.metamx.druid.merger.coordinator.scaling.EC2AutoScalingStrategy; -import com.metamx.druid.merger.coordinator.scaling.NoopScalingStrategy; -import com.metamx.druid.merger.coordinator.scaling.ScalingStrategy; +import com.metamx.druid.merger.coordinator.scaling.NoopAutoScalingStrategy; +import com.metamx.druid.merger.coordinator.scaling.ResourceManagementSchedulerConfig; +import com.metamx.druid.merger.coordinator.scaling.ResourceManagmentScheduler; +import com.metamx.druid.merger.coordinator.scaling.AutoScalingStrategy; +import com.metamx.druid.merger.coordinator.scaling.SimpleResourceManagementStrategy; +import com.metamx.druid.merger.coordinator.scaling.SimpleResourceManagmentConfig; import com.metamx.druid.merger.coordinator.setup.WorkerSetupManager; import com.metamx.druid.utils.PropUtils; import com.metamx.emitter.EmittingLogger; @@ -529,35 +533,19 @@ public class IndexerCoordinatorNode extends RegisteringNode .build() ); - ScalingStrategy strategy; - if (config.getStrategyImpl().equalsIgnoreCase("ec2")) { - strategy = new EC2AutoScalingStrategy( - jsonMapper, - new AmazonEC2Client( - new BasicAWSCredentials( - PropUtils.getProperty(props, "com.metamx.aws.accessKey"), - PropUtils.getProperty(props, "com.metamx.aws.secretKey") - ) - ), - configFactory.build(EC2AutoScalingStrategyConfig.class), - workerSetupManager - ); - } else if (config.getStrategyImpl().equalsIgnoreCase("noop")) { - strategy = new NoopScalingStrategy(); - } else { - throw new ISE("Invalid strategy implementation: %s", config.getStrategyImpl()); - } - - return new RemoteTaskRunner( + RemoteTaskRunner remoteTaskRunner = new RemoteTaskRunner( jsonMapper, configFactory.build(RemoteTaskRunnerConfig.class), curatorFramework, new PathChildrenCache(curatorFramework, indexerZkConfig.getAnnouncementPath(), true), retryScheduledExec, new RetryPolicyFactory(configFactory.build(RetryPolicyConfig.class)), - strategy, workerSetupManager ); + + initializeWorkerScaling(remoteTaskRunner); + + return remoteTaskRunner; } }; @@ -577,6 +565,49 @@ public class IndexerCoordinatorNode extends RegisteringNode } } + private void initializeWorkerScaling(RemoteTaskRunner taskRunner) + { + final ScheduledExecutorService scalingScheduledExec = Executors.newScheduledThreadPool( + 1, + new ThreadFactoryBuilder() + .setDaemon(true) + .setNameFormat("ScalingExec--%d") + .build() + ); + + AutoScalingStrategy strategy; + if (config.getStrategyImpl().equalsIgnoreCase("ec2")) { + strategy = new EC2AutoScalingStrategy( + jsonMapper, + new AmazonEC2Client( + new BasicAWSCredentials( + PropUtils.getProperty(props, "com.metamx.aws.accessKey"), + PropUtils.getProperty(props, "com.metamx.aws.secretKey") + ) + ), + configFactory.build(EC2AutoScalingStrategyConfig.class), + workerSetupManager + ); + } else if (config.getStrategyImpl().equalsIgnoreCase("noop")) { + strategy = new NoopAutoScalingStrategy(); + } else { + throw new ISE("Invalid strategy implementation: %s", config.getStrategyImpl()); + } + + ResourceManagmentScheduler resourceManagmentScheduler = new ResourceManagmentScheduler( + taskQueue, + taskRunner, + new SimpleResourceManagementStrategy( + strategy, + configFactory.build(SimpleResourceManagmentConfig.class), + workerSetupManager + ), + configFactory.build(ResourceManagementSchedulerConfig.class), + scalingScheduledExec + ); + lifecycle.addManagedInstance(resourceManagmentScheduler); + } + public static class Builder { private ObjectMapper jsonMapper = null; diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/AutoScalingData.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/AutoScalingData.java index 5a1bb4980e5..0ca74a9b38c 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/AutoScalingData.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/AutoScalingData.java @@ -43,4 +43,13 @@ public class AutoScalingData { return nodes; } + + @Override + public String toString() + { + return "AutoScalingData{" + + "nodeIds=" + nodeIds + + ", nodes=" + nodes + + '}'; + } } diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/ScalingStrategy.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/AutoScalingStrategy.java similarity index 81% rename from merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/ScalingStrategy.java rename to merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/AutoScalingStrategy.java index 150de1357e0..7ab92a0b985 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/ScalingStrategy.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/AutoScalingStrategy.java @@ -22,8 +22,9 @@ package com.metamx.druid.merger.coordinator.scaling; import java.util.List; /** + * The AutoScalingStrategy has the actual methods to provision and terminate worker nodes. */ -public interface ScalingStrategy +public interface AutoScalingStrategy { public AutoScalingData provision(); @@ -31,8 +32,8 @@ public interface ScalingStrategy /** * Provides a lookup of ip addresses to node ids - * @param ips - * @return + * @param ips - nodes ips + * @return node ids */ - public List ipLookup(List ips); + public List ipToIdLookup(List ips); } diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/EC2AutoScalingStrategy.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/EC2AutoScalingStrategy.java index 2a50a8b55fd..d64899a2739 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/EC2AutoScalingStrategy.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/EC2AutoScalingStrategy.java @@ -43,7 +43,7 @@ import java.util.List; /** */ -public class EC2AutoScalingStrategy implements ScalingStrategy +public class EC2AutoScalingStrategy implements AutoScalingStrategy { private static final EmittingLogger log = new EmittingLogger(EC2AutoScalingStrategy.class); @@ -187,7 +187,7 @@ public class EC2AutoScalingStrategy implements ScalingStrategy } @Override - public List ipLookup(List ips) + public List ipToIdLookup(List ips) { DescribeInstancesResult result = amazonEC2Client.describeInstances( new DescribeInstancesRequest() diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/NoopScalingStrategy.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/NoopAutoScalingStrategy.java similarity index 90% rename from merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/NoopScalingStrategy.java rename to merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/NoopAutoScalingStrategy.java index 2b412ca6202..d4a5f355c6f 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/NoopScalingStrategy.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/NoopAutoScalingStrategy.java @@ -26,9 +26,9 @@ import java.util.List; /** * This class just logs when scaling should occur. */ -public class NoopScalingStrategy implements ScalingStrategy +public class NoopAutoScalingStrategy implements AutoScalingStrategy { - private static final EmittingLogger log = new EmittingLogger(NoopScalingStrategy.class); + private static final EmittingLogger log = new EmittingLogger(NoopAutoScalingStrategy.class); @Override public AutoScalingData provision() @@ -45,7 +45,7 @@ public class NoopScalingStrategy implements ScalingStrategy } @Override - public List ipLookup(List ips) + public List ipToIdLookup(List ips) { log.info("I'm not a real strategy so I'm returning what I got %s", ips); return ips; diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/ResourceManagementSchedulerConfig.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/ResourceManagementSchedulerConfig.java new file mode 100644 index 00000000000..c732b5c6c1a --- /dev/null +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/ResourceManagementSchedulerConfig.java @@ -0,0 +1,42 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package com.metamx.druid.merger.coordinator.scaling; + +import org.joda.time.DateTime; +import org.joda.time.Duration; +import org.skife.config.Config; +import org.skife.config.Default; + +/** + */ +public abstract class ResourceManagementSchedulerConfig +{ + @Config("druid.indexer.provisionResources.duration") + @Default("PT1H") + public abstract Duration getProvisionResourcesDuration(); + + @Config("druid.indexer.terminateResources.duration") + @Default("PT1H") + public abstract Duration getTerminateResourcesDuration(); + + @Config("druid.indexer.terminateResources.originDateTime") + @Default("2012-01-01T00:55:00.000Z") + public abstract DateTime getTerminateResourcesOriginDateTime(); +} diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/ResourceManagementStrategy.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/ResourceManagementStrategy.java new file mode 100644 index 00000000000..37483082e98 --- /dev/null +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/ResourceManagementStrategy.java @@ -0,0 +1,38 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package com.metamx.druid.merger.coordinator.scaling; + +import com.metamx.druid.merger.common.task.Task; +import com.metamx.druid.merger.coordinator.WorkerWrapper; + +import java.util.Collection; + +/** + * The ResourceManagementStrategy decides if worker nodes should be provisioned or determined + * based on the available tasks in the system and the state of the workers in the system. + */ +public interface ResourceManagementStrategy +{ + public void doProvision(Collection availableTasks, Collection workerWrappers); + + public void doTerminate(Collection availableTasks, Collection workerWrappers); + + public ScalingStats getStats(); +} diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/ResourceManagmentScheduler.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/ResourceManagmentScheduler.java new file mode 100644 index 00000000000..b2ae623b978 --- /dev/null +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/ResourceManagmentScheduler.java @@ -0,0 +1,138 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package com.metamx.druid.merger.coordinator.scaling; + +import com.metamx.common.concurrent.ScheduledExecutors; +import com.metamx.common.lifecycle.LifecycleStart; +import com.metamx.common.lifecycle.LifecycleStop; +import com.metamx.common.logger.Logger; +import com.metamx.druid.PeriodGranularity; +import com.metamx.druid.merger.coordinator.RemoteTaskRunner; +import com.metamx.druid.merger.coordinator.TaskQueue; +import org.joda.time.DateTime; +import org.joda.time.Duration; +import org.joda.time.Period; + +import java.util.concurrent.ScheduledExecutorService; + +/** + * The ResourceManagmentScheduler manages when worker nodes should potentially be created or destroyed. + * It uses a {@link TaskQueue} to return the available tasks in the system and a {@link RemoteTaskRunner} to return + * the status of the worker nodes in the system. + * The ResourceManagmentScheduler does not contain the logic to decide whether provision or termination should actually occur. + * That decision is made in the {@link ResourceManagementStrategy}. + */ +public class ResourceManagmentScheduler +{ + private static final Logger log = new Logger(ResourceManagmentScheduler.class); + + private final TaskQueue taskQueue; + private final RemoteTaskRunner remoteTaskRunner; + private final ResourceManagementStrategy resourceManagementStrategy; + private final ResourceManagementSchedulerConfig config; + private final ScheduledExecutorService exec; + + private final Object lock = new Object(); + private volatile boolean started = false; + + public ResourceManagmentScheduler( + TaskQueue taskQueue, + RemoteTaskRunner remoteTaskRunner, + ResourceManagementStrategy resourceManagementStrategy, + ResourceManagementSchedulerConfig config, + ScheduledExecutorService exec + ) + { + this.taskQueue = taskQueue; + this.remoteTaskRunner = remoteTaskRunner; + this.resourceManagementStrategy = resourceManagementStrategy; + this.config = config; + this.exec = exec; + } + + @LifecycleStart + public void start() + { + synchronized (lock) { + if (started) { + return; + } + + ScheduledExecutors.scheduleAtFixedRate( + exec, + config.getProvisionResourcesDuration(), + new Runnable() + { + @Override + public void run() + { + resourceManagementStrategy.doProvision( + taskQueue.getAvailableTasks(), + remoteTaskRunner.getAvailableWorkers() + ); + } + } + ); + + // Schedule termination of worker nodes periodically + Period period = new Period(config.getTerminateResourcesDuration()); + PeriodGranularity granularity = new PeriodGranularity(period, config.getTerminateResourcesOriginDateTime(), null); + final long startTime = granularity.next(granularity.truncate(new DateTime().getMillis())); + + ScheduledExecutors.scheduleAtFixedRate( + exec, + new Duration( + System.currentTimeMillis(), + startTime + ), + config.getTerminateResourcesDuration(), + new Runnable() + { + @Override + public void run() + { + resourceManagementStrategy.doTerminate( + taskQueue.getAvailableTasks(), + remoteTaskRunner.getAvailableWorkers() + ); + } + } + ); + + started = true; + } + } + + @LifecycleStop + public void stop() + { + synchronized (lock) { + if (!started) { + return; + } + exec.shutdown(); + } + } + + public ScalingStats getStats() + { + return resourceManagementStrategy.getStats(); + } +} diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/ScalingStats.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/ScalingStats.java new file mode 100644 index 00000000000..d632a61baae --- /dev/null +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/ScalingStats.java @@ -0,0 +1,88 @@ +package com.metamx.druid.merger.coordinator.scaling; + +import com.google.common.collect.Lists; +import com.google.common.collect.MinMaxPriorityQueue; +import org.joda.time.DateTime; +import org.joda.time.DateTimeComparator; + +import java.util.List; + +/** + */ +public class ScalingStats +{ + private static enum EVENT + { + PROVISION, + TERMINATE + } + + private final MinMaxPriorityQueue recentNodes; + + public ScalingStats(int capacity) + { + this.recentNodes = MinMaxPriorityQueue + .orderedBy(DateTimeComparator.getInstance()) + .maximumSize(capacity) + .create(); + } + + public void addProvisionEvent(AutoScalingData data) + { + recentNodes.add( + new ScalingEvent( + data, + new DateTime(), + EVENT.PROVISION + ) + ); + } + + public void addTerminateEvent(AutoScalingData data) + { + recentNodes.add( + new ScalingEvent( + data, + new DateTime(), + EVENT.TERMINATE + ) + ); + } + + public List toList() + { + List retVal = Lists.newArrayList(); + while (!recentNodes.isEmpty()) { + retVal.add(recentNodes.poll()); + } + return retVal; + } + + public static class ScalingEvent + { + private final AutoScalingData data; + private final DateTime timestamp; + private final EVENT event; + + private ScalingEvent( + AutoScalingData data, + DateTime timestamp, + EVENT event + ) + { + this.data = data; + this.timestamp = timestamp; + this.event = event; + } + + @Override + public String toString() + { + return "ScalingEvent{" + + "data=" + data + + ", timestamp=" + timestamp + + ", event=" + event + + '}'; + } + } +} diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/SimpleResourceManagementStrategy.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/SimpleResourceManagementStrategy.java new file mode 100644 index 00000000000..99c581e5cce --- /dev/null +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/SimpleResourceManagementStrategy.java @@ -0,0 +1,222 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package com.metamx.druid.merger.coordinator.scaling; + +import com.google.common.base.Function; +import com.google.common.base.Predicate; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import com.metamx.common.guava.FunctionalIterable; +import com.metamx.druid.merger.common.task.Task; +import com.metamx.druid.merger.coordinator.WorkerWrapper; +import com.metamx.druid.merger.coordinator.setup.WorkerSetupManager; +import com.metamx.emitter.EmittingLogger; +import org.joda.time.DateTime; +import org.joda.time.Duration; + +import java.util.Collection; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ConcurrentSkipListSet; + +/** + */ +public class SimpleResourceManagementStrategy implements ResourceManagementStrategy +{ + private static final EmittingLogger log = new EmittingLogger(SimpleResourceManagementStrategy.class); + + private final AutoScalingStrategy autoScalingStrategy; + private final SimpleResourceManagmentConfig config; + private final WorkerSetupManager workerSetupManager; + private final ScalingStats scalingStats; + + private final ConcurrentSkipListSet currentlyProvisioning = new ConcurrentSkipListSet(); + private final ConcurrentSkipListSet currentlyTerminating = new ConcurrentSkipListSet(); + + private volatile DateTime lastProvisionTime = new DateTime(); + private volatile DateTime lastTerminateTime = new DateTime(); + + public SimpleResourceManagementStrategy( + AutoScalingStrategy autoScalingStrategy, + SimpleResourceManagmentConfig config, + WorkerSetupManager workerSetupManager + ) + { + this.autoScalingStrategy = autoScalingStrategy; + this.config = config; + this.workerSetupManager = workerSetupManager; + this.scalingStats = new ScalingStats(config.getNumEventsToTrack()); + } + + @Override + public void doProvision(Collection availableTasks, Collection workerWrappers) + { + boolean nothingProvisioning = Sets.difference( + currentlyProvisioning, + Sets.newHashSet( + autoScalingStrategy.ipToIdLookup( + Lists.newArrayList( + Iterables.transform( + workerWrappers, new Function() + { + @Override + public String apply(WorkerWrapper input) + { + return input.getWorker().getIp(); + } + } + ) + ) + ) + ) + ).isEmpty(); + + boolean moreTasksThanWorkerCapacity = !Sets.difference( + Sets.newHashSet(availableTasks), + Sets.newHashSet( + Iterables.concat( + Iterables.transform( + workerWrappers, + new Function>() + { + @Override + public Set apply(WorkerWrapper input) + { + return input.getRunningTasks(); + } + } + ) + ) + ) + ).isEmpty(); + + if (nothingProvisioning && moreTasksThanWorkerCapacity) { + AutoScalingData provisioned = autoScalingStrategy.provision(); + + if (provisioned != null) { + currentlyProvisioning.addAll(provisioned.getNodeIds()); + lastProvisionTime = new DateTime(); + scalingStats.addProvisionEvent(provisioned); + } + } else { + Duration durSinceLastProvision = new Duration(new DateTime(), lastProvisionTime); + if (durSinceLastProvision.isLongerThan(config.getMaxScalingDuration())) { + log.makeAlert("Worker node provisioning taking too long") + .addData("millisSinceLastProvision", durSinceLastProvision.getMillis()) + .addData("provisioningCount", currentlyProvisioning.size()) + .emit(); + } + + log.info( + "%s still provisioning. Wait for all provisioned nodes to complete before requesting new worker.", + currentlyProvisioning + ); + } + } + + @Override + public void doTerminate(Collection availableTasks, Collection workerWrappers) + { + boolean nothingTerminating = Sets.difference( + currentlyTerminating, + Sets.newHashSet( + autoScalingStrategy.ipToIdLookup( + Lists.newArrayList( + Iterables.transform( + workerWrappers, new Function() + { + @Override + public String apply(WorkerWrapper input) + { + return input.getWorker().getIp(); + } + } + ) + ) + ) + ) + ).isEmpty(); + + if (nothingTerminating) { + final int minNumWorkers = workerSetupManager.getWorkerSetupData().getMinNumWorkers(); + if (workerWrappers.size() <= minNumWorkers) { + return; + } + + List thoseLazyWorkers = Lists.newArrayList( + FunctionalIterable + .create(workerWrappers) + .filter( + new Predicate() + { + @Override + public boolean apply(WorkerWrapper input) + { + return input.getRunningTasks().isEmpty() + && System.currentTimeMillis() - input.getLastCompletedTaskTime() + .getMillis() + > config.getMaxWorkerIdleTimeMillisBeforeDeletion(); + } + } + ) + ); + + AutoScalingData terminated = autoScalingStrategy.terminate( + Lists.transform( + thoseLazyWorkers.subList(minNumWorkers, thoseLazyWorkers.size() - 1), + new Function() + { + @Override + public String apply(WorkerWrapper input) + { + return input.getWorker().getIp(); + } + } + ) + ); + + if (terminated != null) { + currentlyTerminating.addAll(terminated.getNodeIds()); + lastTerminateTime = new DateTime(); + scalingStats.addProvisionEvent(terminated); + } + } else { + Duration durSinceLastTerminate = new Duration(new DateTime(), lastTerminateTime); + if (durSinceLastTerminate.isLongerThan(config.getMaxScalingDuration())) { + log.makeAlert("Worker node termination taking too long") + .addData("millisSinceLastTerminate", durSinceLastTerminate.getMillis()) + .addData("terminatingCount", currentlyTerminating.size()) + .emit(); + } + + log.info( + "%s still terminating. Wait for all nodes to terminate before trying again.", + currentlyTerminating + ); + } + } + + @Override + public ScalingStats getStats() + { + return scalingStats; + } +} diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/SimpleResourceManagmentConfig.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/SimpleResourceManagmentConfig.java new file mode 100644 index 00000000000..a8a5b52ca89 --- /dev/null +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/SimpleResourceManagmentConfig.java @@ -0,0 +1,41 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package com.metamx.druid.merger.coordinator.scaling; + +import org.joda.time.Duration; +import org.skife.config.Config; +import org.skife.config.Default; + +/** + */ +public abstract class SimpleResourceManagmentConfig +{ + @Config("druid.indexer.maxWorkerIdleTimeMillisBeforeDeletion") + @Default("600000") + public abstract int getMaxWorkerIdleTimeMillisBeforeDeletion(); + + @Config("druid.indexer.maxScalingDuration") + @Default("PT1H") + public abstract Duration getMaxScalingDuration(); + + @Config("druid.indexer.numEventsToTrack") + @Default("20") + public abstract int getNumEventsToTrack(); +} diff --git a/merger/src/test/java/com/metamx/druid/merger/TestTask.java b/merger/src/test/java/com/metamx/druid/merger/TestTask.java new file mode 100644 index 00000000000..142d2c88254 --- /dev/null +++ b/merger/src/test/java/com/metamx/druid/merger/TestTask.java @@ -0,0 +1,71 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package com.metamx.druid.merger; + +import com.metamx.druid.aggregation.AggregatorFactory; +import com.metamx.druid.client.DataSegment; +import com.metamx.druid.merger.common.TaskCallback; +import com.metamx.druid.merger.common.TaskStatus; +import com.metamx.druid.merger.common.TaskToolbox; +import com.metamx.druid.merger.common.task.DefaultMergeTask; +import com.metamx.druid.merger.coordinator.TaskContext; +import org.codehaus.jackson.annotate.JsonProperty; +import org.codehaus.jackson.annotate.JsonTypeName; + +import java.util.List; + +/** + */ +@JsonTypeName("test") +public class TestTask extends DefaultMergeTask +{ + private final String id; + + public TestTask( + @JsonProperty("id") String id, + @JsonProperty("dataSource") String dataSource, + @JsonProperty("segments") List segments, + @JsonProperty("aggregations") List aggregators + ) + { + super(dataSource, segments, aggregators); + + this.id = id; + } + + @Override + @JsonProperty + public String getId() + { + return id; + } + + @Override + public Type getType() + { + return Type.TEST; + } + + @Override + public TaskStatus run(TaskContext context, TaskToolbox toolbox, TaskCallback callback) throws Exception + { + return TaskStatus.success("task1"); + } +} diff --git a/merger/src/test/java/com/metamx/druid/merger/coordinator/RemoteTaskRunnerTest.java b/merger/src/test/java/com/metamx/druid/merger/coordinator/RemoteTaskRunnerTest.java index ec53b0257b6..c1f264b87ef 100644 --- a/merger/src/test/java/com/metamx/druid/merger/coordinator/RemoteTaskRunnerTest.java +++ b/merger/src/test/java/com/metamx/druid/merger/coordinator/RemoteTaskRunnerTest.java @@ -7,6 +7,7 @@ import com.metamx.common.ISE; import com.metamx.druid.aggregation.AggregatorFactory; import com.metamx.druid.client.DataSegment; import com.metamx.druid.jackson.DefaultObjectMapper; +import com.metamx.druid.merger.TestTask; import com.metamx.druid.merger.common.TaskCallback; import com.metamx.druid.merger.common.TaskStatus; import com.metamx.druid.merger.common.TaskToolbox; @@ -14,11 +15,10 @@ import com.metamx.druid.merger.common.config.IndexerZkConfig; import com.metamx.druid.merger.common.config.TaskConfig; import com.metamx.druid.merger.common.task.DefaultMergeTask; import com.metamx.druid.merger.common.task.Task; -import com.metamx.druid.merger.coordinator.config.IndexerCoordinatorConfig; import com.metamx.druid.merger.coordinator.config.RemoteTaskRunnerConfig; import com.metamx.druid.merger.coordinator.config.RetryPolicyConfig; import com.metamx.druid.merger.coordinator.scaling.AutoScalingData; -import com.metamx.druid.merger.coordinator.scaling.ScalingStrategy; +import com.metamx.druid.merger.coordinator.scaling.AutoScalingStrategy; import com.metamx.druid.merger.coordinator.setup.WorkerSetupData; import com.metamx.druid.merger.coordinator.setup.WorkerSetupManager; import com.metamx.druid.merger.worker.TaskMonitor; @@ -326,7 +326,6 @@ public class RemoteTaskRunnerTest pathChildrenCache, scheduledExec, new RetryPolicyFactory(new TestRetryPolicyConfig()), - new TestScalingStrategy(), workerSetupManager ); @@ -337,7 +336,7 @@ public class RemoteTaskRunnerTest jsonMapper.writeValueAsBytes(worker1) ); int count = 0; - while (remoteTaskRunner.getNumWorkers() == 0) { + while (remoteTaskRunner.getNumAvailableWorkers() == 0) { Thread.sleep(500); if (count > 10) { throw new ISE("WTF?! Still can't find worker!"); @@ -367,27 +366,6 @@ public class RemoteTaskRunnerTest } } - private static class TestScalingStrategy implements ScalingStrategy - { - @Override - public AutoScalingData provision() - { - return null; - } - - @Override - public AutoScalingData terminate(List nodeIds) - { - return null; - } - - @Override - public List ipLookup(List ips) - { - return ips; - } - } - private static class TestRemoteTaskRunnerConfig extends RemoteTaskRunnerConfig { @Override @@ -444,41 +422,4 @@ public class RemoteTaskRunnerTest return 1000; } } - - @JsonTypeName("test") - private static class TestTask extends DefaultMergeTask - { - private final String id; - - public TestTask( - @JsonProperty("id") String id, - @JsonProperty("dataSource") String dataSource, - @JsonProperty("segments") List segments, - @JsonProperty("aggregations") List aggregators - ) - { - super(dataSource, segments, aggregators); - - this.id = id; - } - - @Override - @JsonProperty - public String getId() - { - return id; - } - - @Override - public Type getType() - { - return Type.TEST; - } - - @Override - public TaskStatus run(TaskContext context, TaskToolbox toolbox, TaskCallback callback) throws Exception - { - return TaskStatus.success("task1"); - } - } } diff --git a/merger/src/test/java/com/metamx/druid/merger/coordinator/scaling/SimpleResourceManagementStrategyTest.java b/merger/src/test/java/com/metamx/druid/merger/coordinator/scaling/SimpleResourceManagementStrategyTest.java new file mode 100644 index 00000000000..10398da005f --- /dev/null +++ b/merger/src/test/java/com/metamx/druid/merger/coordinator/scaling/SimpleResourceManagementStrategyTest.java @@ -0,0 +1,142 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package com.metamx.druid.merger.coordinator.scaling; + +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import com.metamx.druid.aggregation.AggregatorFactory; +import com.metamx.druid.client.DataSegment; +import com.metamx.druid.merger.TestTask; +import com.metamx.druid.merger.common.task.Task; +import com.metamx.druid.merger.coordinator.WorkerWrapper; +import com.metamx.druid.merger.coordinator.setup.WorkerSetupManager; +import org.easymock.EasyMock; +import org.joda.time.DateTime; +import org.joda.time.Duration; +import org.joda.time.Interval; +import org.junit.Before; +import org.junit.Test; + +import java.util.Arrays; +import java.util.Set; + +/** + */ +public class SimpleResourceManagementStrategyTest +{ + private AutoScalingStrategy autoScalingStrategy; + private WorkerSetupManager workerSetupManager; + private Task testTask; + private SimpleResourceManagementStrategy simpleResourceManagementStrategy; + + @Before + public void setUp() throws Exception + { + workerSetupManager = EasyMock.createMock(WorkerSetupManager.class); + autoScalingStrategy = EasyMock.createMock(AutoScalingStrategy.class); + + testTask = new TestTask( + "task1", + "dummyDs", + Lists.newArrayList( + new DataSegment( + "dummyDs", + new Interval(new DateTime(), new DateTime()), + new DateTime().toString(), + null, + null, + null, + null, + 0 + ) + ), Lists.newArrayList() + ); + simpleResourceManagementStrategy = new SimpleResourceManagementStrategy( + new TestAutoScalingStrategy(), + new SimpleResourceManagmentConfig() + { + @Override + public int getMaxWorkerIdleTimeMillisBeforeDeletion() + { + return 0; + } + + @Override + public Duration getMaxScalingDuration() + { + return null; + } + + @Override + public int getNumEventsToTrack() + { + return 1; + } + }, + workerSetupManager + ); + } + + @Test + public void testSuccessfulProvision() throws Exception + { + EasyMock.expect(autoScalingStrategy.provision()).andReturn( + new AutoScalingData(Lists.newArrayList(), Lists.newArrayList()) + ); + EasyMock.replay(autoScalingStrategy); + + simpleResourceManagementStrategy.doProvision( + Arrays.asList( + testTask + ), + Arrays.asList( + new TestWorkerWrapper(testTask) + ) + ); + + EasyMock.verify(autoScalingStrategy); + } + + @Test + public void testDoTerminate() throws Exception + { + + } + + private static class TestWorkerWrapper extends WorkerWrapper + { + private final Task testTask; + + private TestWorkerWrapper( + Task testTask + ) + { + super(null, null, null); + + this.testTask = testTask; + } + + @Override + public Set getRunningTasks() + { + return Sets.newHashSet(testTask.getId()); + } + } +} diff --git a/merger/src/test/java/com/metamx/druid/merger/coordinator/scaling/TestAutoScalingStrategy.java b/merger/src/test/java/com/metamx/druid/merger/coordinator/scaling/TestAutoScalingStrategy.java new file mode 100644 index 00000000000..8213da61848 --- /dev/null +++ b/merger/src/test/java/com/metamx/druid/merger/coordinator/scaling/TestAutoScalingStrategy.java @@ -0,0 +1,45 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package com.metamx.druid.merger.coordinator.scaling; + +import java.util.List; + +/** + */ +public class TestAutoScalingStrategy implements AutoScalingStrategy +{ + @Override + public AutoScalingData provision() + { + return null; + } + + @Override + public AutoScalingData terminate(List ids) + { + return null; + } + + @Override + public List ipToIdLookup(List ips) + { + return null; + } +}