refactor scaling stuff

Fangjin Yang 2013-02-06 11:09:49 -08:00
parent ea64eaabce
commit efd0dc0062
19 changed files with 959 additions and 235 deletions

View File

@@ -73,7 +73,7 @@ public class FragmentSearchQuerySpec implements SearchQuerySpec
public boolean accept(String dimVal)
{
for (String value : values) {
if (!dimVal.toLowerCase().contains(value)) {
if (dimVal == null || !dimVal.toLowerCase().contains(value)) {
return false;
}
}

View File

@@ -59,6 +59,9 @@ public class InsensitiveContainsSearchQuerySpec implements SearchQuerySpec
@Override
public boolean accept(String dimVal)
{
if (dimVal == null) {
return false;
}
return dimVal.toLowerCase().contains(value);
}
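
Both null guards (here and in FragmentSearchQuerySpec above) make accept() null-safe: a row with no value for the dimension now simply fails to match instead of throwing a NullPointerException from toLowerCase(). A minimal sketch of the intended behavior; the two-argument constructor (value, sortSpec) is an assumption here:

    // Hypothetical usage; constructor signature assumed as (value, sortSpec).
    SearchQuerySpec spec = new InsensitiveContainsSearchQuerySpec("foo", null);
    spec.accept("FooBar"); // true: case-insensitive containment
    spec.accept(null);     // now false, rather than a NullPointerException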

View File

@@ -19,27 +19,21 @@
package com.metamx.druid.merger.coordinator;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Predicate;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.common.collect.MinMaxPriorityQueue;
import com.google.common.collect.Sets;
import com.google.common.primitives.Ints;
import com.metamx.common.ISE;
import com.metamx.common.concurrent.ScheduledExecutors;
import com.metamx.common.guava.FunctionalIterable;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.druid.PeriodGranularity;
import com.metamx.druid.merger.common.TaskCallback;
import com.metamx.druid.merger.common.TaskHolder;
import com.metamx.druid.merger.common.TaskStatus;
import com.metamx.druid.merger.common.task.Task;
import com.metamx.druid.merger.coordinator.config.RemoteTaskRunnerConfig;
import com.metamx.druid.merger.coordinator.scaling.AutoScalingData;
import com.metamx.druid.merger.coordinator.scaling.ScalingStrategy;
import com.metamx.druid.merger.coordinator.setup.WorkerSetupManager;
import com.metamx.druid.merger.worker.Worker;
import com.metamx.emitter.EmittingLogger;
@@ -52,29 +46,25 @@ import com.netflix.curator.utils.ZKPaths;
import org.apache.zookeeper.CreateMode;
import org.codehaus.jackson.map.ObjectMapper;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.joda.time.Period;
import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
* The RemoteTaskRunner encapsulates all interactions with Zookeeper and keeps track of which workers
* are running which tasks. The RemoteTaskRunner is event driven and updates state according to ephemeral node
* changes in ZK.
* The RemoteTaskRunner's primary responsibility is to assign tasks to worker nodes and manage retries in failure
* scenarios. The RemoteTaskRunner keeps track of which workers are running which tasks and manages coordinator and
* worker interactions over Zookeeper. The RemoteTaskRunner is event driven and updates state according to ephemeral
* node changes in ZK.
* <p/>
* The RemoteTaskRunner will assign tasks to a node until the node hits capacity. RemoteTaskRunners have scaling
* strategies to help them decide when to create or delete new resources. When tasks are assigned to the remote
* task runner and no workers have capacity to handle the task, provisioning will be done according to the strategy.
* The remote task runner periodically runs a check to see if any worker nodes have not had any work for a
* specified period of time. If so, the worker node will be terminated.
* The RemoteTaskRunner will assign tasks to a node until the node hits capacity. At that point, task assignment will
* fail. The RemoteTaskRunner depends on another manager to create additional worker resources.
* For example, {@link com.metamx.druid.merger.coordinator.scaling.ResourceManagmentScheduler} can take care of these duties.
*
* <p/>
* If a worker node becomes inexplicably disconnected from Zk, the RemoteTaskRunner will automatically retry any tasks
* that were associated with the node.
@@ -90,7 +80,6 @@ public class RemoteTaskRunner implements TaskRunner
private final PathChildrenCache workerPathCache;
private final ScheduledExecutorService scheduledExec;
private final RetryPolicyFactory retryPolicyFactory;
private final ScalingStrategy strategy;
private final WorkerSetupManager workerSetupManager;
// all workers that exist in ZK
@@ -98,12 +87,8 @@ public class RemoteTaskRunner implements TaskRunner
// all tasks that are assigned or need to be assigned
private final Map<String, TaskWrapper> tasks = new ConcurrentHashMap<String, TaskWrapper>();
private final ConcurrentSkipListSet<String> currentlyProvisioning = new ConcurrentSkipListSet<String>();
private final ConcurrentSkipListSet<String> currentlyTerminating = new ConcurrentSkipListSet<String>();
private final Object statusLock = new Object();
private volatile DateTime lastProvisionTime = new DateTime();
private volatile DateTime lastTerminateTime = new DateTime();
private volatile boolean started = false;
public RemoteTaskRunner(
@@ -113,7 +98,6 @@ public class RemoteTaskRunner implements TaskRunner
PathChildrenCache workerPathCache,
ScheduledExecutorService scheduledExec,
RetryPolicyFactory retryPolicyFactory,
ScalingStrategy strategy,
WorkerSetupManager workerSetupManager
)
{
@@ -123,7 +107,6 @@ public class RemoteTaskRunner implements TaskRunner
this.workerPathCache = workerPathCache;
this.scheduledExec = scheduledExec;
this.retryPolicyFactory = retryPolicyFactory;
this.strategy = strategy;
this.workerSetupManager = workerSetupManager;
}
@@ -131,6 +114,10 @@ public class RemoteTaskRunner implements TaskRunner
public void start()
{
try {
if (started) {
return;
}
workerPathCache.getListenable().addListener(
new PathChildrenCacheListener()
{
@@ -157,81 +144,6 @@ public class RemoteTaskRunner implements TaskRunner
);
workerPathCache.start();
// Schedule termination of worker nodes periodically
Period period = new Period(config.getTerminateResourcesDuration());
PeriodGranularity granularity = new PeriodGranularity(period, config.getTerminateResourcesOriginDateTime(), null);
final long startTime = granularity.next(granularity.truncate(new DateTime().getMillis()));
ScheduledExecutors.scheduleAtFixedRate(
scheduledExec,
new Duration(
System.currentTimeMillis(),
startTime
),
config.getTerminateResourcesDuration(),
new Runnable()
{
@Override
public void run()
{
if (currentlyTerminating.isEmpty()) {
final int minNumWorkers = workerSetupManager.getWorkerSetupData().getMinNumWorkers();
if (zkWorkers.size() <= minNumWorkers) {
return;
}
List<WorkerWrapper> thoseLazyWorkers = Lists.newArrayList(
FunctionalIterable
.create(zkWorkers.values())
.filter(
new Predicate<WorkerWrapper>()
{
@Override
public boolean apply(WorkerWrapper input)
{
return input.getRunningTasks().isEmpty()
&& System.currentTimeMillis() - input.getLastCompletedTaskTime().getMillis()
> config.getMaxWorkerIdleTimeMillisBeforeDeletion();
}
}
)
);
AutoScalingData terminated = strategy.terminate(
Lists.transform(
thoseLazyWorkers.subList(minNumWorkers, thoseLazyWorkers.size() - 1),
new Function<WorkerWrapper, String>()
{
@Override
public String apply(WorkerWrapper input)
{
return input.getWorker().getIp();
}
}
)
);
if (terminated != null) {
currentlyTerminating.addAll(terminated.getNodeIds());
lastTerminateTime = new DateTime();
}
} else {
Duration durSinceLastTerminate = new Duration(new DateTime(), lastTerminateTime);
if (durSinceLastTerminate.isLongerThan(config.getMaxScalingDuration())) {
log.makeAlert("Worker node termination taking too long")
.addData("millisSinceLastTerminate", durSinceLastTerminate.getMillis())
.addData("terminatingCount", currentlyTerminating.size())
.emit();
}
log.info(
"%s still terminating. Wait for all nodes to terminate before trying again.",
currentlyTerminating
);
}
}
}
);
started = true;
}
catch (Exception e) {
@@ -243,6 +155,10 @@ public class RemoteTaskRunner implements TaskRunner
public void stop()
{
try {
if (!started) {
return;
}
for (WorkerWrapper workerWrapper : zkWorkers.values()) {
workerWrapper.close();
}
@@ -255,16 +171,16 @@ public class RemoteTaskRunner implements TaskRunner
}
}
public boolean hasStarted()
{
return started;
}
public int getNumWorkers()
public int getNumAvailableWorkers()
{
return zkWorkers.size();
}
public Collection<WorkerWrapper> getAvailableWorkers()
{
return zkWorkers.values();
}
public boolean isTaskRunning(String taskId)
{
for (WorkerWrapper workerWrapper : zkWorkers.values()) {
@@ -275,6 +191,13 @@ public class RemoteTaskRunner implements TaskRunner
return false;
}
/**
* A task will only be run if the RemoteTaskRunner has no current knowledge of it.
*
* @param task task to run
* @param context task context to run under
* @param callback callback to be called exactly once
*/
@Override
public void run(Task task, TaskContext context, TaskCallback callback)
{
@@ -288,11 +211,18 @@ public class RemoteTaskRunner implements TaskRunner
assignTask(taskWrapper);
}
/**
* Checks whether any worker is already running the task before assigning it to a new worker.
* It is possible that a worker is running a task the RTR has no knowledge of. This is common when the RTR
* needs to bootstrap after a restart.
*
* @param taskWrapper - a wrapper containing task metadata
*/
private void assignTask(TaskWrapper taskWrapper)
{
WorkerWrapper workerWrapper = findWorkerRunningTask(taskWrapper);
// If the task already exists, we don't need to announce it
// If a worker is already running this task, we don't need to announce it
if (workerWrapper != null) {
final Worker worker = workerWrapper.getWorker();
try {
@@ -395,8 +325,6 @@ public class RemoteTaskRunner implements TaskRunner
private void addWorker(final Worker worker)
{
try {
currentlyProvisioning.removeAll(strategy.ipLookup(Arrays.<String>asList(worker.getIp())));
final String workerStatusPath = JOINER.join(config.getStatusPath(), worker.getHost());
final PathChildrenCache statusCache = new PathChildrenCache(cf, workerStatusPath, true);
final WorkerWrapper workerWrapper = new WorkerWrapper(
@@ -460,12 +388,12 @@ public class RemoteTaskRunner implements TaskRunner
} else {
final TaskCallback callback = taskWrapper.getCallback();
if (taskStatus.isComplete()) {
// Cleanup
if (callback != null) {
callback.notify(taskStatus);
}
if (taskStatus.isComplete()) {
// Worker is done with this task
workerWrapper.setLastCompletedTaskTime(new DateTime());
tasks.remove(taskId);
@@ -510,8 +438,6 @@ public class RemoteTaskRunner implements TaskRunner
*/
private void removeWorker(final Worker worker)
{
currentlyTerminating.remove(worker.getHost());
WorkerWrapper workerWrapper = zkWorkers.get(worker.getHost());
if (workerWrapper != null) {
try {
@@ -564,27 +490,6 @@ public class RemoteTaskRunner implements TaskRunner
if (workerQueue.isEmpty()) {
log.info("Worker nodes %s do not have capacity to run any more tasks!", zkWorkers.values());
if (currentlyProvisioning.isEmpty()) {
AutoScalingData provisioned = strategy.provision();
if (provisioned != null) {
currentlyProvisioning.addAll(provisioned.getNodeIds());
lastProvisionTime = new DateTime();
}
} else {
Duration durSinceLastProvision = new Duration(new DateTime(), lastProvisionTime);
if (durSinceLastProvision.isLongerThan(config.getMaxScalingDuration())) {
log.makeAlert("Worker node provisioning taking too long")
.addData("millisSinceLastProvision", durSinceLastProvision.getMillis())
.addData("provisioningCount", currentlyProvisioning.size())
.emit();
}
log.info(
"%s still provisioning. Wait for all provisioned nodes to complete before requesting new worker.",
currentlyProvisioning
);
}
return null;
}

View File

@@ -42,6 +42,7 @@ import org.joda.time.DateTime;
import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
@@ -376,6 +377,12 @@ public class TaskQueue
}
}
public Collection<Task> getAvailableTasks()
{
// TODO: actually implement this
return Lists.newArrayList();
}
/**
* Unlock some work. Does not update the task storage facility. Throws an exception if this work is not currently
* running.

View File

@@ -75,8 +75,12 @@ import com.metamx.druid.merger.coordinator.config.RemoteTaskRunnerConfig;
import com.metamx.druid.merger.coordinator.config.RetryPolicyConfig;
import com.metamx.druid.merger.coordinator.config.WorkerSetupManagerConfig;
import com.metamx.druid.merger.coordinator.scaling.EC2AutoScalingStrategy;
import com.metamx.druid.merger.coordinator.scaling.NoopScalingStrategy;
import com.metamx.druid.merger.coordinator.scaling.ScalingStrategy;
import com.metamx.druid.merger.coordinator.scaling.NoopAutoScalingStrategy;
import com.metamx.druid.merger.coordinator.scaling.ResourceManagementSchedulerConfig;
import com.metamx.druid.merger.coordinator.scaling.ResourceManagmentScheduler;
import com.metamx.druid.merger.coordinator.scaling.AutoScalingStrategy;
import com.metamx.druid.merger.coordinator.scaling.SimpleResourceManagementStrategy;
import com.metamx.druid.merger.coordinator.scaling.SimpleResourceManagmentConfig;
import com.metamx.druid.merger.coordinator.setup.WorkerSetupManager;
import com.metamx.druid.utils.PropUtils;
import com.metamx.emitter.EmittingLogger;
@@ -529,35 +533,19 @@ public class IndexerCoordinatorNode extends RegisteringNode
.build()
);
ScalingStrategy strategy;
if (config.getStrategyImpl().equalsIgnoreCase("ec2")) {
strategy = new EC2AutoScalingStrategy(
jsonMapper,
new AmazonEC2Client(
new BasicAWSCredentials(
PropUtils.getProperty(props, "com.metamx.aws.accessKey"),
PropUtils.getProperty(props, "com.metamx.aws.secretKey")
)
),
configFactory.build(EC2AutoScalingStrategyConfig.class),
workerSetupManager
);
} else if (config.getStrategyImpl().equalsIgnoreCase("noop")) {
strategy = new NoopScalingStrategy();
} else {
throw new ISE("Invalid strategy implementation: %s", config.getStrategyImpl());
}
return new RemoteTaskRunner(
RemoteTaskRunner remoteTaskRunner = new RemoteTaskRunner(
jsonMapper,
configFactory.build(RemoteTaskRunnerConfig.class),
curatorFramework,
new PathChildrenCache(curatorFramework, indexerZkConfig.getAnnouncementPath(), true),
retryScheduledExec,
new RetryPolicyFactory(configFactory.build(RetryPolicyConfig.class)),
strategy,
workerSetupManager
);
initializeWorkerScaling(remoteTaskRunner);
return remoteTaskRunner;
}
};
@@ -577,6 +565,49 @@ public class IndexerCoordinatorNode extends RegisteringNode
}
}
private void initializeWorkerScaling(RemoteTaskRunner taskRunner)
{
final ScheduledExecutorService scalingScheduledExec = Executors.newScheduledThreadPool(
1,
new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat("ScalingExec--%d")
.build()
);
AutoScalingStrategy strategy;
if (config.getStrategyImpl().equalsIgnoreCase("ec2")) {
strategy = new EC2AutoScalingStrategy(
jsonMapper,
new AmazonEC2Client(
new BasicAWSCredentials(
PropUtils.getProperty(props, "com.metamx.aws.accessKey"),
PropUtils.getProperty(props, "com.metamx.aws.secretKey")
)
),
configFactory.build(EC2AutoScalingStrategyConfig.class),
workerSetupManager
);
} else if (config.getStrategyImpl().equalsIgnoreCase("noop")) {
strategy = new NoopAutoScalingStrategy();
} else {
throw new ISE("Invalid strategy implementation: %s", config.getStrategyImpl());
}
ResourceManagmentScheduler resourceManagmentScheduler = new ResourceManagmentScheduler(
taskQueue,
taskRunner,
new SimpleResourceManagementStrategy(
strategy,
configFactory.build(SimpleResourceManagmentConfig.class),
workerSetupManager
),
configFactory.build(ResourceManagementSchedulerConfig.class),
scalingScheduledExec
);
lifecycle.addManagedInstance(resourceManagmentScheduler);
}
public static class Builder
{
private ObjectMapper jsonMapper = null;

View File

@@ -43,4 +43,13 @@ public class AutoScalingData<T>
{
return nodes;
}
@Override
public String toString()
{
return "AutoScalingData{" +
"nodeIds=" + nodeIds +
", nodes=" + nodes +
'}';
}
}

View File

@@ -22,8 +22,9 @@ package com.metamx.druid.merger.coordinator.scaling;
import java.util.List;
/**
* The AutoScalingStrategy defines how worker nodes are actually provisioned and terminated.
*/
public interface ScalingStrategy<T>
public interface AutoScalingStrategy<T>
{
public AutoScalingData<T> provision();
@@ -31,8 +32,8 @@ public interface ScalingStrategy<T>
/**
* Provides a lookup of ip addresses to node ids
* @param ips
* @return
* @param ips - node ips
* @return node ids
*/
public List<String> ipLookup(List<String> ips);
public List<String> ipToIdLookup(List<String> ips);
}
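
The rename from ipLookup to ipToIdLookup makes the contract explicit: implementations map worker IP addresses to the scaling provider's node ids, which callers can use to reconcile pending provision or terminate requests against live nodes. A small sketch against the noop implementation shown below, which simply echoes its input:

    // Sketch: the noop strategy returns the ips unchanged; the EC2 strategy
    // would return the matching instance ids instead.
    AutoScalingStrategy<String> strategy = new NoopAutoScalingStrategy();
    List<String> ids = strategy.ipToIdLookup(Arrays.asList("10.0.0.12", "10.0.0.13"));
    // ids == ["10.0.0.12", "10.0.0.13"]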

View File

@@ -43,7 +43,7 @@ import java.util.List;
/**
*/
public class EC2AutoScalingStrategy implements ScalingStrategy<Instance>
public class EC2AutoScalingStrategy implements AutoScalingStrategy<Instance>
{
private static final EmittingLogger log = new EmittingLogger(EC2AutoScalingStrategy.class);
@@ -187,7 +187,7 @@ public class EC2AutoScalingStrategy implements ScalingStrategy<Instance>
}
@Override
public List<String> ipLookup(List<String> ips)
public List<String> ipToIdLookup(List<String> ips)
{
DescribeInstancesResult result = amazonEC2Client.describeInstances(
new DescribeInstancesRequest()

View File

@@ -26,9 +26,9 @@ import java.util.List;
/**
* This class just logs when scaling should occur.
*/
public class NoopScalingStrategy implements ScalingStrategy<String>
public class NoopAutoScalingStrategy implements AutoScalingStrategy<String>
{
private static final EmittingLogger log = new EmittingLogger(NoopScalingStrategy.class);
private static final EmittingLogger log = new EmittingLogger(NoopAutoScalingStrategy.class);
@Override
public AutoScalingData<String> provision()
@@ -45,7 +45,7 @@ public class NoopScalingStrategy implements ScalingStrategy<String>
}
@Override
public List<String> ipLookup(List<String> ips)
public List<String> ipToIdLookup(List<String> ips)
{
log.info("I'm not a real strategy so I'm returning what I got %s", ips);
return ips;

View File

@@ -0,0 +1,42 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.merger.coordinator.scaling;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.skife.config.Config;
import org.skife.config.Default;
/**
*/
public abstract class ResourceManagementSchedulerConfig
{
@Config("druid.indexer.provisionResources.duration")
@Default("PT1H")
public abstract Duration getProvisionResourcesDuration();
@Config("druid.indexer.terminateResources.duration")
@Default("PT1H")
public abstract Duration getTerminateResourcesDuration();
@Config("druid.indexer.terminateResources.originDateTime")
@Default("2012-01-01T00:55:00.000Z")
public abstract DateTime getTerminateResourcesOriginDateTime();
}
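
With the defaults above, both the provision check and the terminate check run hourly, and the terminate schedule is pinned to 55 minutes past the hour. A hedged sketch of binding these properties with skife-config, assuming the factory can coerce ISO-8601 strings to Joda Durations (as the @Default values imply); the override values are arbitrary:

    // Sketch: bind the @Config properties from a plain Properties source.
    Properties props = new Properties();
    props.setProperty("druid.indexer.provisionResources.duration", "PT15M");
    props.setProperty("druid.indexer.terminateResources.duration", "PT30M");
    ResourceManagementSchedulerConfig config =
        new ConfigurationObjectFactory(props).build(ResourceManagementSchedulerConfig.class);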

View File

@@ -0,0 +1,38 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.merger.coordinator.scaling;
import com.metamx.druid.merger.common.task.Task;
import com.metamx.druid.merger.coordinator.WorkerWrapper;
import java.util.Collection;
/**
* The ResourceManagementStrategy decides whether worker nodes should be provisioned or terminated,
* based on the available tasks in the system and the state of the workers in the system.
*/
public interface ResourceManagementStrategy
{
public void doProvision(Collection<Task> availableTasks, Collection<WorkerWrapper> workerWrappers);
public void doTerminate(Collection<Task> availableTasks, Collection<WorkerWrapper> workerWrappers);
public ScalingStats getStats();
}

View File

@@ -0,0 +1,138 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.merger.coordinator.scaling;
import com.metamx.common.concurrent.ScheduledExecutors;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.common.logger.Logger;
import com.metamx.druid.PeriodGranularity;
import com.metamx.druid.merger.coordinator.RemoteTaskRunner;
import com.metamx.druid.merger.coordinator.TaskQueue;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.joda.time.Period;
import java.util.concurrent.ScheduledExecutorService;
/**
* The ResourceManagmentScheduler manages when worker nodes should potentially be created or destroyed.
* It uses a {@link TaskQueue} to fetch the available tasks in the system and a {@link RemoteTaskRunner} to fetch
* the status of the worker nodes in the system.
* The ResourceManagmentScheduler does not contain the logic to decide whether provisioning or termination should actually occur.
* That decision is made in the {@link ResourceManagementStrategy}.
*/
public class ResourceManagmentScheduler
{
private static final Logger log = new Logger(ResourceManagmentScheduler.class);
private final TaskQueue taskQueue;
private final RemoteTaskRunner remoteTaskRunner;
private final ResourceManagementStrategy resourceManagementStrategy;
private final ResourceManagementSchedulerConfig config;
private final ScheduledExecutorService exec;
private final Object lock = new Object();
private volatile boolean started = false;
public ResourceManagmentScheduler(
TaskQueue taskQueue,
RemoteTaskRunner remoteTaskRunner,
ResourceManagementStrategy resourceManagementStrategy,
ResourceManagementSchedulerConfig config,
ScheduledExecutorService exec
)
{
this.taskQueue = taskQueue;
this.remoteTaskRunner = remoteTaskRunner;
this.resourceManagementStrategy = resourceManagementStrategy;
this.config = config;
this.exec = exec;
}
@LifecycleStart
public void start()
{
synchronized (lock) {
if (started) {
return;
}
ScheduledExecutors.scheduleAtFixedRate(
exec,
config.getProvisionResourcesDuration(),
new Runnable()
{
@Override
public void run()
{
resourceManagementStrategy.doProvision(
taskQueue.getAvailableTasks(),
remoteTaskRunner.getAvailableWorkers()
);
}
}
);
// Schedule termination of worker nodes periodically
Period period = new Period(config.getTerminateResourcesDuration());
PeriodGranularity granularity = new PeriodGranularity(period, config.getTerminateResourcesOriginDateTime(), null);
final long startTime = granularity.next(granularity.truncate(new DateTime().getMillis()));
ScheduledExecutors.scheduleAtFixedRate(
exec,
new Duration(
System.currentTimeMillis(),
startTime
),
config.getTerminateResourcesDuration(),
new Runnable()
{
@Override
public void run()
{
resourceManagementStrategy.doTerminate(
taskQueue.getAvailableTasks(),
remoteTaskRunner.getAvailableWorkers()
);
}
}
);
started = true;
}
}
@LifecycleStop
public void stop()
{
synchronized (lock) {
if (!started) {
return;
}
exec.shutdown();
}
}
public ScalingStats getStats()
{
return resourceManagementStrategy.getStats();
}
}
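
The initial delay for the termination check is aligned to the configured origin rather than to process start: truncate() snaps the current time back to the most recent period boundary and next() advances to the following one. A hedged worked example with the default hourly period and 00:55 origin (the "now" value is arbitrary):

    // Boundaries fall at 55 minutes past each hour for origin 2012-01-01T00:55Z, period PT1H.
    Period period = Period.hours(1);
    PeriodGranularity granularity =
        new PeriodGranularity(period, new DateTime("2012-01-01T00:55:00.000Z"), null);
    long now = new DateTime("2013-02-06T11:09:49Z").getMillis();
    long firstRun = granularity.next(granularity.truncate(now));
    // truncate(now) -> 2013-02-06T10:55Z, so firstRun -> 2013-02-06T11:55Z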

View File

@@ -0,0 +1,88 @@
package com.metamx.druid.merger.coordinator.scaling;
import com.google.common.collect.Lists;
import com.google.common.collect.MinMaxPriorityQueue;
import org.joda.time.DateTime;
import org.joda.time.DateTimeComparator;
import java.util.Comparator;
import java.util.List;
/**
*/
public class ScalingStats
{
private static enum EVENT
{
PROVISION,
TERMINATE
}
private final MinMaxPriorityQueue<ScalingEvent> recentNodes;
public ScalingStats(int capacity)
{
this.recentNodes = MinMaxPriorityQueue
.orderedBy(
new Comparator<ScalingEvent>()
{
@Override
public int compare(ScalingEvent s1, ScalingEvent s2)
{
// DateTimeComparator cannot order ScalingEvents directly; compare their timestamps.
return DateTimeComparator.getInstance().compare(s1.timestamp, s2.timestamp);
}
}
)
.maximumSize(capacity)
.create();
}
public void addProvisionEvent(AutoScalingData data)
{
recentNodes.add(
new ScalingEvent(
data,
new DateTime(),
EVENT.PROVISION
)
);
}
public void addTerminateEvent(AutoScalingData data)
{
recentNodes.add(
new ScalingEvent(
data,
new DateTime(),
EVENT.TERMINATE
)
);
}
public List<ScalingEvent> toList()
{
List<ScalingEvent> retVal = Lists.newArrayList();
while (!recentNodes.isEmpty()) {
retVal.add(recentNodes.poll());
}
return retVal;
}
public static class ScalingEvent
{
private final AutoScalingData data;
private final DateTime timestamp;
private final EVENT event;
private ScalingEvent(
AutoScalingData data,
DateTime timestamp,
EVENT event
)
{
this.data = data;
this.timestamp = timestamp;
this.event = event;
}
@Override
public String toString()
{
return "ScalingEvent{" +
"data=" + data +
", timestamp=" + timestamp +
", event=" + event +
'}';
}
}
}
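
A short usage sketch for ScalingStats: toList() drains the bounded queue, so reading the stats is destructive. The AutoScalingData constructor call mirrors the one in the test further below; the node id is hypothetical:

    // Record one provision and one terminate event, then drain them for inspection.
    ScalingStats stats = new ScalingStats(10);
    AutoScalingData data = new AutoScalingData(Lists.<String>newArrayList("i-123"), Lists.newArrayList());
    stats.addProvisionEvent(data);
    stats.addTerminateEvent(data);
    for (ScalingStats.ScalingEvent event : stats.toList()) {
      System.out.println(event); // e.g. ScalingEvent{data=..., timestamp=..., event=PROVISION}
    }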

View File

@@ -0,0 +1,222 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.merger.coordinator.scaling;
import com.google.common.base.Function;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.metamx.common.guava.FunctionalIterable;
import com.metamx.druid.merger.common.task.Task;
import com.metamx.druid.merger.coordinator.WorkerWrapper;
import com.metamx.druid.merger.coordinator.setup.WorkerSetupManager;
import com.metamx.emitter.EmittingLogger;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet;
/**
*/
public class SimpleResourceManagementStrategy implements ResourceManagementStrategy
{
private static final EmittingLogger log = new EmittingLogger(SimpleResourceManagementStrategy.class);
private final AutoScalingStrategy autoScalingStrategy;
private final SimpleResourceManagmentConfig config;
private final WorkerSetupManager workerSetupManager;
private final ScalingStats scalingStats;
private final ConcurrentSkipListSet<String> currentlyProvisioning = new ConcurrentSkipListSet<String>();
private final ConcurrentSkipListSet<String> currentlyTerminating = new ConcurrentSkipListSet<String>();
private volatile DateTime lastProvisionTime = new DateTime();
private volatile DateTime lastTerminateTime = new DateTime();
public SimpleResourceManagementStrategy(
AutoScalingStrategy autoScalingStrategy,
SimpleResourceManagmentConfig config,
WorkerSetupManager workerSetupManager
)
{
this.autoScalingStrategy = autoScalingStrategy;
this.config = config;
this.workerSetupManager = workerSetupManager;
this.scalingStats = new ScalingStats(config.getNumEventsToTrack());
}
@Override
public void doProvision(Collection<Task> availableTasks, Collection<WorkerWrapper> workerWrappers)
{
boolean nothingProvisioning = Sets.difference(
currentlyProvisioning,
Sets.newHashSet(
autoScalingStrategy.ipToIdLookup(
Lists.newArrayList(
Iterables.transform(
workerWrappers, new Function<WorkerWrapper, String>()
{
@Override
public String apply(WorkerWrapper input)
{
return input.getWorker().getIp();
}
}
)
)
)
)
).isEmpty();
boolean moreTasksThanWorkerCapacity = !Sets.difference(
Sets.newHashSet(availableTasks),
Sets.newHashSet(
Iterables.concat(
Iterables.transform(
workerWrappers,
new Function<WorkerWrapper, Set<String>>()
{
@Override
public Set<String> apply(WorkerWrapper input)
{
return input.getRunningTasks();
}
}
)
)
)
).isEmpty();
if (nothingProvisioning && moreTasksThanWorkerCapacity) {
AutoScalingData provisioned = autoScalingStrategy.provision();
if (provisioned != null) {
currentlyProvisioning.addAll(provisioned.getNodeIds());
lastProvisionTime = new DateTime();
scalingStats.addProvisionEvent(provisioned);
}
} else {
Duration durSinceLastProvision = new Duration(new DateTime(), lastProvisionTime);
if (durSinceLastProvision.isLongerThan(config.getMaxScalingDuration())) {
log.makeAlert("Worker node provisioning taking too long")
.addData("millisSinceLastProvision", durSinceLastProvision.getMillis())
.addData("provisioningCount", currentlyProvisioning.size())
.emit();
}
log.info(
"%s still provisioning. Wait for all provisioned nodes to complete before requesting new worker.",
currentlyProvisioning
);
}
}
@Override
public void doTerminate(Collection<Task> availableTasks, Collection<WorkerWrapper> workerWrappers)
{
boolean nothingTerminating = Sets.difference(
currentlyTerminating,
Sets.newHashSet(
autoScalingStrategy.ipToIdLookup(
Lists.newArrayList(
Iterables.transform(
workerWrappers, new Function<WorkerWrapper, String>()
{
@Override
public String apply(WorkerWrapper input)
{
return input.getWorker().getIp();
}
}
)
)
)
)
).isEmpty();
if (nothingTerminating) {
final int minNumWorkers = workerSetupManager.getWorkerSetupData().getMinNumWorkers();
if (workerWrappers.size() <= minNumWorkers) {
return;
}
List<WorkerWrapper> thoseLazyWorkers = Lists.newArrayList(
FunctionalIterable
.create(workerWrappers)
.filter(
new Predicate<WorkerWrapper>()
{
@Override
public boolean apply(WorkerWrapper input)
{
return input.getRunningTasks().isEmpty()
&& System.currentTimeMillis() - input.getLastCompletedTaskTime().getMillis()
> config.getMaxWorkerIdleTimeMillisBeforeDeletion();
}
}
)
);
AutoScalingData terminated = autoScalingStrategy.terminate(
Lists.transform(
thoseLazyWorkers.subList(minNumWorkers, thoseLazyWorkers.size()),
new Function<WorkerWrapper, String>()
{
@Override
public String apply(WorkerWrapper input)
{
return input.getWorker().getIp();
}
}
)
);
if (terminated != null) {
currentlyTerminating.addAll(terminated.getNodeIds());
lastTerminateTime = new DateTime();
scalingStats.addTerminateEvent(terminated);
}
} else {
Duration durSinceLastTerminate = new Duration(new DateTime(), lastTerminateTime);
if (durSinceLastTerminate.isLongerThan(config.getMaxScalingDuration())) {
log.makeAlert("Worker node termination taking too long")
.addData("millisSinceLastTerminate", durSinceLastTerminate.getMillis())
.addData("terminatingCount", currentlyTerminating.size())
.emit();
}
log.info(
"%s still terminating. Wait for all nodes to terminate before trying again.",
currentlyTerminating
);
}
}
@Override
public ScalingStats getStats()
{
return scalingStats;
}
}

View File

@@ -0,0 +1,41 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.merger.coordinator.scaling;
import org.joda.time.Duration;
import org.skife.config.Config;
import org.skife.config.Default;
/**
*/
public abstract class SimpleResourceManagmentConfig
{
@Config("druid.indexer.maxWorkerIdleTimeMillisBeforeDeletion")
@Default("600000")
public abstract int getMaxWorkerIdleTimeMillisBeforeDeletion();
@Config("druid.indexer.maxScalingDuration")
@Default("PT1H")
public abstract Duration getMaxScalingDuration();
@Config("druid.indexer.numEventsToTrack")
@Default("20")
public abstract int getNumEventsToTrack();
}

View File

@@ -0,0 +1,71 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.merger;
import com.metamx.druid.aggregation.AggregatorFactory;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.merger.common.TaskCallback;
import com.metamx.druid.merger.common.TaskStatus;
import com.metamx.druid.merger.common.TaskToolbox;
import com.metamx.druid.merger.common.task.DefaultMergeTask;
import com.metamx.druid.merger.coordinator.TaskContext;
import org.codehaus.jackson.annotate.JsonProperty;
import org.codehaus.jackson.annotate.JsonTypeName;
import java.util.List;
/**
*/
@JsonTypeName("test")
public class TestTask extends DefaultMergeTask
{
private final String id;
public TestTask(
@JsonProperty("id") String id,
@JsonProperty("dataSource") String dataSource,
@JsonProperty("segments") List<DataSegment> segments,
@JsonProperty("aggregations") List<AggregatorFactory> aggregators
)
{
super(dataSource, segments, aggregators);
this.id = id;
}
@Override
@JsonProperty
public String getId()
{
return id;
}
@Override
public Type getType()
{
return Type.TEST;
}
@Override
public TaskStatus run(TaskContext context, TaskToolbox toolbox, TaskCallback callback) throws Exception
{
return TaskStatus.success("task1");
}
}

View File

@@ -7,6 +7,7 @@ import com.metamx.common.ISE;
import com.metamx.druid.aggregation.AggregatorFactory;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.jackson.DefaultObjectMapper;
import com.metamx.druid.merger.TestTask;
import com.metamx.druid.merger.common.TaskCallback;
import com.metamx.druid.merger.common.TaskStatus;
import com.metamx.druid.merger.common.TaskToolbox;
@@ -14,11 +15,10 @@ import com.metamx.druid.merger.common.config.IndexerZkConfig;
import com.metamx.druid.merger.common.config.TaskConfig;
import com.metamx.druid.merger.common.task.DefaultMergeTask;
import com.metamx.druid.merger.common.task.Task;
import com.metamx.druid.merger.coordinator.config.IndexerCoordinatorConfig;
import com.metamx.druid.merger.coordinator.config.RemoteTaskRunnerConfig;
import com.metamx.druid.merger.coordinator.config.RetryPolicyConfig;
import com.metamx.druid.merger.coordinator.scaling.AutoScalingData;
import com.metamx.druid.merger.coordinator.scaling.ScalingStrategy;
import com.metamx.druid.merger.coordinator.scaling.AutoScalingStrategy;
import com.metamx.druid.merger.coordinator.setup.WorkerSetupData;
import com.metamx.druid.merger.coordinator.setup.WorkerSetupManager;
import com.metamx.druid.merger.worker.TaskMonitor;
@@ -326,7 +326,6 @@ public class RemoteTaskRunnerTest
pathChildrenCache,
scheduledExec,
new RetryPolicyFactory(new TestRetryPolicyConfig()),
new TestScalingStrategy(),
workerSetupManager
);
@@ -337,7 +336,7 @@
jsonMapper.writeValueAsBytes(worker1)
);
int count = 0;
while (remoteTaskRunner.getNumWorkers() == 0) {
while (remoteTaskRunner.getNumAvailableWorkers() == 0) {
Thread.sleep(500);
if (count > 10) {
throw new ISE("WTF?! Still can't find worker!");
@@ -367,27 +366,6 @@
}
}
private static class TestScalingStrategy<T> implements ScalingStrategy<T>
{
@Override
public AutoScalingData provision()
{
return null;
}
@Override
public AutoScalingData terminate(List<String> nodeIds)
{
return null;
}
@Override
public List<String> ipLookup(List<String> ips)
{
return ips;
}
}
private static class TestRemoteTaskRunnerConfig extends RemoteTaskRunnerConfig
{
@Override
@@ -444,41 +422,4 @@
return 1000;
}
}
@JsonTypeName("test")
private static class TestTask extends DefaultMergeTask
{
private final String id;
public TestTask(
@JsonProperty("id") String id,
@JsonProperty("dataSource") String dataSource,
@JsonProperty("segments") List<DataSegment> segments,
@JsonProperty("aggregations") List<AggregatorFactory> aggregators
)
{
super(dataSource, segments, aggregators);
this.id = id;
}
@Override
@JsonProperty
public String getId()
{
return id;
}
@Override
public Type getType()
{
return Type.TEST;
}
@Override
public TaskStatus run(TaskContext context, TaskToolbox toolbox, TaskCallback callback) throws Exception
{
return TaskStatus.success("task1");
}
}
}

View File

@@ -0,0 +1,142 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.merger.coordinator.scaling;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.metamx.druid.aggregation.AggregatorFactory;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.merger.TestTask;
import com.metamx.druid.merger.common.task.Task;
import com.metamx.druid.merger.coordinator.WorkerWrapper;
import com.metamx.druid.merger.coordinator.setup.WorkerSetupManager;
import org.easymock.EasyMock;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.joda.time.Interval;
import org.junit.Before;
import org.junit.Test;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
/**
*/
public class SimpleResourceManagementStrategyTest
{
private AutoScalingStrategy autoScalingStrategy;
private WorkerSetupManager workerSetupManager;
private Task testTask;
private SimpleResourceManagementStrategy simpleResourceManagementStrategy;
@Before
public void setUp() throws Exception
{
workerSetupManager = EasyMock.createMock(WorkerSetupManager.class);
autoScalingStrategy = EasyMock.createMock(AutoScalingStrategy.class);
testTask = new TestTask(
"task1",
"dummyDs",
Lists.<DataSegment>newArrayList(
new DataSegment(
"dummyDs",
new Interval(new DateTime(), new DateTime()),
new DateTime().toString(),
null,
null,
null,
null,
0
)
), Lists.<AggregatorFactory>newArrayList()
);
simpleResourceManagementStrategy = new SimpleResourceManagementStrategy(
autoScalingStrategy, // inject the EasyMock mock so the tests' expectations and verify() apply
new SimpleResourceManagmentConfig()
{
@Override
public int getMaxWorkerIdleTimeMillisBeforeDeletion()
{
return 0;
}
@Override
public Duration getMaxScalingDuration()
{
return null;
}
@Override
public int getNumEventsToTrack()
{
return 1;
}
},
workerSetupManager
);
}
@Test
public void testSuccessfulProvision() throws Exception
{
EasyMock.expect(autoScalingStrategy.ipToIdLookup(EasyMock.<List<String>>anyObject()))
.andReturn(Lists.<String>newArrayList());
EasyMock.expect(autoScalingStrategy.provision()).andReturn(
new AutoScalingData(Lists.<String>newArrayList(), Lists.newArrayList())
);
EasyMock.replay(autoScalingStrategy);
simpleResourceManagementStrategy.doProvision(
Arrays.<Task>asList(
testTask
),
Arrays.<WorkerWrapper>asList(
new TestWorkerWrapper(testTask)
)
);
EasyMock.verify(autoScalingStrategy);
}
@Test
public void testDoTerminate() throws Exception
{
}
private static class TestWorkerWrapper extends WorkerWrapper
{
private final Task testTask;
private TestWorkerWrapper(
Task testTask
)
{
super(null, null, null);
this.testTask = testTask;
}
@Override
public Set<String> getRunningTasks()
{
return Sets.newHashSet(testTask.getId());
}
}
}

View File

@@ -0,0 +1,45 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.merger.coordinator.scaling;
import java.util.List;
/**
*/
public class TestAutoScalingStrategy<T> implements AutoScalingStrategy<T>
{
@Override
public AutoScalingData<T> provision()
{
return null;
}
@Override
public AutoScalingData<T> terminate(List<String> ids)
{
return null;
}
@Override
public List<String> ipToIdLookup(List<String> ips)
{
return ips; // echo the input, mirroring the previous TestScalingStrategy behavior
}
}