improvements to the indexing service and auto scaling

This commit is contained in:
Fangjin Yang 2012-10-31 14:35:09 -07:00
parent fe9cb397a1
commit 11c64593ae
24 changed files with 1260 additions and 420 deletions

View File

@ -141,7 +141,7 @@ public class BrokerMain
final ServiceDiscoveryConfig serviceDiscoveryConfig = configFactory.build(ServiceDiscoveryConfig.class);
CuratorFramework curatorFramework = Initialization.makeCuratorFrameworkClient(
serviceDiscoveryConfig.getZkHosts(),
serviceDiscoveryConfig,
lifecycle
);

View File

@ -0,0 +1,35 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.initialization;
import org.skife.config.Config;
import org.skife.config.Default;
/**
 * Base configuration for building a Curator (ZooKeeper) client. Values are bound from properties through the
 * config-magic {@code @Config} / {@code @Default} annotations on each abstract getter.
 */
public abstract class CuratorConfig
{
/**
 * @return the value of the {@code druid.zk.service.host} property; no default is declared, so it must be configured
 */
@Config("druid.zk.service.host")
public abstract String getZkHosts();
/**
 * @return the value of the {@code druid.zk.service.sessionTimeoutMs} property, defaulting to 15000
 */
// NOTE(review): presumably meant to be passed to the Curator client builder as its session timeout -- confirm
// that the code constructing the client actually reads this value.
@Config("druid.zk.service.sessionTimeoutMs")
@Default("15000")
public abstract int getZkSessionTimeoutMs();
}

View File

@ -161,14 +161,19 @@ public class Initialization
}
public static CuratorFramework makeCuratorFrameworkClient(
String zkHosts,
CuratorConfig curatorConfig,
Lifecycle lifecycle
) throws IOException
{
final CuratorFramework framework =
CuratorFrameworkFactory.builder()
.connectString(zkHosts)
.retryPolicy(new ExponentialBackoffRetry(1000, 30))
.connectString(curatorConfig.getZkHosts())
.retryPolicy(
new ExponentialBackoffRetry(
1000,
30
)
)
.build();
lifecycle.addHandler(

View File

@ -23,7 +23,7 @@ import org.skife.config.Config;
/**
*/
public abstract class ServiceDiscoveryConfig
public abstract class ServiceDiscoveryConfig extends CuratorConfig
{
@Config("druid.service")
public abstract String getServiceName();
@ -31,9 +31,6 @@ public abstract class ServiceDiscoveryConfig
@Config("druid.port")
public abstract int getPort();
@Config("druid.zk.service.host")
public abstract String getZkHosts();
@Config("druid.zk.paths.discoveryPath")
public abstract String getDiscoveryPath();
}

View File

@ -20,17 +20,21 @@
package com.metamx.druid.merger.coordinator;
import com.google.common.base.Joiner;
import com.google.common.base.Predicate;
import com.google.common.base.Throwables;
import com.google.common.collect.Maps;
import com.google.common.collect.MinMaxPriorityQueue;
import com.google.common.primitives.Ints;
import com.metamx.common.ISE;
import com.metamx.common.concurrent.ScheduledExecutors;
import com.metamx.common.guava.FunctionalIterable;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.druid.PeriodGranularity;
import com.metamx.druid.merger.common.TaskHolder;
import com.metamx.druid.merger.common.TaskStatus;
import com.metamx.druid.merger.common.TaskToolbox;
import com.metamx.druid.merger.common.config.IndexerZkConfig;
import com.metamx.druid.merger.common.task.Task;
import com.metamx.druid.merger.coordinator.config.RemoteTaskRunnerConfig;
import com.metamx.druid.merger.coordinator.scaling.ScalingStrategy;
import com.metamx.druid.merger.worker.Worker;
import com.metamx.emitter.EmittingLogger;
import com.netflix.curator.framework.CuratorFramework;
@ -39,15 +43,30 @@ import com.netflix.curator.framework.recipes.cache.PathChildrenCacheEvent;
import com.netflix.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.zookeeper.CreateMode;
import org.codehaus.jackson.map.ObjectMapper;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.joda.time.Period;
import java.io.IOException;
import java.util.Comparator;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
* The RemoteTaskRunner encapsulates all interactions with Zookeeper and keeps track of which workers
* are running which tasks. The RemoteTaskRunner is event driven and updates state according to ephemeral node
* changes in ZK.
* <p/>
* The RemoteTaskRunner will assign tasks to a node until the node hits capacity. RemoteTaskRunners have scaling
* strategies to help them decide when to create or delete new resources. When tasks are assigned to the remote
* task runner and no workers have capacity to handle the task, provisioning will be done according to the strategy.
* The remote task runner periodically runs a check to see if any worker nodes have not had any work for a
* specified period of time. If so, the worker node will be terminated.
* <p/>
* If a worker node becomes inexplicably disconnected from Zk, the RemoteTaskRunner will automatically retry any tasks
* that were associated with the node.
*/
public class RemoteTaskRunner implements TaskRunner
{
@ -55,156 +74,153 @@ public class RemoteTaskRunner implements TaskRunner
private static final Joiner JOINER = Joiner.on("/");
private final ObjectMapper jsonMapper;
private final TaskInventoryManager taskInventoryManager;
private final IndexerZkConfig config;
private final RemoteTaskRunnerConfig config;
private final CuratorFramework cf;
private final PathChildrenCache workerListener;
private final ScheduledExecutorService scheduledExec;
private final RetryPolicyFactory retryPolicyFactory;
private final ConcurrentHashMap<String, WorkerWrapper> zkWorkers; // all workers that exist in ZK
private final ConcurrentHashMap<String, TaskWrapper> tasks; // all tasks that are assigned or need to be assigned
private final ScalingStrategy strategy;
private final ConcurrentHashMap<String, PathChildrenCache> monitors = new ConcurrentHashMap<String, PathChildrenCache>();
private final Object statusLock = new Object();
private volatile boolean started = false;
public RemoteTaskRunner(
ObjectMapper jsonMapper,
TaskInventoryManager taskInventoryManager,
IndexerZkConfig config,
RemoteTaskRunnerConfig config,
CuratorFramework cf,
PathChildrenCache workerListener,
ScheduledExecutorService scheduledExec,
RetryPolicyFactory retryPolicyFactory
RetryPolicyFactory retryPolicyFactory,
ConcurrentHashMap<String, WorkerWrapper> zkWorkers,
ConcurrentHashMap<String, TaskWrapper> tasks,
ScalingStrategy strategy
)
{
this.jsonMapper = jsonMapper;
this.taskInventoryManager = taskInventoryManager;
this.config = config;
this.cf = cf;
this.workerListener = workerListener;
this.scheduledExec = scheduledExec;
this.retryPolicyFactory = retryPolicyFactory;
this.zkWorkers = zkWorkers;
this.tasks = tasks;
this.strategy = strategy;
}
/**
 * Begins watching ZK for worker announcements and schedules the periodic worker-termination check.
 * <p/>
 * A CHILD_ADDED event on the worker listener deserializes the announced {@link Worker} and hands it to
 * addWorker(); a CHILD_REMOVED event takes the worker host from the last segment of the removed path and hands
 * it to removeWorker().
 * <p/>
 * Any failure is rethrown unchecked via {@link Throwables#propagate}; in that case {@code started} stays false.
 */
@LifecycleStart
public void start()
{
  try {
    // Register the listener BEFORE starting the cache. A listener attached after start() can miss child events
    // that fire while the cache performs its initial load, which would leave already-announced workers unknown.
    workerListener.getListenable().addListener(
        new PathChildrenCacheListener()
        {
          @Override
          public void childEvent(CuratorFramework client, final PathChildrenCacheEvent event) throws Exception
          {
            if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED)) {
              final Worker worker = jsonMapper.readValue(
                  cf.getData().forPath(event.getData().getPath()),
                  Worker.class
              );
              log.info("New worker[%s] found!", worker.getHost());
              addWorker(worker);
            } else if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)) {
              // Get the worker host from the path
              String workerHost = event.getData().getPath().substring(event.getData().getPath().lastIndexOf("/") + 1);
              log.info("Worker[%s] removed!", workerHost);
              removeWorker(workerHost);
            }
          }
        }
    );
    workerListener.start();

    // Schedule termination of worker nodes periodically. The first run is timed to land
    // terminateResourcesWindowMs before the next period boundary; later runs repeat every
    // terminateResourcesPeriodMs.
    Period period = new Period(config.getTerminateResourcesPeriodMs());
    PeriodGranularity granularity = new PeriodGranularity(period, null, null);
    final long truncatedNow = granularity.truncate(new DateTime().getMillis());

    ScheduledExecutors.scheduleAtFixedRate(
        scheduledExec,
        new Duration(
            System.currentTimeMillis(),
            granularity.next(truncatedNow) - config.getTerminateResourcesWindowMs()
        ),
        new Duration(config.getTerminateResourcesPeriodMs()),
        new Runnable()
        {
          @Override
          public void run()
          {
            strategy.terminateIfNeeded(zkWorkers);
          }
        }
    );

    // Only flag the runner as started once everything above succeeded.
    started = true;
  }
  catch (Exception e) {
    throw Throwables.propagate(e);
  }
}
/**
 * Stops the runner: cancels scheduled work, closes every per-worker status watcher and, if the runner had been
 * started, the worker listener that start() started.
 * <p/>
 * Every close is attempted even if an earlier one fails, so a single bad watcher cannot leak the rest. The
 * first failure encountered is rethrown unchecked once all closes have been attempted. {@code started} is always
 * reset to false.
 */
@LifecycleStop
public void stop()
{
  scheduledExec.shutdownNow();

  Exception firstFailure = null;
  try {
    for (WorkerWrapper workerWrapper : zkWorkers.values()) {
      try {
        workerWrapper.getWatcher().close();
      }
      catch (Exception e) {
        log.error(e, "Failed to close watcher associated with worker[%s]", workerWrapper.getWorker().getHost());
        if (firstFailure == null) {
          firstFailure = e;
        }
      }
    }

    // The worker listener is started by start(), so it is released here; skip it if start() never completed.
    if (started) {
      try {
        workerListener.close();
      }
      catch (Exception e) {
        log.error(e, "Failed to close worker listener");
        if (firstFailure == null) {
          firstFailure = e;
        }
      }
    }
  }
  finally {
    started = false;
  }

  if (firstFailure != null) {
    throw Throwables.propagate(firstFailure);
  }
}
/**
 * Reports whether this runner is currently started.
 *
 * @return the current value of the volatile {@code started} flag, which start() sets to true as its final step
 * and stop() resets to false
 */
public boolean hasStarted()
{
return started;
}
@Override
public void run(final Task task, final TaskContext taskContext, final TaskCallback callback)
public void run(Task task, TaskContext context, TaskCallback callback)
{
run(task, taskContext, callback, retryPolicyFactory.makeRetryPolicy());
}
private void run(
final Task task,
final TaskContext taskContext,
final TaskCallback callback,
final RetryPolicy retryPolicy
)
{
try {
// If a worker is already running this task, check the status
Map<String, Worker> allRunningTasks = Maps.newHashMap();
for (Worker worker : taskInventoryManager.getInventory()) {
for (String taskId : worker.getTasks().keySet()) {
allRunningTasks.put(taskId, worker);
}
}
Worker workerRunningThisTask = allRunningTasks.get(task.getId());
if (workerRunningThisTask != null) {
// If the status is complete, just run the callback, otherwise monitor for the completion of the task
if (!verifyStatusComplete(jsonMapper, workerRunningThisTask, task, callback)) {
monitorStatus(jsonMapper, workerRunningThisTask, task, taskContext, callback, retryPolicy);
}
return;
}
// Run the task if it does not currently exist
Worker theWorker = getLeastCapacityWorker();
monitorStatus(jsonMapper, theWorker, task, taskContext, callback, retryPolicy);
announceTask(theWorker, task, taskContext);
}
catch (Exception e) {
log.error(e, "Failed to dispatch task. Retrying");
retryTask(task, taskContext, callback, retryPolicy);
}
}
private void retryTask(
final Task task,
final TaskContext taskContext,
final TaskCallback callback,
final RetryPolicy retryPolicy
)
{
if (retryPolicy.hasExceededRetryThreshold()) {
log.makeAlert("Task [%s] has failed[%d] times, giving up!", task.getId(), retryPolicy.getNumRetries())
.emit();
callback.notify(TaskStatus.failure(task.getId()));
return;
}
scheduledExec.schedule(
new Callable<Object>()
{
@Override
public Object call() throws Exception
{
retryPolicy.runRunnables();
log.info("Retry[%d] for task[%s]", retryPolicy.getNumRetries(), task.getId());
run(task, taskContext, callback, retryPolicy);
return null;
}
},
retryPolicy.getAndIncrementRetryDelay(),
TimeUnit.MILLISECONDS
assignTask(
new TaskWrapper(
task, context, callback, retryPolicyFactory.makeRetryPolicy()
)
);
}
/**
 * Selects the worker from the task inventory that currently holds the fewest tasks.
 *
 * @return the worker with the smallest task count
 * @throws RuntimeException if the inventory contains no workers
 */
private Worker getLeastCapacityWorker()
{
  final Comparator<Worker> fewestTasksFirst = new Comparator<Worker>()
  {
    @Override
    public int compare(Worker w1, Worker w2)
    {
      final int firstCount = w1.getTasks().size();
      final int secondCount = w2.getTasks().size();
      return Ints.compare(firstCount, secondCount);
    }
  };

  final MinMaxPriorityQueue<Worker> candidates = MinMaxPriorityQueue
      .<Worker>orderedBy(fewestTasksFirst)
      .create(taskInventoryManager.getInventory());

  if (!candidates.isEmpty()) {
    // Head of the queue is the least element under the comparator, i.e. the least-loaded worker.
    return candidates.peek();
  }

  log.error("No worker nodes found!");
  throw new RuntimeException();
}
private boolean verifyStatusComplete(
final ObjectMapper jsonMapper,
final Worker worker,
final Task task,
final TaskCallback callback
)
private boolean assignTask(TaskWrapper taskWrapper)
{
// If the task already exists, we don't need to announce it
try {
final String taskPath = JOINER.join(config.getTaskPath(), worker.getHost(), task.getId());
final String statusPath = JOINER.join(config.getStatusPath(), worker.getHost(), task.getId());
WorkerWrapper workerWrapper;
if ((workerWrapper = findWorkerRunningTask(taskWrapper)) != null) {
final Worker worker = workerWrapper.getWorker();
TaskStatus taskStatus = jsonMapper.readValue(
cf.getData().forPath(JOINER.join(config.getStatusPath(), worker.getHost(), taskWrapper.getTask().getId())),
TaskStatus.class
);
TaskStatus taskStatus = jsonMapper.readValue(
cf.getData().forPath(statusPath), TaskStatus.class
);
if (taskStatus.isComplete()) {
if (callback != null) {
callback.notify(taskStatus);
if (taskStatus.isComplete()) {
TaskCallback callback = taskWrapper.getCallback();
if (callback != null) {
callback.notify(taskStatus);
}
new CleanupPaths(worker.getHost(), taskWrapper.getTask().getId()).run();
} else {
tasks.put(taskWrapper.getTask().getId(), taskWrapper);
}
cf.delete().guaranteed().forPath(statusPath);
cf.delete().guaranteed().forPath(taskPath);
return true;
}
}
@ -212,120 +228,291 @@ public class RemoteTaskRunner implements TaskRunner
throw Throwables.propagate(e);
}
// Announce the task
WorkerWrapper workerWrapper = getWorkerForTask();
if (workerWrapper != null) {
announceTask(workerWrapper.getWorker(), taskWrapper);
return true;
}
return false;
}
/**
* Creates a monitor for status updates and deletes. Worker nodes announce a status when they start a task and update
* it again upon completing the task. If a status is deleted, this means the worker node has died before completing
* its status update.
* Retries a task that has failed.
*
* @param pre - A runnable that is executed before the retry occurs
* @param taskWrapper - a container for task properties
*/
private void monitorStatus(
final ObjectMapper jsonMapper,
final Worker worker,
final Task task,
final TaskContext taskContext,
final TaskCallback callback,
final RetryPolicy retryPolicy
) throws Exception
private void retryTask(
final Runnable pre,
final TaskWrapper taskWrapper
)
{
final String taskPath = JOINER.join(config.getTaskPath(), worker.getHost(), task.getId());
final String statusPath = JOINER.join(config.getStatusPath(), worker.getHost(), task.getId());
final Task task = taskWrapper.getTask();
final RetryPolicy retryPolicy = taskWrapper.getRetryPolicy();
PathChildrenCache monitor = monitors.get(worker.getHost());
if (monitor == null) {
monitor = new PathChildrenCache(
cf,
JOINER.join(config.getStatusPath(), worker.getHost()),
false
);
monitor.start();
log.info("Registering retry for failed task[%s]", task.getId());
if (retryPolicy.hasExceededRetryThreshold()) {
log.makeAlert("Task [%s] has failed[%d] times, giving up!", task.getId(), retryPolicy.getNumRetries())
.emit();
return;
}
final PathChildrenCache statusMonitor = monitor;
statusMonitor.getListenable().addListener(
new PathChildrenCacheListener()
scheduledExec.schedule(
new Runnable()
{
@Override
public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent)
throws Exception
public void run()
{
try {
if (pathChildrenCacheEvent.getData().getPath().equals(statusPath)) {
if (pathChildrenCacheEvent.getType().equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)) {
throw new ISE("Worker[%s] dropped Task[%s]!", worker.getHost(), task.getId());
}
if (pre != null) {
pre.run();
}
TaskStatus taskStatus = jsonMapper.readValue(
cf.getData().forPath(statusPath), TaskStatus.class
);
if (taskStatus.isComplete()) {
if (callback != null) {
callback.notify(taskStatus);
}
cf.delete().guaranteed().forPath(statusPath);
cf.delete().guaranteed().forPath(taskPath);
statusMonitor.close();
if (tasks.containsKey(task.getId())) {
log.info("Retry[%d] for task[%s]", retryPolicy.getNumRetries(), task.getId());
if (!assignTask(taskWrapper)) {
throw new ISE("Unable to find worker to send retry request to for task[%s]", task.getId());
}
}
}
catch (Exception e) {
log.error(e, "Exception while cleaning up task[%s]. Retrying", task.getId());
retryPolicy.registerRunnable(
new Runnable()
{
@Override
public void run()
{
try {
if (cf.checkExists().forPath(statusPath) != null) {
cf.delete().guaranteed().forPath(statusPath);
}
if (cf.checkExists().forPath(taskPath) != null) {
cf.delete().guaranteed().forPath(taskPath);
}
statusMonitor.close();
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
}
);
retryTask(task, taskContext, callback, retryPolicy);
retryTask(null, taskWrapper);
}
}
}
},
retryPolicy.getAndIncrementRetryDelay(),
TimeUnit.MILLISECONDS
);
}
private void announceTask(Worker theWorker, Task task, TaskContext taskContext)
/**
* When a new worker appears, listeners are registered for status changes.
* Status changes indicate the creation or completion of task.
* The RemoteTaskRunner updates state according to these changes.
*
* @param worker - contains metadata for a worker that has appeared in ZK
*/
private void addWorker(final Worker worker)
{
try {
log.info(
"Coordinator asking Worker[%s] to add"
+ " task[%s]", theWorker.getHost(), task.getId()
final String workerStatus = JOINER.join(config.getStatusPath(), worker.getHost());
final ConcurrentSkipListSet<String> runningTasks = new ConcurrentSkipListSet<String>(
cf.getChildren().forPath(workerStatus)
);
final PathChildrenCache watcher = new PathChildrenCache(cf, workerStatus, false);
final WorkerWrapper workerWrapper = new WorkerWrapper(
worker,
runningTasks,
watcher
);
cf.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.EPHEMERAL)
.forPath(
JOINER.join(
config.getTaskPath(),
theWorker.getHost(),
task.getId()
),
jsonMapper.writeValueAsBytes(new TaskHolder(task, taskContext))
);
// Add status listener to the watcher for status changes
watcher.getListenable().addListener(
new PathChildrenCacheListener()
{
@Override
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception
{
synchronized (statusLock) {
String taskId = null;
try {
if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED)) {
String statusPath = event.getData().getPath();
TaskStatus taskStatus = jsonMapper.readValue(
cf.getData().forPath(statusPath), TaskStatus.class
);
taskId = taskStatus.getId();
log.info("New status[%s] appeared!", taskId);
runningTasks.add(taskId);
statusLock.notify();
} else if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)) {
String statusPath = event.getData().getPath();
TaskStatus taskStatus = jsonMapper.readValue(
cf.getData().forPath(statusPath), TaskStatus.class
);
taskId = taskStatus.getId();
log.info("Task[%s] updated status[%s]!", taskId, taskStatus.getStatusCode());
if (taskStatus.isComplete()) {
workerWrapper.setLastCompletedTaskTime(new DateTime());
TaskWrapper taskWrapper = tasks.get(taskId);
if (taskWrapper == null) {
log.warn("A task completed that I didn't know about? WTF?!");
} else {
TaskCallback callback = taskWrapper.getCallback();
// Cleanup
if (callback != null) {
callback.notify(taskStatus);
}
tasks.remove(taskId);
runningTasks.remove(taskId);
cf.delete().guaranteed().forPath(statusPath);
}
}
}
}
catch (Exception e) {
retryTask(new CleanupPaths(worker.getHost(), taskId), tasks.get(taskId));
}
}
}
}
);
zkWorkers.put(worker.getHost(), workerWrapper);
watcher.start();
}
catch (Exception e) {
log.error(e, "Exception creating task[%s] for worker node[%s]", task.getId(), theWorker.getHost());
throw Throwables.propagate(e);
}
}
/**
 * Searches the known ZK workers for one whose running-task set contains the id of the given task.
 *
 * @param taskWrapper - the task to look for
 * @return the first worker found to be running the task, or null if no known worker is running it
 */
private WorkerWrapper findWorkerRunningTask(TaskWrapper taskWrapper)
{
  WorkerWrapper owner = null;
  for (WorkerWrapper candidate : zkWorkers.values()) {
    final boolean runsTask = candidate.getRunningTasks().contains(taskWrapper.getTask().getId());
    if (runsTask) {
      owner = candidate;
      break;
    }
  }
  return owner;
}
/**
 * When a ephemeral worker node disappears from ZK, we have to make sure there are no tasks still assigned
 * to the worker. If tasks remain, they are retried.
 * <p/>
 * For each task the worker was running, a retry is registered (with a cleanup of the worker's task and status
 * paths as the pre-retry step) when the task is still tracked in {@code tasks}. The task id is then dropped from
 * the worker's running set. Finally the worker's status watcher is closed and the worker is forgotten.
 *
 * @param workerId - id of the removed worker
 */
private void removeWorker(final String workerId)
{
  final WorkerWrapper workerWrapper = zkWorkers.get(workerId);

  if (workerWrapper != null) {
    for (String taskId : workerWrapper.getRunningTasks()) {
      final TaskWrapper taskWrapper = tasks.get(taskId);
      if (taskWrapper != null) {
        // Pass the instance that was just null-checked; a second tasks.get() could race with a concurrent
        // removal and hand null to retryTask(), which dereferences it.
        retryTask(new CleanupPaths(workerId, taskId), taskWrapper);
      }
      workerWrapper.removeTask(taskId);
    }

    try {
      workerWrapper.getWatcher().close();
    }
    catch (IOException e) {
      // Best effort: keep removing the worker, but do not lose the cause of the failure.
      log.error(e, "Failed to close watcher associated with worker[%s]", workerWrapper.getWorker().getHost());
    }
  }

  zkWorkers.remove(workerId);
}
/**
 * Chooses the worker that should receive the next task.
 * <p/>
 * Only workers that are below capacity and whose version compares greater than or equal to the configured minimum
 * worker version are considered. Among those, the worker with the MOST running tasks is preferred, so that
 * workers are filled up before work spills onto emptier ones.
 * <p/>
 * If no worker qualifies, an alert is emitted, the scaling strategy is asked to provision, and null is returned.
 *
 * @return the chosen worker, or null if no eligible worker exists
 */
private WorkerWrapper getWorkerForTask()
{
  try {
    final Predicate<WorkerWrapper> hasRoomAndRecentEnough = new Predicate<WorkerWrapper>()
    {
      @Override
      public boolean apply(WorkerWrapper input)
      {
        if (input.isAtCapacity()) {
          return false;
        }
        return input.getWorker().getVersion().compareTo(config.getMinWorkerVersion()) >= 0;
      }
    };

    final Comparator<WorkerWrapper> busiestFirst = new Comparator<WorkerWrapper>()
    {
      @Override
      public int compare(WorkerWrapper w1, WorkerWrapper w2)
      {
        final int firstLoad = w1.getRunningTasks().size();
        final int secondLoad = w2.getRunningTasks().size();
        // Reversed argument order yields descending order by load.
        return Ints.compare(secondLoad, firstLoad);
      }
    };

    final MinMaxPriorityQueue<WorkerWrapper> candidates = MinMaxPriorityQueue
        .<WorkerWrapper>orderedBy(busiestFirst)
        .create(FunctionalIterable.create(zkWorkers.values()).filter(hasRoomAndRecentEnough));

    if (!candidates.isEmpty()) {
      return candidates.peek();
    }

    log.makeAlert("There are no worker nodes with capacity to run task!").emit();
    strategy.provisionIfNeeded(zkWorkers);
    return null;
  }
  catch (Exception e) {
    throw Throwables.propagate(e);
  }
}
/**
 * Creates a ZK entry under a specific path associated with a worker. The worker is responsible for
 * removing the task ZK entry and creating a task status ZK entry.
 * <p/>
 * The task is recorded in {@code tasks} before the ZK node is written. The call then blocks on
 * {@code statusLock} until some known worker reports the task in its running set.
 * <p/>
 * Failures are logged and rethrown unchecked. The task is deliberately left in {@code tasks} on failure, because
 * the retry path only re-dispatches tasks that are still tracked there.
 *
 * @param theWorker The worker the task is assigned to
 * @param taskWrapper The task to be assigned
 */
private void announceTask(Worker theWorker, TaskWrapper taskWrapper)
{
  synchronized (statusLock) {
    final Task task = taskWrapper.getTask();
    final TaskContext taskContext = taskWrapper.getTaskContext();

    try {
      log.info("Coordinator asking Worker[%s] to add task[%s]", theWorker.getHost(), task.getId());

      tasks.put(task.getId(), taskWrapper);

      cf.create()
        .creatingParentsIfNeeded()
        .withMode(CreateMode.EPHEMERAL)
        .forPath(
            JOINER.join(
                config.getTaskPath(),
                theWorker.getHost(),
                task.getId()
            ),
            jsonMapper.writeValueAsBytes(new TaskHolder(task, taskContext))
        );

      // Re-check the condition after every wake-up: wait() may return spuriously or for another task's status.
      while (findWorkerRunningTask(taskWrapper) == null) {
        statusLock.wait();
      }
    }
    catch (InterruptedException e) {
      // Restore the interrupt flag so callers further up the stack can still observe the interruption.
      Thread.currentThread().interrupt();
      log.error(
          e,
          "Interrupted while waiting for worker node[%s] to pick up task[%s]",
          theWorker.getHost(),
          task.getId()
      );
      throw Throwables.propagate(e);
    }
    catch (Exception e) {
      log.error(e, "Exception creating task[%s] for worker node[%s]", task.getId(), theWorker.getHost());
      throw Throwables.propagate(e);
    }
  }
}
/**
 * Best-effort removal of the ZK status node and task node belonging to one task on one worker.
 * <p/>
 * The two nodes are removed independently, so a missing or undeletable status node no longer prevents the task
 * node from being cleaned up. Failures are logged with their cause and never propagate to the caller.
 */
private class CleanupPaths implements Runnable
{
  private final String workerId;
  private final String taskId;

  private CleanupPaths(String workerId, String taskId)
  {
    this.workerId = workerId;
    this.taskId = taskId;
  }

  @Override
  public void run()
  {
    // Path construction stays inside each try so that a bad id (e.g. null) is still handled as best-effort.
    try {
      deleteIfExists(JOINER.join(config.getStatusPath(), workerId, taskId));
    }
    catch (Exception e) {
      log.warn(e, "Failed to clean up status path for task[%s] on worker[%s]", taskId, workerId);
    }

    try {
      deleteIfExists(JOINER.join(config.getTaskPath(), workerId, taskId));
    }
    catch (Exception e) {
      log.warn(e, "Failed to clean up task path for task[%s] on worker[%s]", taskId, workerId);
    }
  }

  /**
   * Deletes the given ZK path if it currently exists; a path that is already gone is not an error.
   */
  private void deleteIfExists(String path) throws Exception
  {
    if (cf.checkExists().forPath(path) != null) {
      cf.delete().guaranteed().forPath(path);
    }
  }
}
}

View File

@ -48,23 +48,6 @@ public class RetryPolicy
this.retryCount = 0;
}
/**
 * Registers a runnable to be executed by the next call to runRunnables(). The runnable is appended to
 * {@code runnables}.
 *
 * @param runnable the action to queue
 */
public void registerRunnable(Runnable runnable)
{
runnables.add(runnable);
}
/**
 * Runs every registered runnable, in the iteration order of {@code runnables}, on the calling thread, and then
 * clears the registrations. There is no exception handling here: if a runnable throws, the exception propagates
 * to the caller and the clear is not reached.
 */
public void runRunnables()
{
for (Runnable runnable : runnables) {
runnable.run();
}
// Registrations are one-shot; drop them once they have all run.
runnables.clear();
}
public long getAndIncrementRetryDelay()
{
long retVal = currRetryDelay;

View File

@ -1,125 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.merger.coordinator;
import com.metamx.common.Pair;
import com.metamx.common.logger.Logger;
import com.metamx.druid.client.InventoryManagementStrategy;
import com.metamx.druid.client.InventoryManager;
import com.metamx.druid.client.InventoryManagerConfig;
import com.metamx.druid.merger.common.TaskStatus;
import com.metamx.druid.merger.common.config.IndexerZkConfig;
import com.metamx.druid.merger.worker.Worker;
import com.metamx.phonebook.PhoneBook;
import com.metamx.phonebook.PhoneBookPeon;
import java.util.Map;
/**
 * An {@link InventoryManager} of {@link Worker}s. It is configured with the indexer's announcement path and status
 * path, and keeps each worker's task set in sync with the {@link TaskStatus} entries reported for that worker.
 */
public class TaskInventoryManager extends InventoryManager<Worker>
{
  public TaskInventoryManager(
      IndexerZkConfig config,
      PhoneBook yp
  )
  {
    super(
        statusPathLogger(config),
        new InventoryManagerConfig(
            config.getAnnouncementPath(),
            config.getStatusPath()
        ),
        yp,
        new WorkerInventoryManagementStrategy(statusPathLogger(config))
    );
  }

  /**
   * Builds a logger named after this class, suffixed with the configured status path.
   */
  private static Logger statusPathLogger(IndexerZkConfig config)
  {
    return new Logger(TaskInventoryManager.class.getName() + "." + config.getStatusPath());
  }

  /**
   * Strategy that tracks {@link Worker} containers and mirrors per-worker task status entries into them.
   * Serde is not handled by this strategy.
   */
  private static class WorkerInventoryManagementStrategy implements InventoryManagementStrategy<Worker>
  {
    private final Logger log;

    public WorkerInventoryManagementStrategy(
        Logger log
    )
    {
      this.log = log;
    }

    @Override
    public Class<Worker> getContainerClass()
    {
      return Worker.class;
    }

    /**
     * Creates the listener for one worker, keyed by the worker's host: new entries are added to the worker's
     * tasks, removed entries are dropped from them.
     */
    @Override
    public Pair<String, PhoneBookPeon<?>> makeSubListener(final Worker worker)
    {
      final String workerHost = worker.getHost();
      final PhoneBookPeon<TaskStatus> statusPeon = new PhoneBookPeon<TaskStatus>()
      {
        @Override
        public Class<TaskStatus> getObjectClazz()
        {
          return TaskStatus.class;
        }

        @Override
        public void newEntry(String name, TaskStatus taskStatus)
        {
          worker.addTask(taskStatus);
          log.info("Worker[%s] has new task[%s] in ZK", worker.getHost(), taskStatus.getId());
        }

        @Override
        public void entryRemoved(String taskId)
        {
          worker.removeTask(taskId);
          log.info("Worker[%s] removed task[%s] in ZK", worker.getHost(), taskId);
        }
      };

      return new Pair<String, PhoneBookPeon<?>>(workerHost, statusPeon);
    }

    @Override
    public void objectRemoved(Worker baseObject)
    {
      // Intentionally a no-op.
    }

    @Override
    public boolean doesSerde()
    {
      return false;
    }

    @Override
    public Worker deserialize(String name, Map<String, String> properties)
    {
      throw new UnsupportedOperationException();
    }
  }
}

View File

@ -0,0 +1,60 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.merger.coordinator;
import com.metamx.druid.merger.common.task.Task;
/**
 * Immutable holder that keeps a {@link Task} together with everything needed to dispatch and retry it: its
 * {@link TaskContext}, the {@link TaskCallback} to invoke, and its {@link RetryPolicy}.
 */
public class TaskWrapper
{
  private final Task task;
  private final TaskContext taskContext;
  private final TaskCallback callback;
  private final RetryPolicy retryPolicy;

  /**
   * @param task        the task itself
   * @param taskContext the context the task is run with
   * @param callback    the callback associated with the task; stored as given
   * @param retryPolicy the retry policy associated with the task
   */
  public TaskWrapper(
      final Task task,
      final TaskContext taskContext,
      final TaskCallback callback,
      final RetryPolicy retryPolicy
  )
  {
    this.task = task;
    this.taskContext = taskContext;
    this.callback = callback;
    this.retryPolicy = retryPolicy;
  }

  /**
   * @return the wrapped task
   */
  public Task getTask()
  {
    return this.task;
  }

  /**
   * @return the context supplied with the task
   */
  public TaskContext getTaskContext()
  {
    return this.taskContext;
  }

  /**
   * @return the callback supplied with the task
   */
  public TaskCallback getCallback()
  {
    return this.callback;
  }

  /**
   * @return the retry policy supplied with the task
   */
  public RetryPolicy getRetryPolicy()
  {
    return this.retryPolicy;
  }
}

View File

@ -0,0 +1,80 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.merger.coordinator;
import com.metamx.druid.merger.worker.Worker;
import com.netflix.curator.framework.recipes.cache.PathChildrenCache;
import org.joda.time.DateTime;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet;
/**
 * Coordinator-side view of one worker: the {@link Worker} metadata, the ids of the tasks it is running, the
 * {@link PathChildrenCache} watching it, and the time it last completed a task.
 * <p/>
 * The running-task set is the same concurrent set instance handed to the constructor, so changes made by its
 * other holders are visible here. The last-completed time is a volatile field.
 */
public class WorkerWrapper
{
  private final Worker worker;
  private final ConcurrentSkipListSet<String> runningTasks;
  private final PathChildrenCache watcher;

  // Starts at construction time rather than null, so a worker that has not completed any task yet still has a
  // usable timestamp: idle time is then measured from when the worker was first seen.
  // NOTE(review): assumes consumers of getLastCompletedTaskTime() do not rely on null meaning "never completed
  // a task" -- confirm against the scaling strategy.
  private volatile DateTime lastCompletedTaskTime = new DateTime();

  /**
   * @param worker       metadata of the worker
   * @param runningTasks live set of ids of tasks the worker is running; kept by reference
   * @param watcher      the cache watching this worker
   */
  public WorkerWrapper(Worker worker, ConcurrentSkipListSet<String> runningTasks, PathChildrenCache watcher)
  {
    this.worker = worker;
    this.runningTasks = runningTasks;
    this.watcher = watcher;
  }

  /**
   * @return the worker metadata
   */
  public Worker getWorker()
  {
    return worker;
  }

  /**
   * @return the live (not copied) set of running task ids
   */
  public Set<String> getRunningTasks()
  {
    return runningTasks;
  }

  /**
   * @return the cache watching this worker
   */
  public PathChildrenCache getWatcher()
  {
    return watcher;
  }

  /**
   * @return the time most recently passed to {@link #setLastCompletedTaskTime}, or the construction time of this
   * wrapper if it has never been set
   */
  public DateTime getLastCompletedTaskTime()
  {
    return lastCompletedTaskTime;
  }

  /**
   * @return true if the number of running tasks has reached (or exceeded) the worker's capacity
   */
  public boolean isAtCapacity()
  {
    return runningTasks.size() >= worker.getCapacity();
  }

  /**
   * @param completedTaskTime the time at which the worker last completed a task
   */
  public void setLastCompletedTaskTime(DateTime completedTaskTime)
  {
    lastCompletedTaskTime = completedTaskTime;
  }

  /**
   * Removes the given task id from the running set; a no-op if it is not present.
   */
  public void removeTask(String taskId)
  {
    runningTasks.remove(taskId);
  }
}

View File

@ -0,0 +1,40 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.merger.coordinator.config;
import com.metamx.druid.merger.common.config.IndexerZkConfig;
import org.skife.config.Config;
import org.skife.config.Default;
/**
 * Configuration for the RemoteTaskRunner. Extends {@link IndexerZkConfig}, so its settings are available
 * alongside the runner-specific ones declared here. Values are bound from properties through the config-magic
 * {@code @Config} / {@code @Default} annotations.
 */
public abstract class RemoteTaskRunnerConfig extends IndexerZkConfig
{
/**
 * @return the period, in milliseconds, between runs of the worker-termination check
 * ({@code druid.indexer.terminateResources.periodMs}, default 3600000)
 */
@Config("druid.indexer.terminateResources.periodMs")
@Default("3600000") // 1 hr
public abstract long getTerminateResourcesPeriodMs();
/**
 * @return how many milliseconds before the next period boundary the first termination check is scheduled
 * ({@code druid.indexer.terminateResources.windowMs}, default 300000)
 */
@Config("druid.indexer.terminateResources.windowMs")
@Default("300000") // 5 mins
public abstract long getTerminateResourcesWindowMs();
/**
 * @return the minimum worker version; workers whose version compares lower than this are not assigned tasks
 * ({@code druid.indexer.minWorkerVersion}, no default declared)
 */
@Config("druid.indexer.minWorkerVersion")
public abstract String getMinWorkerVersion();
}

View File

@ -27,11 +27,11 @@ import org.skife.config.Default;
public abstract class RetryPolicyConfig
{
@Config("druid.indexer.retry.minWaitMillis")
@Default("10000")
@Default("60000") // 1 minute
public abstract long getRetryMinMillis();
@Config("druid.indexer.retry.maxWaitMillis")
@Default("60000")
@Default("600000") // 10 minutes
public abstract long getRetryMaxMillis();
@Config("druid.indexer.retry.maxRetryCount")

View File

@ -0,0 +1,47 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.merger.coordinator.config;
import org.skife.config.Config;
import org.skife.config.Default;
/**
 * Property-backed settings consumed by the scaling strategies in
 * {@code com.metamx.druid.merger.coordinator.scaling}.
 */
public abstract class S3AutoScalingStrategyConfig
{
/**
 * Value of {@code druid.indexer.amiId}: the image id used when a new instance is requested.
 * No default is declared.
 */
@Config("druid.indexer.amiId")
public abstract String getAmiId();
/**
 * Value of {@code druid.indexer.worker.port} (default {@code "8080"}). Returned as a string; the
 * scaling strategy only uses it to build an {@code "ip:port"} identifier.
 */
@Config("druid.indexer.worker.port")
@Default("8080")
public abstract String getWorkerPort();
/**
 * Value of {@code druid.indexer.instanceType}: the instance type name for new instances.
 * No default is declared.
 */
@Config("druid.indexer.instanceType")
public abstract String getInstanceType();
/**
 * Value of {@code druid.indexer.millisToWaitBeforeTerminating} (default: 30 minutes). A worker is
 * only considered for termination once more than this many milliseconds have passed since its
 * last completed task.
 */
@Config("druid.indexer.millisToWaitBeforeTerminating")
@Default("1800000") // 30 mins
public abstract long getMillisToWaitBeforeTerminating();
/**
 * Value of {@code druid.indexer.minNumWorkers} (default 1): the minimum number of workers that
 * must always be running. Nothing is terminated while the worker count is at or below it.
 * The capitalisation "NuM" in the method name is a typo that is part of the public interface.
 */
@Config("druid.indexer.minNumWorkers")
@Default("1")
public abstract int getMinNuMWorkers();
}

View File

@ -19,6 +19,8 @@
package com.metamx.druid.merger.coordinator.http;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.ec2.AmazonEC2Client;
import com.google.common.base.Charsets;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
@ -42,7 +44,6 @@ import com.metamx.druid.http.StatusServlet;
import com.metamx.druid.initialization.Initialization;
import com.metamx.druid.initialization.ServerConfig;
import com.metamx.druid.initialization.ServiceDiscoveryConfig;
import com.metamx.druid.initialization.ZkClientConfig;
import com.metamx.druid.jackson.DefaultObjectMapper;
import com.metamx.druid.merger.common.TaskToolbox;
import com.metamx.druid.merger.common.config.IndexerZkConfig;
@ -53,15 +54,21 @@ import com.metamx.druid.merger.coordinator.LocalTaskStorage;
import com.metamx.druid.merger.coordinator.MergerDBCoordinator;
import com.metamx.druid.merger.coordinator.RemoteTaskRunner;
import com.metamx.druid.merger.coordinator.RetryPolicyFactory;
import com.metamx.druid.merger.coordinator.TaskInventoryManager;
import com.metamx.druid.merger.coordinator.TaskMaster;
import com.metamx.druid.merger.coordinator.TaskQueue;
import com.metamx.druid.merger.coordinator.TaskRunner;
import com.metamx.druid.merger.coordinator.TaskRunnerFactory;
import com.metamx.druid.merger.coordinator.TaskStorage;
import com.metamx.druid.merger.coordinator.TaskWrapper;
import com.metamx.druid.merger.coordinator.WorkerWrapper;
import com.metamx.druid.merger.coordinator.config.IndexerCoordinatorConfig;
import com.metamx.druid.merger.coordinator.config.IndexerDbConnectorConfig;
import com.metamx.druid.merger.coordinator.config.S3AutoScalingStrategyConfig;
import com.metamx.druid.merger.coordinator.config.RemoteTaskRunnerConfig;
import com.metamx.druid.merger.coordinator.config.RetryPolicyConfig;
import com.metamx.druid.merger.coordinator.scaling.NoopScalingStrategy;
import com.metamx.druid.merger.coordinator.scaling.S3AutoScalingStrategy;
import com.metamx.druid.merger.coordinator.scaling.ScalingStrategy;
import com.metamx.druid.realtime.S3SegmentPusher;
import com.metamx.druid.realtime.S3SegmentPusherConfig;
import com.metamx.druid.realtime.SegmentPusher;
@ -78,9 +85,8 @@ import com.metamx.metrics.Monitor;
import com.metamx.metrics.MonitorScheduler;
import com.metamx.metrics.MonitorSchedulerConfig;
import com.metamx.metrics.SysMonitor;
import com.metamx.phonebook.PhoneBook;
import com.netflix.curator.framework.CuratorFramework;
import org.I0Itec.zkclient.ZkClient;
import com.netflix.curator.framework.recipes.cache.PathChildrenCache;
import org.codehaus.jackson.map.InjectableValues;
import org.codehaus.jackson.map.ObjectMapper;
import org.jets3t.service.S3ServiceException;
@ -96,6 +102,7 @@ import org.skife.config.ConfigurationObjectFactory;
import java.net.URL;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@ -126,7 +133,6 @@ public class IndexerCoordinatorNode
private CuratorFramework curatorFramework = null;
private ScheduledExecutorFactory scheduledExecutorFactory = null;
private IndexerZkConfig indexerZkConfig;
private TaskInventoryManager taskInventoryManager;
private TaskRunnerFactory taskRunnerFactory = null;
private TaskMaster taskMaster = null;
private Server server = null;
@ -194,7 +200,6 @@ public class IndexerCoordinatorNode
initializeJacksonSubtypes();
initializeCurator();
initializeIndexerZkConfig();
initializeTaskInventoryManager();
initializeTaskRunnerFactory();
initializeTaskMaster();
initializeServer();
@ -265,7 +270,7 @@ public class IndexerCoordinatorNode
private void initializeTaskMaster()
{
if(taskMaster == null) {
if (taskMaster == null) {
final ServiceDiscoveryConfig serviceDiscoveryConfig = configFactory.build(ServiceDiscoveryConfig.class);
taskMaster = new TaskMaster(
taskQueue,
@ -417,7 +422,7 @@ public class IndexerCoordinatorNode
if (curatorFramework == null) {
final ServiceDiscoveryConfig serviceDiscoveryConfig = configFactory.build(ServiceDiscoveryConfig.class);
curatorFramework = Initialization.makeCuratorFrameworkClient(
serviceDiscoveryConfig.getZkHosts(),
serviceDiscoveryConfig,
lifecycle
);
}
@ -430,28 +435,10 @@ public class IndexerCoordinatorNode
}
}
public void initializeTaskInventoryManager()
{
if (taskInventoryManager == null) {
final ZkClient zkClient = Initialization.makeZkClient(configFactory.build(ZkClientConfig.class), lifecycle);
final PhoneBook masterYp = Initialization.createYellowPages(
jsonMapper,
zkClient,
"Master-ZKYP--%s",
lifecycle
);
taskInventoryManager = new TaskInventoryManager(
indexerZkConfig,
masterYp
);
lifecycle.addManagedInstance(taskInventoryManager);
}
}
public void initializeTaskStorage()
{
if (taskStorage == null) {
if(config.getStorageImpl().equals("local")) {
if (config.getStorageImpl().equals("local")) {
taskStorage = new LocalTaskStorage();
} else if (config.getStorageImpl().equals("db")) {
final IndexerDbConnectorConfig dbConnectorConfig = configFactory.build(IndexerDbConnectorConfig.class);
@ -481,13 +468,28 @@ public class IndexerCoordinatorNode
.build()
);
ScalingStrategy strategy = new S3AutoScalingStrategy(
new AmazonEC2Client(
new BasicAWSCredentials(
props.getProperty("com.metamx.aws.accessKey"),
props.getProperty("com.metamx.aws.secretKey")
)
),
configFactory.build(S3AutoScalingStrategyConfig.class)
);
// TODO: remove this when AMI is ready
strategy = new NoopScalingStrategy(configFactory.build(S3AutoScalingStrategyConfig.class));
return new RemoteTaskRunner(
jsonMapper,
taskInventoryManager,
indexerZkConfig,
configFactory.build(RemoteTaskRunnerConfig.class),
curatorFramework,
new PathChildrenCache(curatorFramework, indexerZkConfig.getAnnouncementPath(), false),
retryScheduledExec,
new RetryPolicyFactory(configFactory.build(RetryPolicyConfig.class))
new RetryPolicyFactory(configFactory.build(RetryPolicyConfig.class)),
new ConcurrentHashMap<String, WorkerWrapper>(),
new ConcurrentHashMap<String, TaskWrapper>(),
strategy
);
}
};

View File

@ -0,0 +1,124 @@
package com.metamx.druid.merger.coordinator.scaling;
import com.amazonaws.services.ec2.model.Instance;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
import com.google.common.collect.MinMaxPriorityQueue;
import com.google.common.collect.Ordering;
import com.metamx.common.guava.FunctionalIterable;
import com.metamx.druid.merger.coordinator.WorkerWrapper;
import com.metamx.druid.merger.coordinator.config.S3AutoScalingStrategyConfig;
import com.metamx.emitter.EmittingLogger;
import java.util.Comparator;
import java.util.Map;
/**
 * A {@link ScalingStrategy} that touches no real infrastructure: at every point where a real
 * strategy would create or terminate an instance, this one only writes a log line.
 */
public class NoopScalingStrategy implements ScalingStrategy
{
  private static final EmittingLogger log = new EmittingLogger(NoopScalingStrategy.class);

  private final S3AutoScalingStrategyConfig config;
  private final Object lock = new Object();

  // Placeholder identifiers of the pending "provision" / "termination"; null while none is pending.
  private volatile String currentlyProvisioning = null;
  private volatile String currentlyTerminating = null;

  public NoopScalingStrategy(S3AutoScalingStrategyConfig config)
  {
    this.config = config;
  }

  /**
   * Logs that a worker would be created when every known worker is at capacity.
   *
   * @param zkWorkers the currently known workers
   */
  @Override
  public void provisionIfNeeded(Map<String, WorkerWrapper> zkWorkers)
  {
    synchronized (lock) {
      final String pending = currentlyProvisioning;
      if (pending != null && !zkWorkers.containsKey(pending)) {
        log.info(
            "[%s] is still provisioning. Wait for it to finish before requesting new worker.",
            pending
        );
        return;
      }

      if (hasWorkerBelowCapacity(zkWorkers)) {
        return;
      }

      try {
        log.info("If I were a real strategy I'd create something now");
        currentlyProvisioning = "willNeverBeTrue";
      }
      catch (Exception e) {
        log.error(e, "Unable to create instance");
        currentlyProvisioning = null;
      }
    }
  }

  /**
   * Logs that a worker would be terminated when the longest-idle worker has exceeded the configured
   * idle time and more than the configured minimum number of workers exist.
   *
   * @param zkWorkers the currently known workers
   * @return always {@code null}; nothing is ever actually terminated
   */
  @Override
  public Instance terminateIfNeeded(Map<String, WorkerWrapper> zkWorkers)
  {
    synchronized (lock) {
      final String pending = currentlyTerminating;
      if (pending != null && zkWorkers.containsKey(pending)) {
        log.info("[%s] has not terminated. Wait for it to finish before terminating again.", pending);
        return null;
      }

      final MinMaxPriorityQueue<WorkerWrapper> idleLongestFirst =
          MinMaxPriorityQueue.orderedBy(byLastCompletedTaskTime()).create(zkWorkers.values());

      if (idleLongestFirst.size() <= config.getMinNuMWorkers()) {
        return null;
      }

      final WorkerWrapper candidate = idleLongestFirst.poll();
      final long idleMillis = System.currentTimeMillis() - candidate.getLastCompletedTaskTime().getMillis();
      if (idleMillis > config.getMillisToWaitBeforeTerminating()) {
        try {
          log.info("If I were a real strategy I'd terminate something now");
          currentlyTerminating = "willNeverBeTrue";
        }
        catch (Exception e) {
          log.error(e, "Unable to terminate instance");
          currentlyTerminating = null;
        }
      }
      return null;
    }
  }

  /** Returns true when at least one of the given workers reports that it is not at capacity. */
  private static boolean hasWorkerBelowCapacity(Map<String, WorkerWrapper> zkWorkers)
  {
    for (WorkerWrapper candidate : zkWorkers.values()) {
      if (!candidate.isAtCapacity()) {
        return true;
      }
    }
    return false;
  }

  /** Orders workers by last completed task time, earliest first, with null times sorted first. */
  private static Comparator<WorkerWrapper> byLastCompletedTaskTime()
  {
    return new Comparator<WorkerWrapper>()
    {
      @Override
      public int compare(WorkerWrapper w1, WorkerWrapper w2)
      {
        return Ordering.natural()
                       .nullsFirst()
                       .compare(w1.getLastCompletedTaskTime(), w2.getLastCompletedTaskTime());
      }
    };
  }
}

View File

@ -0,0 +1,186 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.merger.coordinator.scaling;
import com.amazonaws.services.ec2.AmazonEC2Client;
import com.amazonaws.services.ec2.model.DescribeInstancesRequest;
import com.amazonaws.services.ec2.model.DescribeInstancesResult;
import com.amazonaws.services.ec2.model.Filter;
import com.amazonaws.services.ec2.model.Instance;
import com.amazonaws.services.ec2.model.InstanceType;
import com.amazonaws.services.ec2.model.RunInstancesRequest;
import com.amazonaws.services.ec2.model.RunInstancesResult;
import com.amazonaws.services.ec2.model.TerminateInstancesRequest;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
import com.google.common.collect.MinMaxPriorityQueue;
import com.metamx.common.ISE;
import com.metamx.common.guava.FunctionalIterable;
import com.metamx.druid.merger.coordinator.WorkerWrapper;
import com.metamx.druid.merger.coordinator.config.S3AutoScalingStrategyConfig;
import com.metamx.emitter.EmittingLogger;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Map;
/**
 * A {@link ScalingStrategy} that adds and removes workers by launching and terminating EC2 instances
 * through an {@link AmazonEC2Client}. Despite the "S3" in the name, only EC2 calls are made here.
 *
 * At most one pending provision and one pending termination are remembered at a time, each as an
 * {@code "privateIp:port"} string. Both methods run while holding a single internal lock.
 */
public class S3AutoScalingStrategy implements ScalingStrategy
{
  private static final EmittingLogger log = new EmittingLogger(S3AutoScalingStrategy.class);

  private final AmazonEC2Client amazonEC2Client;
  private final S3AutoScalingStrategyConfig config;

  private final Object lock = new Object();

  // "privateIp:port" of the instance being launched / shut down; null when nothing is pending.
  private volatile String currentlyProvisioning = null;
  private volatile String currentlyTerminating = null;

  public S3AutoScalingStrategy(
      AmazonEC2Client amazonEC2Client,
      S3AutoScalingStrategyConfig config
  )
  {
    this.amazonEC2Client = amazonEC2Client;
    this.config = config;
  }

  /**
   * Launches one new instance when every known worker is at capacity and no earlier launch is
   * still pending. Failures while launching are logged and swallowed.
   *
   * @param zkWorkers the currently known workers.
   *                  NOTE(review): the pending-launch check assumes keys of the form
   *                  {@code "privateIp:port"} -- confirm against the caller.
   */
  @Override
  public void provisionIfNeeded(Map<String, WorkerWrapper> zkWorkers)
  {
    synchronized (lock) {
      // Only look up a non-null marker: maps such as ConcurrentHashMap throw on containsKey(null).
      if (currentlyProvisioning != null && zkWorkers.containsKey(currentlyProvisioning)) {
        // The instance we launched has shown up as a worker; provisioning is complete.
        currentlyProvisioning = null;
      }

      if (currentlyProvisioning != null) {
        log.info(
            "[%s] is still provisioning. Wait for it to finish before requesting new worker.",
            currentlyProvisioning
        );
        return;
      }

      Iterable<WorkerWrapper> availableWorkers = FunctionalIterable.create(zkWorkers.values()).filter(
          new Predicate<WorkerWrapper>()
          {
            @Override
            public boolean apply(WorkerWrapper input)
            {
              return !input.isAtCapacity();
            }
          }
      );

      if (Iterables.size(availableWorkers) == 0) {
        try {
          log.info("Creating a new instance");
          RunInstancesResult result = amazonEC2Client.runInstances(
              new RunInstancesRequest(config.getAmiId(), 1, 1)
                  .withInstanceType(InstanceType.fromValue(config.getInstanceType()))
          );

          if (result.getReservation().getInstances().size() != 1) {
            throw new ISE("Created more than one instance, WTF?!");
          }

          Instance instance = result.getReservation().getInstances().get(0);
          log.info("Created instance: %s", instance.getInstanceId());
          log.debug("%s", instance);

          currentlyProvisioning = String.format("%s:%s", instance.getPrivateIpAddress(), config.getWorkerPort());
        }
        catch (Exception e) {
          log.error(e, "Unable to create instance");
          currentlyProvisioning = null;
        }
      }
    }
  }

  /**
   * Terminates the worker whose last completed task is oldest, provided that more than the
   * configured minimum number of workers exist, the worker has been idle for longer than the
   * configured wait time, and no earlier termination is still pending.
   *
   * @param zkWorkers the currently known workers.
   *                  NOTE(review): the pending-termination check assumes keys of the form
   *                  {@code "privateIp:port"} -- confirm against the caller.
   * @return the instance for which termination was requested, or {@code null} when nothing was terminated
   * @throws ISE when the EC2 lookup by private IP does not yield exactly one instance
   */
  @Override
  public Instance terminateIfNeeded(Map<String, WorkerWrapper> zkWorkers)
  {
    synchronized (lock) {
      // Once the worker being shut down has disappeared from the map, the termination is complete.
      // This must clear the *termination* marker (not the provisioning one), and must skip the
      // lookup when the marker is null because maps such as ConcurrentHashMap reject null keys.
      if (currentlyTerminating != null && !zkWorkers.containsKey(currentlyTerminating)) {
        currentlyTerminating = null;
      }

      if (currentlyTerminating != null) {
        log.info("[%s] has not terminated. Wait for it to finish before terminating again.", currentlyTerminating);
        return null;
      }

      // Head of the queue is the worker with the earliest last-completed-task time.
      MinMaxPriorityQueue<WorkerWrapper> currWorkers = MinMaxPriorityQueue.orderedBy(
          new Comparator<WorkerWrapper>()
          {
            @Override
            public int compare(WorkerWrapper w1, WorkerWrapper w2)
            {
              return w1.getLastCompletedTaskTime().compareTo(w2.getLastCompletedTaskTime());
            }
          }
      ).create(
          zkWorkers.values()
      );

      if (currWorkers.size() <= config.getMinNuMWorkers()) {
        return null;
      }

      WorkerWrapper thatLazyWorker = currWorkers.poll();

      if (System.currentTimeMillis() - thatLazyWorker.getLastCompletedTaskTime().getMillis()
          > config.getMillisToWaitBeforeTerminating()) {
        // Resolve the worker's private IP to its EC2 instance.
        DescribeInstancesResult result = amazonEC2Client.describeInstances(
            new DescribeInstancesRequest()
                .withFilters(
                    new Filter("private-ip-address", Arrays.asList(thatLazyWorker.getWorker().getIp()))
                )
        );

        if (result.getReservations().size() != 1 || result.getReservations().get(0).getInstances().size() != 1) {
          throw new ISE("More than one node with the same private IP[%s], WTF?!", thatLazyWorker.getWorker().getIp());
        }

        Instance instance = result.getReservations().get(0).getInstances().get(0);

        try {
          log.info("Terminating instance[%s]", instance.getInstanceId());
          amazonEC2Client.terminateInstances(
              new TerminateInstancesRequest(Arrays.asList(instance.getInstanceId()))
          );

          currentlyTerminating = String.format("%s:%s", instance.getPrivateIpAddress(), config.getWorkerPort());

          return instance;
        }
        catch (Exception e) {
          log.error(e, "Unable to terminate instance");
          currentlyTerminating = null;
        }
      }

      return null;
    }
  }
}

View File

@ -0,0 +1,34 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.merger.coordinator.scaling;
import com.amazonaws.services.ec2.model.Instance;
import com.metamx.druid.merger.coordinator.WorkerWrapper;
import java.util.Map;
/**
 * Decides when to add a worker to, or remove one from, the set of workers it is given.
 */
public interface ScalingStrategy
{
  /**
   * Gives the strategy the opportunity to start a new worker.
   *
   * @param zkWorkers the currently known workers
   */
  void provisionIfNeeded(Map<String, WorkerWrapper> zkWorkers);

  /**
   * Gives the strategy the opportunity to shut a worker down.
   *
   * @param zkWorkers the currently known workers
   * @return the instance that was terminated, or {@code null} when nothing was terminated
   */
  Instance terminateIfNeeded(Map<String, WorkerWrapper> zkWorkers);
}

View File

@ -111,6 +111,7 @@ public class TaskMonitor
TaskStatus taskStatus;
try {
workerCuratorCoordinator.unannounceTask(task.getId());
workerCuratorCoordinator.announceStatus(TaskStatus.running(task.getId()));
taskStatus = task.run(taskContext, toolbox);
}
@ -165,6 +166,7 @@ public class TaskMonitor
{
try {
pathChildrenCache.close();
exec.shutdown();
}
catch (Exception e) {
log.makeAlert(e, "Exception stopping TaskMonitor")

View File

@ -19,42 +19,44 @@
package com.metamx.druid.merger.worker;
import com.google.common.collect.ImmutableMap;
import com.metamx.common.logger.Logger;
import com.metamx.druid.merger.common.TaskStatus;
import com.metamx.druid.merger.worker.config.WorkerConfig;
import org.codehaus.jackson.annotate.JsonCreator;
import org.codehaus.jackson.annotate.JsonProperty;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* A container for worker metadata.
*/
public class Worker
{
private static final Logger log = new Logger(Worker.class);
private final String host;
private final ConcurrentHashMap<String, TaskStatus> runningTasks;
private final String ip;
private final int capacity;
private final String version;
public Worker(
WorkerConfig config
)
{
this(
config.getHost()
config.getHost(),
config.getIp(),
config.getCapacity(),
config.getVersion()
);
}
@JsonCreator
public Worker(
@JsonProperty("host") String host
@JsonProperty("host") String host,
@JsonProperty("ip") String ip,
@JsonProperty("capacity") int capacity,
@JsonProperty("version") String version
)
{
this.host = host;
this.runningTasks = new ConcurrentHashMap<String, TaskStatus>();
this.ip = ip;
this.capacity = capacity;
this.version = version;
}
@JsonProperty
@ -63,25 +65,21 @@ public class Worker
return host;
}
public Map<String, TaskStatus> getTasks()
@JsonProperty
public String getIp()
{
return runningTasks;
return ip;
}
public Map<String, String> getStringProps()
@JsonProperty
public int getCapacity()
{
return ImmutableMap.<String, String>of(
"host", host
);
return capacity;
}
public TaskStatus addTask(TaskStatus status)
@JsonProperty
public String getVersion()
{
return runningTasks.put(status.getId(), status);
}
public TaskStatus removeTask(String taskId)
{
return runningTasks.remove(taskId);
return version;
}
}

View File

@ -92,7 +92,7 @@ public class WorkerCuratorCoordinator
makePathIfNotExisting(
getAnnouncementsPathForWorker(),
CreateMode.EPHEMERAL,
worker.getStringProps()
worker
);
started = true;
@ -171,6 +171,16 @@ public class WorkerCuratorCoordinator
}
}
/**
 * Deletes the path returned by {@code getTaskPathForId(taskId)} through the Curator client.
 * Any failure is treated as "the path is already gone": it is logged as a warning and not rethrown.
 *
 * @param taskId id of the task whose path should be removed
 */
public void unannounceTask(String taskId)
{
  try {
    final String taskPath = getTaskPathForId(taskId);
    curatorFramework.delete().guaranteed().forPath(taskPath);
  }
  catch (Exception e) {
    log.warn("Could not delete task path for task[%s], looks like it already went away", taskId);
  }
}
public void announceStatus(TaskStatus status)
{
synchronized (lock) {

View File

@ -32,4 +32,14 @@ public abstract class WorkerConfig
/**
 * Value of the {@code druid.host} property. No default is declared.
 */
@Config("druid.host")
public abstract String getHost();
/**
 * Value of the {@code druid.worker.ip} property. No default is declared.
 */
@Config("druid.worker.ip")
public abstract String getIp();
/**
 * Value of the {@code druid.worker.version} property. No default is declared.
 */
@Config("druid.worker.version")
public abstract String getVersion();
/**
 * Value of the {@code druid.worker.capacity} property; defaults to 3.
 * NOTE(review): presumably the number of tasks a worker may run at once -- confirm against the consumer.
 */
@Config("druid.worker.capacity")
@Default("3")
public abstract int getCapacity();
}

View File

@ -29,6 +29,7 @@ import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.common.logger.Logger;
import com.metamx.druid.http.StatusServlet;
import com.metamx.druid.initialization.CuratorConfig;
import com.metamx.druid.initialization.Initialization;
import com.metamx.druid.initialization.ServerConfig;
import com.metamx.druid.jackson.DefaultObjectMapper;
@ -294,8 +295,9 @@ public class WorkerNode
public void initializeCuratorFramework() throws IOException
{
final CuratorConfig curatorConfig = configFactory.build(CuratorConfig.class);
curatorFramework = Initialization.makeCuratorFrameworkClient(
props.getProperty("druid.zk.service.host"),
curatorConfig,
lifecycle
);
}

View File

@ -0,0 +1,164 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.merger.coordinator.scaling;
import com.amazonaws.services.ec2.AmazonEC2Client;
import com.amazonaws.services.ec2.model.DescribeInstancesRequest;
import com.amazonaws.services.ec2.model.DescribeInstancesResult;
import com.amazonaws.services.ec2.model.Instance;
import com.amazonaws.services.ec2.model.Reservation;
import com.amazonaws.services.ec2.model.RunInstancesRequest;
import com.amazonaws.services.ec2.model.RunInstancesResult;
import com.amazonaws.services.ec2.model.TerminateInstancesRequest;
import com.google.common.collect.Maps;
import com.metamx.druid.merger.coordinator.WorkerWrapper;
import com.metamx.druid.merger.coordinator.config.S3AutoScalingStrategyConfig;
import com.metamx.druid.merger.worker.Worker;
import org.easymock.EasyMock;
import org.joda.time.DateTime;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.util.Arrays;
import java.util.Date;
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListSet;
/**
 * Exercises {@link S3AutoScalingStrategy} against a mocked {@link AmazonEC2Client}: one provision
 * followed by one termination.
 */
public class S3AutoScalingStrategyTest
{
  private static final String AMI_ID = "dummy";
  private static final String INSTANCE_ID = "theInstance";

  private AmazonEC2Client amazonEC2Client;
  private RunInstancesResult runInstancesResult;
  private DescribeInstancesResult describeInstancesResult;
  private Reservation reservation;
  private Instance instance;
  private WorkerWrapper worker;
  private S3AutoScalingStrategy strategy;

  @Before
  public void setUp() throws Exception
  {
    amazonEC2Client = EasyMock.createMock(AmazonEC2Client.class);
    runInstancesResult = EasyMock.createMock(RunInstancesResult.class);
    describeInstancesResult = EasyMock.createMock(DescribeInstancesResult.class);
    reservation = EasyMock.createMock(Reservation.class);

    instance = new Instance().withInstanceId(INSTANCE_ID).withLaunchTime(new Date()).withImageId(AMI_ID);

    // Worker with capacity 2 whose last completed task is at the epoch, so it always counts as idle.
    worker = new WorkerWrapper(
        new Worker("dummyHost", "dummyIP", 2, "0"),
        new ConcurrentSkipListSet<String>(),
        null
    );
    worker.setLastCompletedTaskTime(new DateTime(0));

    // Zero wait time and zero minimum workers: the single worker is immediately eligible for termination.
    strategy = new S3AutoScalingStrategy(
        amazonEC2Client, new S3AutoScalingStrategyConfig()
    {
      @Override
      public String getAmiId()
      {
        return AMI_ID;
      }

      @Override
      public String getWorkerPort()
      {
        return "8080";
      }

      @Override
      public String getInstanceType()
      {
        return "t1.micro";
      }

      @Override
      public long getMillisToWaitBeforeTerminating()
      {
        return 0;
      }

      @Override
      public int getMinNuMWorkers()
      {
        return 0;
      }
    }
    );
  }

  @After
  public void tearDown() throws Exception
  {
    EasyMock.verify(amazonEC2Client);
    EasyMock.verify(runInstancesResult);
    EasyMock.verify(describeInstancesResult);
    EasyMock.verify(reservation);
  }

  @Test
  public void testScale()
  {
    // Expect exactly one launch, one lookup and one termination on the EC2 client.
    EasyMock.expect(amazonEC2Client.runInstances(EasyMock.anyObject(RunInstancesRequest.class))).andReturn(
        runInstancesResult
    );
    EasyMock.expect(amazonEC2Client.describeInstances(EasyMock.anyObject(DescribeInstancesRequest.class)))
            .andReturn(describeInstancesResult);
    EasyMock.expect(amazonEC2Client.terminateInstances(EasyMock.anyObject(TerminateInstancesRequest.class)))
            .andReturn(null);
    EasyMock.replay(amazonEC2Client);

    EasyMock.expect(runInstancesResult.getReservation()).andReturn(reservation).atLeastOnce();
    EasyMock.replay(runInstancesResult);

    EasyMock.expect(describeInstancesResult.getReservations()).andReturn(Arrays.asList(reservation)).atLeastOnce();
    EasyMock.replay(describeInstancesResult);

    EasyMock.expect(reservation.getInstances()).andReturn(Arrays.asList(instance)).atLeastOnce();
    EasyMock.replay(reservation);

    Map<String, WorkerWrapper> zkWorkers = Maps.newHashMap();
    zkWorkers.put(worker.getWorker().getHost(), worker);

    // Fill the worker up to its capacity of 2 so that provisioning is triggered.
    worker.getRunningTasks().add("task1");
    Assert.assertFalse(worker.isAtCapacity());

    worker.getRunningTasks().add("task2");
    Assert.assertTrue(worker.isAtCapacity());

    strategy.provisionIfNeeded(zkWorkers);

    worker.getRunningTasks().remove("task1");
    worker.getRunningTasks().remove("task2");

    Instance deleted = strategy.terminateIfNeeded(zkWorkers);

    // JUnit's contract is assertEquals(expected, actual).
    Assert.assertEquals(INSTANCE_ID, deleted.getInstanceId());
  }
}

View File

@ -26,7 +26,6 @@ import org.skife.config.Default;
*/
public abstract class S3SegmentPusherConfig
{
@Config("druid.pusher.s3.bucket")
public abstract String getBucket();

View File

@ -155,13 +155,13 @@ public class MasterMain
final ServiceDiscoveryConfig serviceDiscoveryConfig = configFactory.build(ServiceDiscoveryConfig.class);
CuratorFramework curatorFramework = Initialization.makeCuratorFrameworkClient(
serviceDiscoveryConfig.getZkHosts(),
serviceDiscoveryConfig,
lifecycle
);
final ServiceDiscovery serviceDiscovery = Initialization.makeServiceDiscoveryClient(
curatorFramework,
configFactory.build(ServiceDiscoveryConfig.class),
serviceDiscoveryConfig,
lifecycle
);