diff --git a/client/src/main/java/com/metamx/druid/http/BrokerMain.java b/client/src/main/java/com/metamx/druid/http/BrokerMain.java
index 0386685c94a..e46d902cf5a 100644
--- a/client/src/main/java/com/metamx/druid/http/BrokerMain.java
+++ b/client/src/main/java/com/metamx/druid/http/BrokerMain.java
@@ -141,7 +141,7 @@ public class BrokerMain
final ServiceDiscoveryConfig serviceDiscoveryConfig = configFactory.build(ServiceDiscoveryConfig.class);
CuratorFramework curatorFramework = Initialization.makeCuratorFrameworkClient(
- serviceDiscoveryConfig.getZkHosts(),
+ serviceDiscoveryConfig,
lifecycle
);
diff --git a/client/src/main/java/com/metamx/druid/initialization/CuratorConfig.java b/client/src/main/java/com/metamx/druid/initialization/CuratorConfig.java
new file mode 100644
index 00000000000..8b83b3c8cf9
--- /dev/null
+++ b/client/src/main/java/com/metamx/druid/initialization/CuratorConfig.java
@@ -0,0 +1,35 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012 Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+package com.metamx.druid.initialization;
+
+import org.skife.config.Config;
+import org.skife.config.Default;
+
+/**
+ */
+public abstract class CuratorConfig
+{
+ @Config("druid.zk.service.host")
+ public abstract String getZkHosts();
+
+ @Config("druid.zk.service.sessionTimeoutMs")
+ @Default("15000")
+ public abstract int getZkSessionTimeoutMs();
+}
diff --git a/client/src/main/java/com/metamx/druid/initialization/Initialization.java b/client/src/main/java/com/metamx/druid/initialization/Initialization.java
index f14cfecebf8..227791b6a86 100644
--- a/client/src/main/java/com/metamx/druid/initialization/Initialization.java
+++ b/client/src/main/java/com/metamx/druid/initialization/Initialization.java
@@ -161,14 +161,19 @@ public class Initialization
}
public static CuratorFramework makeCuratorFrameworkClient(
- String zkHosts,
+ CuratorConfig curatorConfig,
Lifecycle lifecycle
) throws IOException
{
final CuratorFramework framework =
CuratorFrameworkFactory.builder()
- .connectString(zkHosts)
- .retryPolicy(new ExponentialBackoffRetry(1000, 30))
+ .connectString(curatorConfig.getZkHosts())
+ .retryPolicy(
+ new ExponentialBackoffRetry(
+ 1000,
+ 30
+ )
+ )
.build();
lifecycle.addHandler(
diff --git a/client/src/main/java/com/metamx/druid/initialization/ServiceDiscoveryConfig.java b/client/src/main/java/com/metamx/druid/initialization/ServiceDiscoveryConfig.java
index e5a97bffdc6..62cbfe44eb9 100644
--- a/client/src/main/java/com/metamx/druid/initialization/ServiceDiscoveryConfig.java
+++ b/client/src/main/java/com/metamx/druid/initialization/ServiceDiscoveryConfig.java
@@ -23,7 +23,7 @@ import org.skife.config.Config;
/**
*/
-public abstract class ServiceDiscoveryConfig
+public abstract class ServiceDiscoveryConfig extends CuratorConfig
{
@Config("druid.service")
public abstract String getServiceName();
@@ -31,9 +31,6 @@ public abstract class ServiceDiscoveryConfig
@Config("druid.port")
public abstract int getPort();
- @Config("druid.zk.service.host")
- public abstract String getZkHosts();
-
@Config("druid.zk.paths.discoveryPath")
public abstract String getDiscoveryPath();
}
diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/RemoteTaskRunner.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/RemoteTaskRunner.java
index eaf000e5276..5ed630e607b 100644
--- a/merger/src/main/java/com/metamx/druid/merger/coordinator/RemoteTaskRunner.java
+++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/RemoteTaskRunner.java
@@ -20,17 +20,21 @@
package com.metamx.druid.merger.coordinator;
import com.google.common.base.Joiner;
+import com.google.common.base.Predicate;
import com.google.common.base.Throwables;
-import com.google.common.collect.Maps;
import com.google.common.collect.MinMaxPriorityQueue;
import com.google.common.primitives.Ints;
import com.metamx.common.ISE;
+import com.metamx.common.concurrent.ScheduledExecutors;
+import com.metamx.common.guava.FunctionalIterable;
+import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
+import com.metamx.druid.PeriodGranularity;
import com.metamx.druid.merger.common.TaskHolder;
import com.metamx.druid.merger.common.TaskStatus;
-import com.metamx.druid.merger.common.TaskToolbox;
-import com.metamx.druid.merger.common.config.IndexerZkConfig;
import com.metamx.druid.merger.common.task.Task;
+import com.metamx.druid.merger.coordinator.config.RemoteTaskRunnerConfig;
+import com.metamx.druid.merger.coordinator.scaling.ScalingStrategy;
import com.metamx.druid.merger.worker.Worker;
import com.metamx.emitter.EmittingLogger;
import com.netflix.curator.framework.CuratorFramework;
@@ -39,15 +43,30 @@ import com.netflix.curator.framework.recipes.cache.PathChildrenCacheEvent;
import com.netflix.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.zookeeper.CreateMode;
import org.codehaus.jackson.map.ObjectMapper;
+import org.joda.time.DateTime;
+import org.joda.time.Duration;
+import org.joda.time.Period;
+import java.io.IOException;
import java.util.Comparator;
-import java.util.Map;
-import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
+ * The RemoteTaskRunner encapsulates all interactions with Zookeeper and keeps track of which workers
+ * are running which tasks. The RemoteTaskRunner is event driven and updates state according to ephemeral node
+ * changes in ZK.
+ *
+ * The RemoteTaskRunner will assign tasks to a node until the node hits capacity. RemoteTaskRunners have scaling
+ * strategies to help them decide when to create or delete new resources. When tasks are assigned to the remote
+ * task runner and no workers have capacity to handle the task, provisioning will be done according to the strategy.
+ * The remote task runner periodically runs a check to see if any worker nodes have not had any work for a
+ * specified period of time. If so, the worker node will be terminated.
+ *
+ * If a worker node becomes inexplicably disconnected from Zk, the RemoteTaskRunner will automatically retry any tasks
+ * that were associated with the node.
*/
public class RemoteTaskRunner implements TaskRunner
{
@@ -55,156 +74,153 @@ public class RemoteTaskRunner implements TaskRunner
private static final Joiner JOINER = Joiner.on("/");
private final ObjectMapper jsonMapper;
- private final TaskInventoryManager taskInventoryManager;
- private final IndexerZkConfig config;
+ private final RemoteTaskRunnerConfig config;
private final CuratorFramework cf;
+ private final PathChildrenCache workerListener;
private final ScheduledExecutorService scheduledExec;
private final RetryPolicyFactory retryPolicyFactory;
+ private final ConcurrentHashMap zkWorkers; // all workers that exist in ZK
+ private final ConcurrentHashMap tasks; // all tasks that are assigned or need to be assigned
+ private final ScalingStrategy strategy;
- private final ConcurrentHashMap monitors = new ConcurrentHashMap();
+ private final Object statusLock = new Object();
+
+ private volatile boolean started = false;
public RemoteTaskRunner(
ObjectMapper jsonMapper,
- TaskInventoryManager taskInventoryManager,
- IndexerZkConfig config,
+ RemoteTaskRunnerConfig config,
CuratorFramework cf,
+ PathChildrenCache workerListener,
ScheduledExecutorService scheduledExec,
- RetryPolicyFactory retryPolicyFactory
+ RetryPolicyFactory retryPolicyFactory,
+ ConcurrentHashMap zkWorkers,
+ ConcurrentHashMap tasks,
+ ScalingStrategy strategy
)
{
this.jsonMapper = jsonMapper;
- this.taskInventoryManager = taskInventoryManager;
this.config = config;
this.cf = cf;
+ this.workerListener = workerListener;
this.scheduledExec = scheduledExec;
this.retryPolicyFactory = retryPolicyFactory;
+ this.zkWorkers = zkWorkers;
+ this.tasks = tasks;
+ this.strategy = strategy;
+ }
+
+ @LifecycleStart
+ public void start()
+ {
+ try {
+ workerListener.start();
+ workerListener.getListenable().addListener(
+ new PathChildrenCacheListener()
+ {
+ @Override
+ public void childEvent(CuratorFramework client, final PathChildrenCacheEvent event) throws Exception
+ {
+ if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED)) {
+ final Worker worker = jsonMapper.readValue(
+ cf.getData().forPath(event.getData().getPath()),
+ Worker.class
+ );
+
+ log.info("New worker[%s] found!", worker.getHost());
+ addWorker(worker);
+ } else if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)) {
+ // Get the worker host from the path
+ String workerHost = event.getData().getPath().substring(event.getData().getPath().lastIndexOf("/") + 1);
+
+ log.info("Worker[%s] removed!", workerHost);
+ removeWorker(workerHost);
+ }
+ }
+ }
+ );
+
+ // Schedule termination of worker nodes periodically
+ Period period = new Period(config.getTerminateResourcesPeriodMs());
+ PeriodGranularity granularity = new PeriodGranularity(period, null, null);
+ final long truncatedNow = granularity.truncate(new DateTime().getMillis());
+
+ ScheduledExecutors.scheduleAtFixedRate(
+ scheduledExec,
+ new Duration(
+ System.currentTimeMillis(),
+ granularity.next(truncatedNow) - config.getTerminateResourcesWindowMs()
+ ),
+ new Duration(config.getTerminateResourcesPeriodMs()),
+ new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ strategy.terminateIfNeeded(zkWorkers);
+ }
+ }
+ );
+
+ started = true;
+ }
+ catch (Exception e) {
+ throw Throwables.propagate(e);
+ }
}
@LifecycleStop
public void stop()
{
- scheduledExec.shutdownNow();
+ try {
+ for (WorkerWrapper workerWrapper : zkWorkers.values()) {
+ workerWrapper.getWatcher().close();
+ }
+ }
+ catch (Exception e) {
+ throw Throwables.propagate(e);
+ }
+ finally {
+ started = false;
+ }
+ }
+
+ public boolean hasStarted()
+ {
+ return started;
}
@Override
- public void run(final Task task, final TaskContext taskContext, final TaskCallback callback)
+ public void run(Task task, TaskContext context, TaskCallback callback)
{
- run(task, taskContext, callback, retryPolicyFactory.makeRetryPolicy());
- }
-
- private void run(
- final Task task,
- final TaskContext taskContext,
- final TaskCallback callback,
- final RetryPolicy retryPolicy
- )
- {
- try {
- // If a worker is already running this task, check the status
- Map allRunningTasks = Maps.newHashMap();
- for (Worker worker : taskInventoryManager.getInventory()) {
- for (String taskId : worker.getTasks().keySet()) {
- allRunningTasks.put(taskId, worker);
- }
- }
-
- Worker workerRunningThisTask = allRunningTasks.get(task.getId());
- if (workerRunningThisTask != null) {
- // If the status is complete, just run the callback, otherwise monitor for the completion of the task
- if (!verifyStatusComplete(jsonMapper, workerRunningThisTask, task, callback)) {
- monitorStatus(jsonMapper, workerRunningThisTask, task, taskContext, callback, retryPolicy);
- }
- return;
- }
-
- // Run the task if it does not currently exist
- Worker theWorker = getLeastCapacityWorker();
- monitorStatus(jsonMapper, theWorker, task, taskContext, callback, retryPolicy);
- announceTask(theWorker, task, taskContext);
- }
- catch (Exception e) {
- log.error(e, "Failed to dispatch task. Retrying");
- retryTask(task, taskContext, callback, retryPolicy);
- }
- }
-
- private void retryTask(
- final Task task,
- final TaskContext taskContext,
- final TaskCallback callback,
- final RetryPolicy retryPolicy
- )
- {
- if (retryPolicy.hasExceededRetryThreshold()) {
- log.makeAlert("Task [%s] has failed[%d] times, giving up!", task.getId(), retryPolicy.getNumRetries())
- .emit();
- callback.notify(TaskStatus.failure(task.getId()));
-
- return;
- }
-
- scheduledExec.schedule(
- new Callable()
- {
- @Override
- public Object call() throws Exception
- {
- retryPolicy.runRunnables();
-
- log.info("Retry[%d] for task[%s]", retryPolicy.getNumRetries(), task.getId());
- run(task, taskContext, callback, retryPolicy);
- return null;
- }
- },
- retryPolicy.getAndIncrementRetryDelay(),
- TimeUnit.MILLISECONDS
+ assignTask(
+ new TaskWrapper(
+ task, context, callback, retryPolicyFactory.makeRetryPolicy()
+ )
);
}
- private Worker getLeastCapacityWorker()
- {
- final MinMaxPriorityQueue workerQueue = MinMaxPriorityQueue.orderedBy(
- new Comparator()
- {
- @Override
- public int compare(Worker w1, Worker w2)
- {
- return Ints.compare(w1.getTasks().size(), w2.getTasks().size());
- }
- }
- ).create(taskInventoryManager.getInventory());
-
- if (workerQueue.isEmpty()) {
- log.error("No worker nodes found!");
- throw new RuntimeException();
- }
-
- return workerQueue.peek();
- }
-
- private boolean verifyStatusComplete(
- final ObjectMapper jsonMapper,
- final Worker worker,
- final Task task,
- final TaskCallback callback
- )
+ private boolean assignTask(TaskWrapper taskWrapper)
{
+ // If the task already exists, we don't need to announce it
try {
- final String taskPath = JOINER.join(config.getTaskPath(), worker.getHost(), task.getId());
- final String statusPath = JOINER.join(config.getStatusPath(), worker.getHost(), task.getId());
+ WorkerWrapper workerWrapper;
+ if ((workerWrapper = findWorkerRunningTask(taskWrapper)) != null) {
+ final Worker worker = workerWrapper.getWorker();
+ TaskStatus taskStatus = jsonMapper.readValue(
+ cf.getData().forPath(JOINER.join(config.getStatusPath(), worker.getHost(), taskWrapper.getTask().getId())),
+ TaskStatus.class
+ );
- TaskStatus taskStatus = jsonMapper.readValue(
- cf.getData().forPath(statusPath), TaskStatus.class
- );
-
- if (taskStatus.isComplete()) {
- if (callback != null) {
- callback.notify(taskStatus);
+ if (taskStatus.isComplete()) {
+ TaskCallback callback = taskWrapper.getCallback();
+ if (callback != null) {
+ callback.notify(taskStatus);
+ }
+ new CleanupPaths(worker.getHost(), taskWrapper.getTask().getId()).run();
+ } else {
+ tasks.put(taskWrapper.getTask().getId(), taskWrapper);
}
-
- cf.delete().guaranteed().forPath(statusPath);
- cf.delete().guaranteed().forPath(taskPath);
-
return true;
}
}
@@ -212,120 +228,291 @@ public class RemoteTaskRunner implements TaskRunner
throw Throwables.propagate(e);
}
+ // Announce the task
+ WorkerWrapper workerWrapper = getWorkerForTask();
+ if (workerWrapper != null) {
+ announceTask(workerWrapper.getWorker(), taskWrapper);
+ return true;
+ }
+
return false;
}
/**
- * Creates a monitor for status updates and deletes. Worker nodes announce a status when they start a task and update
- * it again upon completing the task. If a status is deleted, this means the worker node has died before completing
- * its status update.
+ * Retries a task that has failed.
+ *
+ * @param pre - A runnable that is executed before the retry occurs
+ * @param taskWrapper - a container for task properties
*/
- private void monitorStatus(
- final ObjectMapper jsonMapper,
- final Worker worker,
- final Task task,
- final TaskContext taskContext,
- final TaskCallback callback,
- final RetryPolicy retryPolicy
- ) throws Exception
+ private void retryTask(
+ final Runnable pre,
+ final TaskWrapper taskWrapper
+ )
{
- final String taskPath = JOINER.join(config.getTaskPath(), worker.getHost(), task.getId());
- final String statusPath = JOINER.join(config.getStatusPath(), worker.getHost(), task.getId());
+ final Task task = taskWrapper.getTask();
+ final RetryPolicy retryPolicy = taskWrapper.getRetryPolicy();
- PathChildrenCache monitor = monitors.get(worker.getHost());
- if (monitor == null) {
- monitor = new PathChildrenCache(
- cf,
- JOINER.join(config.getStatusPath(), worker.getHost()),
- false
- );
- monitor.start();
+ log.info("Registering retry for failed task[%s]", task.getId());
+
+ if (retryPolicy.hasExceededRetryThreshold()) {
+ log.makeAlert("Task [%s] has failed[%d] times, giving up!", task.getId(), retryPolicy.getNumRetries())
+ .emit();
+ return;
}
- final PathChildrenCache statusMonitor = monitor;
- statusMonitor.getListenable().addListener(
- new PathChildrenCacheListener()
+ scheduledExec.schedule(
+ new Runnable()
{
@Override
- public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent)
- throws Exception
+ public void run()
{
try {
- if (pathChildrenCacheEvent.getData().getPath().equals(statusPath)) {
- if (pathChildrenCacheEvent.getType().equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)) {
- throw new ISE("Worker[%s] dropped Task[%s]!", worker.getHost(), task.getId());
- }
+ if (pre != null) {
+ pre.run();
+ }
- TaskStatus taskStatus = jsonMapper.readValue(
- cf.getData().forPath(statusPath), TaskStatus.class
- );
-
- if (taskStatus.isComplete()) {
- if (callback != null) {
- callback.notify(taskStatus);
- }
-
- cf.delete().guaranteed().forPath(statusPath);
- cf.delete().guaranteed().forPath(taskPath);
- statusMonitor.close();
+ if (tasks.containsKey(task.getId())) {
+ log.info("Retry[%d] for task[%s]", retryPolicy.getNumRetries(), task.getId());
+ if (!assignTask(taskWrapper)) {
+ throw new ISE("Unable to find worker to send retry request to for task[%s]", task.getId());
}
}
}
catch (Exception e) {
- log.error(e, "Exception while cleaning up task[%s]. Retrying", task.getId());
-
- retryPolicy.registerRunnable(
- new Runnable()
- {
- @Override
- public void run()
- {
- try {
- if (cf.checkExists().forPath(statusPath) != null) {
- cf.delete().guaranteed().forPath(statusPath);
- }
- if (cf.checkExists().forPath(taskPath) != null) {
- cf.delete().guaranteed().forPath(taskPath);
- }
- statusMonitor.close();
- }
- catch (Exception e) {
- throw Throwables.propagate(e);
- }
- }
- }
- );
-
- retryTask(task, taskContext, callback, retryPolicy);
+ retryTask(null, taskWrapper);
}
}
- }
+ },
+ retryPolicy.getAndIncrementRetryDelay(),
+ TimeUnit.MILLISECONDS
);
}
- private void announceTask(Worker theWorker, Task task, TaskContext taskContext)
+ /**
+ * When a new worker appears, listeners are registered for status changes.
+ * Status changes indicate the creation or completion of task.
+ * The RemoteTaskRunner updates state according to these changes.
+ *
+ * @param worker - contains metadata for a worker that has appeared in ZK
+ */
+ private void addWorker(final Worker worker)
{
try {
- log.info(
- "Coordinator asking Worker[%s] to add"
- + " task[%s]", theWorker.getHost(), task.getId()
+ final String workerStatus = JOINER.join(config.getStatusPath(), worker.getHost());
+ final ConcurrentSkipListSet runningTasks = new ConcurrentSkipListSet(
+ cf.getChildren().forPath(workerStatus)
+ );
+ final PathChildrenCache watcher = new PathChildrenCache(cf, workerStatus, false);
+ final WorkerWrapper workerWrapper = new WorkerWrapper(
+ worker,
+ runningTasks,
+ watcher
);
- cf.create()
- .creatingParentsIfNeeded()
- .withMode(CreateMode.EPHEMERAL)
- .forPath(
- JOINER.join(
- config.getTaskPath(),
- theWorker.getHost(),
- task.getId()
- ),
- jsonMapper.writeValueAsBytes(new TaskHolder(task, taskContext))
- );
+ // Add status listener to the watcher for status changes
+ watcher.getListenable().addListener(
+ new PathChildrenCacheListener()
+ {
+ @Override
+ public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception
+ {
+ synchronized (statusLock) {
+ String taskId = null;
+ try {
+ if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED)) {
+ String statusPath = event.getData().getPath();
+ TaskStatus taskStatus = jsonMapper.readValue(
+ cf.getData().forPath(statusPath), TaskStatus.class
+ );
+ taskId = taskStatus.getId();
+
+ log.info("New status[%s] appeared!", taskId);
+ runningTasks.add(taskId);
+ statusLock.notify();
+ } else if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)) {
+ String statusPath = event.getData().getPath();
+ TaskStatus taskStatus = jsonMapper.readValue(
+ cf.getData().forPath(statusPath), TaskStatus.class
+ );
+ taskId = taskStatus.getId();
+
+ log.info("Task[%s] updated status[%s]!", taskId, taskStatus.getStatusCode());
+
+ if (taskStatus.isComplete()) {
+ workerWrapper.setLastCompletedTaskTime(new DateTime());
+ TaskWrapper taskWrapper = tasks.get(taskId);
+
+ if (taskWrapper == null) {
+ log.warn("A task completed that I didn't know about? WTF?!");
+ } else {
+ TaskCallback callback = taskWrapper.getCallback();
+
+ // Cleanup
+ if (callback != null) {
+ callback.notify(taskStatus);
+ }
+ tasks.remove(taskId);
+ runningTasks.remove(taskId);
+ cf.delete().guaranteed().forPath(statusPath);
+ }
+ }
+ }
+ }
+ catch (Exception e) {
+ retryTask(new CleanupPaths(worker.getHost(), taskId), tasks.get(taskId));
+ }
+ }
+ }
+ }
+ );
+ zkWorkers.put(worker.getHost(), workerWrapper);
+ watcher.start();
}
catch (Exception e) {
- log.error(e, "Exception creating task[%s] for worker node[%s]", task.getId(), theWorker.getHost());
throw Throwables.propagate(e);
}
}
+
+ private WorkerWrapper findWorkerRunningTask(TaskWrapper taskWrapper)
+ {
+ for (WorkerWrapper workerWrapper : zkWorkers.values()) {
+ if (workerWrapper.getRunningTasks().contains(taskWrapper.getTask().getId())) {
+ return workerWrapper;
+ }
+ }
+ return null;
+ }
+
+ /**
+ * When a ephemeral worker node disappears from ZK, we have to make sure there are no tasks still assigned
+ * to the worker. If tasks remain, they are retried.
+ *
+ * @param workerId - id of the removed worker
+ */
+ private void removeWorker(final String workerId)
+ {
+ WorkerWrapper workerWrapper = zkWorkers.get(workerId);
+ if (workerWrapper != null) {
+ for (String taskId : workerWrapper.getRunningTasks()) {
+ TaskWrapper taskWrapper = tasks.get(taskId);
+ if (taskWrapper != null) {
+ retryTask(new CleanupPaths(workerId, taskId), tasks.get(taskId));
+ }
+ workerWrapper.removeTask(taskId);
+ }
+
+ try {
+ workerWrapper.getWatcher().close();
+ }
+ catch (IOException e) {
+ log.error("Failed to close watcher associated with worker[%s]", workerWrapper.getWorker().getHost());
+ }
+ }
+ zkWorkers.remove(workerId);
+ }
+
+ private WorkerWrapper getWorkerForTask()
+ {
+ try {
+ final MinMaxPriorityQueue workerQueue = MinMaxPriorityQueue.orderedBy(
+ new Comparator()
+ {
+ @Override
+ public int compare(WorkerWrapper w1, WorkerWrapper w2)
+ {
+ return -Ints.compare(w1.getRunningTasks().size(), w2.getRunningTasks().size());
+ }
+ }
+ ).create(
+ FunctionalIterable.create(zkWorkers.values()).filter(
+ new Predicate()
+ {
+ @Override
+ public boolean apply(WorkerWrapper input)
+ {
+ return (!input.isAtCapacity() &&
+ input.getWorker().getVersion().compareTo(config.getMinWorkerVersion()) >= 0);
+ }
+ }
+ )
+ );
+
+ if (workerQueue.isEmpty()) {
+ log.makeAlert("There are no worker nodes with capacity to run task!").emit();
+ strategy.provisionIfNeeded(zkWorkers);
+ return null;
+ }
+
+ return workerQueue.peek();
+ }
+ catch (Exception e) {
+ throw Throwables.propagate(e);
+ }
+ }
+
+ /**
+ * Creates a ZK entry under a specific path associated with a worker. The worker is responsible for
+ * removing the task ZK entry and creating a task status ZK entry.
+ *
+ * @param theWorker The worker the task is assigned to
+ * @param taskWrapper The task to be assigned
+ */
+ private void announceTask(Worker theWorker, TaskWrapper taskWrapper)
+ {
+ synchronized (statusLock) {
+ final Task task = taskWrapper.getTask();
+ final TaskContext taskContext = taskWrapper.getTaskContext();
+ try {
+ log.info("Coordinator asking Worker[%s] to add task[%s]", theWorker.getHost(), task.getId());
+
+ tasks.put(task.getId(), taskWrapper);
+
+ cf.create()
+ .creatingParentsIfNeeded()
+ .withMode(CreateMode.EPHEMERAL)
+ .forPath(
+ JOINER.join(
+ config.getTaskPath(),
+ theWorker.getHost(),
+ task.getId()
+ ),
+ jsonMapper.writeValueAsBytes(new TaskHolder(task, taskContext))
+ );
+
+ while (findWorkerRunningTask(taskWrapper) == null) {
+ statusLock.wait();
+ }
+ }
+ catch (Exception e) {
+ log.error(e, "Exception creating task[%s] for worker node[%s]", task.getId(), theWorker.getHost());
+ throw Throwables.propagate(e);
+ }
+ }
+ }
+
+ private class CleanupPaths implements Runnable
+ {
+ private final String workerId;
+ private final String taskId;
+
+ private CleanupPaths(String workerId, String taskId)
+ {
+ this.workerId = workerId;
+ this.taskId = taskId;
+ }
+
+ @Override
+ public void run()
+ {
+ try {
+ final String statusPath = JOINER.join(config.getStatusPath(), workerId, taskId);
+ final String taskPath = JOINER.join(config.getTaskPath(), workerId, taskId);
+ cf.delete().guaranteed().forPath(statusPath);
+ cf.delete().guaranteed().forPath(taskPath);
+ }
+ catch (Exception e) {
+ log.warn("Tried to delete a path that didn't exist! Must've gone away already!");
+ }
+ }
+ }
}
diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/RetryPolicy.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/RetryPolicy.java
index 1754e426977..b449ce01960 100644
--- a/merger/src/main/java/com/metamx/druid/merger/coordinator/RetryPolicy.java
+++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/RetryPolicy.java
@@ -48,23 +48,6 @@ public class RetryPolicy
this.retryCount = 0;
}
- /**
- * Register runnables that can be run at any point in a given retry.
- * @param runnable
- */
- public void registerRunnable(Runnable runnable)
- {
- runnables.add(runnable);
- }
-
- public void runRunnables()
- {
- for (Runnable runnable : runnables) {
- runnable.run();
- }
- runnables.clear();
- }
-
public long getAndIncrementRetryDelay()
{
long retVal = currRetryDelay;
diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/TaskInventoryManager.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/TaskInventoryManager.java
deleted file mode 100644
index c8690bf5c36..00000000000
--- a/merger/src/main/java/com/metamx/druid/merger/coordinator/TaskInventoryManager.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- * Druid - a distributed column store.
- * Copyright (C) 2012 Metamarkets Group Inc.
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU General Public License
- * as published by the Free Software Foundation; either version 2
- * of the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
- */
-
-package com.metamx.druid.merger.coordinator;
-
-import com.metamx.common.Pair;
-import com.metamx.common.logger.Logger;
-import com.metamx.druid.client.InventoryManagementStrategy;
-import com.metamx.druid.client.InventoryManager;
-import com.metamx.druid.client.InventoryManagerConfig;
-import com.metamx.druid.merger.common.TaskStatus;
-import com.metamx.druid.merger.common.config.IndexerZkConfig;
-import com.metamx.druid.merger.worker.Worker;
-import com.metamx.phonebook.PhoneBook;
-import com.metamx.phonebook.PhoneBookPeon;
-
-import java.util.Map;
-
-/**
- * A simple {@link InventoryManager} that monitors ZK for the creation and deletion of new Workers and the
- * tasks each worker is assigned.
- */
-public class TaskInventoryManager extends InventoryManager
-{
- public TaskInventoryManager(
- IndexerZkConfig config,
- PhoneBook yp
- )
- {
- super(
- new Logger(TaskInventoryManager.class.getName() + "." + config.getStatusPath()),
- new InventoryManagerConfig(
- config.getAnnouncementPath(),
- config.getStatusPath()
- ),
- yp,
- new WorkerInventoryManagementStrategy(
- new Logger(
- TaskInventoryManager.class.getName() + "." + config.getStatusPath()
- )
- )
- );
- }
-
- private static class WorkerInventoryManagementStrategy implements InventoryManagementStrategy
- {
- private final Logger log;
-
- public WorkerInventoryManagementStrategy(
- Logger log
- )
- {
- this.log = log;
- }
-
- @Override
- public Class getContainerClass()
- {
- return Worker.class;
- }
-
- @Override
- public Pair> makeSubListener(final Worker worker)
- {
- return new Pair>(
- worker.getHost(),
- new PhoneBookPeon()
- {
- @Override
- public Class getObjectClazz()
- {
- return TaskStatus.class;
- }
-
- @Override
- public void newEntry(String name, TaskStatus taskStatus)
- {
- worker.addTask(taskStatus);
- log.info("Worker[%s] has new task[%s] in ZK", worker.getHost(), taskStatus.getId());
- }
-
- @Override
- public void entryRemoved(String taskId)
- {
- worker.removeTask(taskId);
- log.info("Worker[%s] removed task[%s] in ZK", worker.getHost(), taskId);
- }
- }
- );
- }
-
- @Override
- public void objectRemoved(Worker baseObject)
- {
- }
-
- @Override
- public boolean doesSerde()
- {
- return false;
- }
-
- @Override
- public Worker deserialize(String name, Map properties)
- {
- throw new UnsupportedOperationException();
- }
- }
-}
diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/TaskWrapper.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/TaskWrapper.java
new file mode 100644
index 00000000000..c757bb2dc33
--- /dev/null
+++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/TaskWrapper.java
@@ -0,0 +1,60 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012 Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+package com.metamx.druid.merger.coordinator;
+
+import com.metamx.druid.merger.common.task.Task;
+
+/**
+ */
+public class TaskWrapper
+{
+ private final Task task;
+ private final TaskContext taskContext;
+ private final TaskCallback callback;
+ private final RetryPolicy retryPolicy;
+
+ public TaskWrapper(Task task, TaskContext taskContext, TaskCallback callback, RetryPolicy retryPolicy)
+ {
+ this.task = task;
+ this.taskContext = taskContext;
+ this.callback = callback;
+ this.retryPolicy = retryPolicy;
+ }
+
+ public Task getTask()
+ {
+ return task;
+ }
+
+ public TaskContext getTaskContext()
+ {
+ return taskContext;
+ }
+
+ public TaskCallback getCallback()
+ {
+ return callback;
+ }
+
+ public RetryPolicy getRetryPolicy()
+ {
+ return retryPolicy;
+ }
+}
diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/WorkerWrapper.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/WorkerWrapper.java
new file mode 100644
index 00000000000..047533ae858
--- /dev/null
+++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/WorkerWrapper.java
@@ -0,0 +1,80 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012 Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+package com.metamx.druid.merger.coordinator;
+
+import com.metamx.druid.merger.worker.Worker;
+import com.netflix.curator.framework.recipes.cache.PathChildrenCache;
+import org.joda.time.DateTime;
+
+import java.util.Set;
+import java.util.concurrent.ConcurrentSkipListSet;
+
+/**
+ */
+public class WorkerWrapper
+{
+ private final Worker worker;
+ private final ConcurrentSkipListSet runningTasks;
+ private final PathChildrenCache watcher;
+
+ private volatile DateTime lastCompletedTaskTime;
+
+ public WorkerWrapper(Worker worker, ConcurrentSkipListSet runningTasks, PathChildrenCache watcher)
+ {
+ this.worker = worker;
+ this.runningTasks = runningTasks;
+ this.watcher = watcher;
+ }
+
+ public Worker getWorker()
+ {
+ return worker;
+ }
+
+ public Set getRunningTasks()
+ {
+ return runningTasks;
+ }
+
+ public PathChildrenCache getWatcher()
+ {
+ return watcher;
+ }
+
+ public DateTime getLastCompletedTaskTime()
+ {
+ return lastCompletedTaskTime;
+ }
+
+ public boolean isAtCapacity()
+ {
+ return runningTasks.size() >= worker.getCapacity();
+ }
+
+ public void setLastCompletedTaskTime(DateTime completedTaskTime)
+ {
+ lastCompletedTaskTime = completedTaskTime;
+ }
+
+ public void removeTask(String taskId)
+ {
+ runningTasks.remove(taskId);
+ }
+}
diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/config/RemoteTaskRunnerConfig.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/config/RemoteTaskRunnerConfig.java
new file mode 100644
index 00000000000..3816468250f
--- /dev/null
+++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/config/RemoteTaskRunnerConfig.java
@@ -0,0 +1,40 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012 Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+package com.metamx.druid.merger.coordinator.config;
+
+import com.metamx.druid.merger.common.config.IndexerZkConfig;
+import org.skife.config.Config;
+import org.skife.config.Default;
+
+/**
+ */
+public abstract class RemoteTaskRunnerConfig extends IndexerZkConfig
+{
+ @Config("druid.indexer.terminateResources.periodMs")
+ @Default("3600000") // 1 hr
+ public abstract long getTerminateResourcesPeriodMs();
+
+ @Config("druid.indexer.terminateResources.windowMs")
+ @Default("300000") // 5 mins
+ public abstract long getTerminateResourcesWindowMs();
+
+ @Config("druid.indexer.minWorkerVersion")
+ public abstract String getMinWorkerVersion();
+}
diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/config/RetryPolicyConfig.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/config/RetryPolicyConfig.java
index bb2e6bb40e0..044706b67ed 100644
--- a/merger/src/main/java/com/metamx/druid/merger/coordinator/config/RetryPolicyConfig.java
+++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/config/RetryPolicyConfig.java
@@ -27,11 +27,11 @@ import org.skife.config.Default;
public abstract class RetryPolicyConfig
{
@Config("druid.indexer.retry.minWaitMillis")
- @Default("10000")
+ @Default("60000") // 1 minute
public abstract long getRetryMinMillis();
@Config("druid.indexer.retry.maxWaitMillis")
- @Default("60000")
+ @Default("600000") // 10 minutes
public abstract long getRetryMaxMillis();
@Config("druid.indexer.retry.maxRetryCount")
diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/config/S3AutoScalingStrategyConfig.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/config/S3AutoScalingStrategyConfig.java
new file mode 100644
index 00000000000..1aa2145cdc3
--- /dev/null
+++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/config/S3AutoScalingStrategyConfig.java
@@ -0,0 +1,47 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012 Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+package com.metamx.druid.merger.coordinator.config;
+
+import org.skife.config.Config;
+import org.skife.config.Default;
+
+/**
+ */
+public abstract class S3AutoScalingStrategyConfig
+{
+ @Config("druid.indexer.amiId")
+ public abstract String getAmiId();
+
+ @Config("druid.indexer.worker.port")
+ @Default("8080")
+ public abstract String getWorkerPort();
+
+ @Config("druid.indexer.instanceType")
+ public abstract String getInstanceType();
+
+ @Config("druid.indexer.millisToWaitBeforeTerminating")
+ @Default("1800000") // 30 mins
+ public abstract long getMillisToWaitBeforeTerminating();
+
+ // minimum number of workers that must always be running
+ @Config("druid.indexer.minNumWorkers")
+ @Default("1")
+ public abstract int getMinNuMWorkers();
+}
diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorNode.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorNode.java
index 6da4c491897..8b0d5eff747 100644
--- a/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorNode.java
+++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorNode.java
@@ -19,6 +19,8 @@
package com.metamx.druid.merger.coordinator.http;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.services.ec2.AmazonEC2Client;
import com.google.common.base.Charsets;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
@@ -42,7 +44,6 @@ import com.metamx.druid.http.StatusServlet;
import com.metamx.druid.initialization.Initialization;
import com.metamx.druid.initialization.ServerConfig;
import com.metamx.druid.initialization.ServiceDiscoveryConfig;
-import com.metamx.druid.initialization.ZkClientConfig;
import com.metamx.druid.jackson.DefaultObjectMapper;
import com.metamx.druid.merger.common.TaskToolbox;
import com.metamx.druid.merger.common.config.IndexerZkConfig;
@@ -53,15 +54,21 @@ import com.metamx.druid.merger.coordinator.LocalTaskStorage;
import com.metamx.druid.merger.coordinator.MergerDBCoordinator;
import com.metamx.druid.merger.coordinator.RemoteTaskRunner;
import com.metamx.druid.merger.coordinator.RetryPolicyFactory;
-import com.metamx.druid.merger.coordinator.TaskInventoryManager;
import com.metamx.druid.merger.coordinator.TaskMaster;
import com.metamx.druid.merger.coordinator.TaskQueue;
import com.metamx.druid.merger.coordinator.TaskRunner;
import com.metamx.druid.merger.coordinator.TaskRunnerFactory;
import com.metamx.druid.merger.coordinator.TaskStorage;
+import com.metamx.druid.merger.coordinator.TaskWrapper;
+import com.metamx.druid.merger.coordinator.WorkerWrapper;
import com.metamx.druid.merger.coordinator.config.IndexerCoordinatorConfig;
import com.metamx.druid.merger.coordinator.config.IndexerDbConnectorConfig;
+import com.metamx.druid.merger.coordinator.config.S3AutoScalingStrategyConfig;
+import com.metamx.druid.merger.coordinator.config.RemoteTaskRunnerConfig;
import com.metamx.druid.merger.coordinator.config.RetryPolicyConfig;
+import com.metamx.druid.merger.coordinator.scaling.NoopScalingStrategy;
+import com.metamx.druid.merger.coordinator.scaling.S3AutoScalingStrategy;
+import com.metamx.druid.merger.coordinator.scaling.ScalingStrategy;
import com.metamx.druid.realtime.S3SegmentPusher;
import com.metamx.druid.realtime.S3SegmentPusherConfig;
import com.metamx.druid.realtime.SegmentPusher;
@@ -78,9 +85,8 @@ import com.metamx.metrics.Monitor;
import com.metamx.metrics.MonitorScheduler;
import com.metamx.metrics.MonitorSchedulerConfig;
import com.metamx.metrics.SysMonitor;
-import com.metamx.phonebook.PhoneBook;
import com.netflix.curator.framework.CuratorFramework;
-import org.I0Itec.zkclient.ZkClient;
+import com.netflix.curator.framework.recipes.cache.PathChildrenCache;
import org.codehaus.jackson.map.InjectableValues;
import org.codehaus.jackson.map.ObjectMapper;
import org.jets3t.service.S3ServiceException;
@@ -96,6 +102,7 @@ import org.skife.config.ConfigurationObjectFactory;
import java.net.URL;
import java.util.List;
import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@@ -126,7 +133,6 @@ public class IndexerCoordinatorNode
private CuratorFramework curatorFramework = null;
private ScheduledExecutorFactory scheduledExecutorFactory = null;
private IndexerZkConfig indexerZkConfig;
- private TaskInventoryManager taskInventoryManager;
private TaskRunnerFactory taskRunnerFactory = null;
private TaskMaster taskMaster = null;
private Server server = null;
@@ -194,7 +200,6 @@ public class IndexerCoordinatorNode
initializeJacksonSubtypes();
initializeCurator();
initializeIndexerZkConfig();
- initializeTaskInventoryManager();
initializeTaskRunnerFactory();
initializeTaskMaster();
initializeServer();
@@ -265,7 +270,7 @@ public class IndexerCoordinatorNode
private void initializeTaskMaster()
{
- if(taskMaster == null) {
+ if (taskMaster == null) {
final ServiceDiscoveryConfig serviceDiscoveryConfig = configFactory.build(ServiceDiscoveryConfig.class);
taskMaster = new TaskMaster(
taskQueue,
@@ -417,7 +422,7 @@ public class IndexerCoordinatorNode
if (curatorFramework == null) {
final ServiceDiscoveryConfig serviceDiscoveryConfig = configFactory.build(ServiceDiscoveryConfig.class);
curatorFramework = Initialization.makeCuratorFrameworkClient(
- serviceDiscoveryConfig.getZkHosts(),
+ serviceDiscoveryConfig,
lifecycle
);
}
@@ -430,28 +435,10 @@ public class IndexerCoordinatorNode
}
}
- public void initializeTaskInventoryManager()
- {
- if (taskInventoryManager == null) {
- final ZkClient zkClient = Initialization.makeZkClient(configFactory.build(ZkClientConfig.class), lifecycle);
- final PhoneBook masterYp = Initialization.createYellowPages(
- jsonMapper,
- zkClient,
- "Master-ZKYP--%s",
- lifecycle
- );
- taskInventoryManager = new TaskInventoryManager(
- indexerZkConfig,
- masterYp
- );
- lifecycle.addManagedInstance(taskInventoryManager);
- }
- }
-
public void initializeTaskStorage()
{
if (taskStorage == null) {
- if(config.getStorageImpl().equals("local")) {
+ if (config.getStorageImpl().equals("local")) {
taskStorage = new LocalTaskStorage();
} else if (config.getStorageImpl().equals("db")) {
final IndexerDbConnectorConfig dbConnectorConfig = configFactory.build(IndexerDbConnectorConfig.class);
@@ -481,13 +468,28 @@ public class IndexerCoordinatorNode
.build()
);
+ ScalingStrategy strategy = new S3AutoScalingStrategy(
+ new AmazonEC2Client(
+ new BasicAWSCredentials(
+ props.getProperty("com.metamx.aws.accessKey"),
+ props.getProperty("com.metamx.aws.secretKey")
+ )
+ ),
+ configFactory.build(S3AutoScalingStrategyConfig.class)
+ );
+ // TODO: remove this when AMI is ready
+ strategy = new NoopScalingStrategy(configFactory.build(S3AutoScalingStrategyConfig.class));
+
return new RemoteTaskRunner(
jsonMapper,
- taskInventoryManager,
- indexerZkConfig,
+ configFactory.build(RemoteTaskRunnerConfig.class),
curatorFramework,
+ new PathChildrenCache(curatorFramework, indexerZkConfig.getAnnouncementPath(), false),
retryScheduledExec,
- new RetryPolicyFactory(configFactory.build(RetryPolicyConfig.class))
+ new RetryPolicyFactory(configFactory.build(RetryPolicyConfig.class)),
+ new ConcurrentHashMap(),
+ new ConcurrentHashMap(),
+ strategy
);
}
};
diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/NoopScalingStrategy.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/NoopScalingStrategy.java
new file mode 100644
index 00000000000..d878182b30c
--- /dev/null
+++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/NoopScalingStrategy.java
@@ -0,0 +1,124 @@
+package com.metamx.druid.merger.coordinator.scaling;
+
+import com.amazonaws.services.ec2.model.Instance;
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.MinMaxPriorityQueue;
+import com.google.common.collect.Ordering;
+import com.metamx.common.guava.FunctionalIterable;
+import com.metamx.druid.merger.coordinator.WorkerWrapper;
+import com.metamx.druid.merger.coordinator.config.S3AutoScalingStrategyConfig;
+import com.metamx.emitter.EmittingLogger;
+
+import java.util.Comparator;
+import java.util.Map;
+
+/**
+ * This class just logs when scaling should occur.
+ */
+public class NoopScalingStrategy implements ScalingStrategy
+{
+ private static final EmittingLogger log = new EmittingLogger(NoopScalingStrategy.class);
+
+ private final S3AutoScalingStrategyConfig config;
+
+ private final Object lock = new Object();
+
+ private volatile String currentlyProvisioning = null;
+ private volatile String currentlyTerminating = null;
+
+ public NoopScalingStrategy(
+ S3AutoScalingStrategyConfig config
+ )
+ {
+ this.config = config;
+ }
+
+ @Override
+ public void provisionIfNeeded(Map zkWorkers)
+ {
+ synchronized (lock) {
+ if (currentlyProvisioning != null) {
+ if (!zkWorkers.containsKey(currentlyProvisioning)) {
+ log.info(
+ "[%s] is still provisioning. Wait for it to finish before requesting new worker.",
+ currentlyProvisioning
+ );
+ return;
+ }
+ }
+
+ Iterable availableWorkers = FunctionalIterable.create(zkWorkers.values()).filter(
+ new Predicate()
+ {
+ @Override
+ public boolean apply(WorkerWrapper input)
+ {
+ return !input.isAtCapacity();
+ }
+ }
+ );
+
+ if (Iterables.size(availableWorkers) == 0) {
+ try {
+ log.info("If I were a real strategy I'd create something now");
+ currentlyProvisioning = "willNeverBeTrue";
+ }
+ catch (Exception e) {
+ log.error(e, "Unable to create instance");
+ currentlyProvisioning = null;
+ }
+ }
+ }
+ }
+
+ @Override
+ public Instance terminateIfNeeded(Map zkWorkers)
+ {
+ synchronized (lock) {
+ if (currentlyTerminating != null) {
+ if (zkWorkers.containsKey(currentlyTerminating)) {
+ log.info("[%s] has not terminated. Wait for it to finish before terminating again.", currentlyTerminating);
+ return null;
+ }
+ }
+
+ MinMaxPriorityQueue currWorkers = MinMaxPriorityQueue.orderedBy(
+ new Comparator()
+ {
+ @Override
+ public int compare(WorkerWrapper w1, WorkerWrapper w2)
+ {
+ return Ordering.natural()
+ .nullsFirst()
+ .compare(w1.getLastCompletedTaskTime(), w2.getLastCompletedTaskTime());
+ }
+ }
+ ).create(
+ zkWorkers.values()
+ );
+
+ if (currWorkers.size() <= config.getMinNuMWorkers()) {
+ return null;
+ }
+
+ WorkerWrapper thatLazyWorker = currWorkers.poll();
+
+ if (System.currentTimeMillis() - thatLazyWorker.getLastCompletedTaskTime().getMillis()
+ > config.getMillisToWaitBeforeTerminating()) {
+ try {
+ log.info("If I were a real strategy I'd terminate something now");
+ currentlyTerminating = "willNeverBeTrue";
+
+ return null;
+ }
+ catch (Exception e) {
+ log.error(e, "Unable to terminate instance");
+ currentlyTerminating = null;
+ }
+ }
+
+ return null;
+ }
+ }
+}
diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/S3AutoScalingStrategy.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/S3AutoScalingStrategy.java
new file mode 100644
index 00000000000..de00b7922a4
--- /dev/null
+++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/S3AutoScalingStrategy.java
@@ -0,0 +1,186 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012 Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+package com.metamx.druid.merger.coordinator.scaling;
+
+import com.amazonaws.services.ec2.AmazonEC2Client;
+import com.amazonaws.services.ec2.model.DescribeInstancesRequest;
+import com.amazonaws.services.ec2.model.DescribeInstancesResult;
+import com.amazonaws.services.ec2.model.Filter;
+import com.amazonaws.services.ec2.model.Instance;
+import com.amazonaws.services.ec2.model.InstanceType;
+import com.amazonaws.services.ec2.model.RunInstancesRequest;
+import com.amazonaws.services.ec2.model.RunInstancesResult;
+import com.amazonaws.services.ec2.model.TerminateInstancesRequest;
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.MinMaxPriorityQueue;
+import com.metamx.common.ISE;
+import com.metamx.common.guava.FunctionalIterable;
+import com.metamx.druid.merger.coordinator.WorkerWrapper;
+import com.metamx.druid.merger.coordinator.config.S3AutoScalingStrategyConfig;
+import com.metamx.emitter.EmittingLogger;
+
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.Map;
+
+/**
+ */
+public class S3AutoScalingStrategy implements ScalingStrategy
+{
+ private static final EmittingLogger log = new EmittingLogger(S3AutoScalingStrategy.class);
+
+ private final AmazonEC2Client amazonEC2Client;
+ private final S3AutoScalingStrategyConfig config;
+
+ private final Object lock = new Object();
+
+ private volatile String currentlyProvisioning = null;
+ private volatile String currentlyTerminating = null;
+
+ public S3AutoScalingStrategy(
+ AmazonEC2Client amazonEC2Client,
+ S3AutoScalingStrategyConfig config
+ )
+ {
+ this.amazonEC2Client = amazonEC2Client;
+ this.config = config;
+ }
+
+ @Override
+ public void provisionIfNeeded(Map zkWorkers)
+ {
+ synchronized (lock) {
+ if (zkWorkers.containsKey(currentlyProvisioning)) {
+ currentlyProvisioning = null;
+ }
+
+ if (currentlyProvisioning != null) {
+ log.info(
+ "[%s] is still provisioning. Wait for it to finish before requesting new worker.",
+ currentlyProvisioning
+ );
+ return;
+ }
+
+ Iterable availableWorkers = FunctionalIterable.create(zkWorkers.values()).filter(
+ new Predicate()
+ {
+ @Override
+ public boolean apply(WorkerWrapper input)
+ {
+ return !input.isAtCapacity();
+ }
+ }
+ );
+
+ if (Iterables.size(availableWorkers) == 0) {
+ try {
+ log.info("Creating a new instance");
+ RunInstancesResult result = amazonEC2Client.runInstances(
+ new RunInstancesRequest(config.getAmiId(), 1, 1)
+ .withInstanceType(InstanceType.fromValue(config.getInstanceType()))
+ );
+
+ if (result.getReservation().getInstances().size() != 1) {
+ throw new ISE("Created more than one instance, WTF?!");
+ }
+
+ Instance instance = result.getReservation().getInstances().get(0);
+ log.info("Created instance: %s", instance.getInstanceId());
+ log.debug("%s", instance);
+
+ currentlyProvisioning = String.format("%s:%s", instance.getPrivateIpAddress(), config.getWorkerPort());
+ }
+ catch (Exception e) {
+ log.error(e, "Unable to create instance");
+ currentlyProvisioning = null;
+ }
+ }
+ }
+ }
+
+ @Override
+ public Instance terminateIfNeeded(Map zkWorkers)
+ {
+ synchronized (lock) {
+ if (!zkWorkers.containsKey(currentlyTerminating)) {
+ currentlyProvisioning = null;
+ }
+
+ if (currentlyTerminating != null) {
+ log.info("[%s] has not terminated. Wait for it to finish before terminating again.", currentlyTerminating);
+ return null;
+ }
+
+ MinMaxPriorityQueue currWorkers = MinMaxPriorityQueue.orderedBy(
+ new Comparator()
+ {
+ @Override
+ public int compare(WorkerWrapper w1, WorkerWrapper w2)
+ {
+ return w1.getLastCompletedTaskTime().compareTo(w2.getLastCompletedTaskTime());
+ }
+ }
+ ).create(
+ zkWorkers.values()
+ );
+
+ if (currWorkers.size() <= config.getMinNuMWorkers()) {
+ return null;
+ }
+
+ WorkerWrapper thatLazyWorker = currWorkers.poll();
+
+ if (System.currentTimeMillis() - thatLazyWorker.getLastCompletedTaskTime().getMillis()
+ > config.getMillisToWaitBeforeTerminating()) {
+ DescribeInstancesResult result = amazonEC2Client.describeInstances(
+ new DescribeInstancesRequest()
+ .withFilters(
+ new Filter("private-ip-address", Arrays.asList(thatLazyWorker.getWorker().getIp()))
+ )
+ );
+
+ if (result.getReservations().size() != 1 || result.getReservations().get(0).getInstances().size() != 1) {
+ throw new ISE("More than one node with the same private IP[%s], WTF?!", thatLazyWorker.getWorker().getIp());
+ }
+
+ Instance instance = result.getReservations().get(0).getInstances().get(0);
+
+ try {
+ log.info("Terminating instance[%s]", instance.getInstanceId());
+ amazonEC2Client.terminateInstances(
+ new TerminateInstancesRequest(Arrays.asList(instance.getInstanceId()))
+ );
+
+ currentlyTerminating = String.format("%s:%s", instance.getPrivateIpAddress(), config.getWorkerPort());
+
+ return instance;
+ }
+ catch (Exception e) {
+ log.error(e, "Unable to terminate instance");
+ currentlyTerminating = null;
+ }
+ }
+
+ return null;
+ }
+ }
+}
diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/ScalingStrategy.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/ScalingStrategy.java
new file mode 100644
index 00000000000..7aba31b0c25
--- /dev/null
+++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/ScalingStrategy.java
@@ -0,0 +1,34 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012 Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+package com.metamx.druid.merger.coordinator.scaling;
+
+import com.amazonaws.services.ec2.model.Instance;
+import com.metamx.druid.merger.coordinator.WorkerWrapper;
+
+import java.util.Map;
+
+/**
+ */
+public interface ScalingStrategy
+{
+ public void provisionIfNeeded(Map zkWorkers);
+
+ public Instance terminateIfNeeded(Map zkWorkers);
+}
diff --git a/merger/src/main/java/com/metamx/druid/merger/worker/TaskMonitor.java b/merger/src/main/java/com/metamx/druid/merger/worker/TaskMonitor.java
index b70a6e9c278..938bda933fc 100644
--- a/merger/src/main/java/com/metamx/druid/merger/worker/TaskMonitor.java
+++ b/merger/src/main/java/com/metamx/druid/merger/worker/TaskMonitor.java
@@ -111,6 +111,7 @@ public class TaskMonitor
TaskStatus taskStatus;
try {
+ workerCuratorCoordinator.unannounceTask(task.getId());
workerCuratorCoordinator.announceStatus(TaskStatus.running(task.getId()));
taskStatus = task.run(taskContext, toolbox);
}
@@ -165,6 +166,7 @@ public class TaskMonitor
{
try {
pathChildrenCache.close();
+ exec.shutdown();
}
catch (Exception e) {
log.makeAlert(e, "Exception stopping TaskMonitor")
diff --git a/merger/src/main/java/com/metamx/druid/merger/worker/Worker.java b/merger/src/main/java/com/metamx/druid/merger/worker/Worker.java
index 032febb79b9..a1ebf273521 100644
--- a/merger/src/main/java/com/metamx/druid/merger/worker/Worker.java
+++ b/merger/src/main/java/com/metamx/druid/merger/worker/Worker.java
@@ -19,42 +19,44 @@
package com.metamx.druid.merger.worker;
-import com.google.common.collect.ImmutableMap;
-import com.metamx.common.logger.Logger;
-import com.metamx.druid.merger.common.TaskStatus;
import com.metamx.druid.merger.worker.config.WorkerConfig;
import org.codehaus.jackson.annotate.JsonCreator;
import org.codehaus.jackson.annotate.JsonProperty;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
/**
+ * A container for worker metadata.
*/
public class Worker
{
- private static final Logger log = new Logger(Worker.class);
-
private final String host;
-
- private final ConcurrentHashMap runningTasks;
+ private final String ip;
+ private final int capacity;
+ private final String version;
public Worker(
WorkerConfig config
)
{
this(
- config.getHost()
+ config.getHost(),
+ config.getIp(),
+ config.getCapacity(),
+ config.getVersion()
);
}
@JsonCreator
public Worker(
- @JsonProperty("host") String host
+ @JsonProperty("host") String host,
+ @JsonProperty("ip") String ip,
+ @JsonProperty("capacity") int capacity,
+ @JsonProperty("version") String version
)
{
this.host = host;
- this.runningTasks = new ConcurrentHashMap();
+ this.ip = ip;
+ this.capacity = capacity;
+ this.version = version;
}
@JsonProperty
@@ -63,25 +65,21 @@ public class Worker
return host;
}
- public Map getTasks()
+ @JsonProperty
+ public String getIp()
{
- return runningTasks;
+ return ip;
}
- public Map getStringProps()
+ @JsonProperty
+ public int getCapacity()
{
- return ImmutableMap.of(
- "host", host
- );
+ return capacity;
}
- public TaskStatus addTask(TaskStatus status)
+ @JsonProperty
+ public String getVersion()
{
- return runningTasks.put(status.getId(), status);
- }
-
- public TaskStatus removeTask(String taskId)
- {
- return runningTasks.remove(taskId);
+ return version;
}
}
diff --git a/merger/src/main/java/com/metamx/druid/merger/worker/WorkerCuratorCoordinator.java b/merger/src/main/java/com/metamx/druid/merger/worker/WorkerCuratorCoordinator.java
index 8510a890e1c..cdf056a88fa 100644
--- a/merger/src/main/java/com/metamx/druid/merger/worker/WorkerCuratorCoordinator.java
+++ b/merger/src/main/java/com/metamx/druid/merger/worker/WorkerCuratorCoordinator.java
@@ -92,7 +92,7 @@ public class WorkerCuratorCoordinator
makePathIfNotExisting(
getAnnouncementsPathForWorker(),
CreateMode.EPHEMERAL,
- worker.getStringProps()
+ worker
);
started = true;
@@ -171,6 +171,16 @@ public class WorkerCuratorCoordinator
}
}
+ public void unannounceTask(String taskId)
+ {
+ try {
+ curatorFramework.delete().guaranteed().forPath(getTaskPathForId(taskId));
+ }
+ catch (Exception e) {
+ log.warn("Could not delete task path for task[%s], looks like it already went away", taskId);
+ }
+ }
+
public void announceStatus(TaskStatus status)
{
synchronized (lock) {
diff --git a/merger/src/main/java/com/metamx/druid/merger/worker/config/WorkerConfig.java b/merger/src/main/java/com/metamx/druid/merger/worker/config/WorkerConfig.java
index 6d258ae9dd8..e8e68749e2d 100644
--- a/merger/src/main/java/com/metamx/druid/merger/worker/config/WorkerConfig.java
+++ b/merger/src/main/java/com/metamx/druid/merger/worker/config/WorkerConfig.java
@@ -32,4 +32,14 @@ public abstract class WorkerConfig
@Config("druid.host")
public abstract String getHost();
+
+ @Config("druid.worker.ip")
+ public abstract String getIp();
+
+ @Config("druid.worker.version")
+ public abstract String getVersion();
+
+ @Config("druid.worker.capacity")
+ @Default("3")
+ public abstract int getCapacity();
}
diff --git a/merger/src/main/java/com/metamx/druid/merger/worker/http/WorkerNode.java b/merger/src/main/java/com/metamx/druid/merger/worker/http/WorkerNode.java
index ee3888e12e4..67e1e2e089f 100644
--- a/merger/src/main/java/com/metamx/druid/merger/worker/http/WorkerNode.java
+++ b/merger/src/main/java/com/metamx/druid/merger/worker/http/WorkerNode.java
@@ -29,6 +29,7 @@ import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.common.logger.Logger;
import com.metamx.druid.http.StatusServlet;
+import com.metamx.druid.initialization.CuratorConfig;
import com.metamx.druid.initialization.Initialization;
import com.metamx.druid.initialization.ServerConfig;
import com.metamx.druid.jackson.DefaultObjectMapper;
@@ -294,8 +295,9 @@ public class WorkerNode
public void initializeCuratorFramework() throws IOException
{
+ final CuratorConfig curatorConfig = configFactory.build(CuratorConfig.class);
curatorFramework = Initialization.makeCuratorFrameworkClient(
- props.getProperty("druid.zk.service.host"),
+ curatorConfig,
lifecycle
);
}
diff --git a/merger/src/test/java/com/metamx/druid/merger/coordinator/scaling/S3AutoScalingStrategyTest.java b/merger/src/test/java/com/metamx/druid/merger/coordinator/scaling/S3AutoScalingStrategyTest.java
new file mode 100644
index 00000000000..0aab85d9cf1
--- /dev/null
+++ b/merger/src/test/java/com/metamx/druid/merger/coordinator/scaling/S3AutoScalingStrategyTest.java
@@ -0,0 +1,164 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012 Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+package com.metamx.druid.merger.coordinator.scaling;
+
+import com.amazonaws.services.ec2.AmazonEC2Client;
+import com.amazonaws.services.ec2.model.DescribeInstancesRequest;
+import com.amazonaws.services.ec2.model.DescribeInstancesResult;
+import com.amazonaws.services.ec2.model.Instance;
+import com.amazonaws.services.ec2.model.Reservation;
+import com.amazonaws.services.ec2.model.RunInstancesRequest;
+import com.amazonaws.services.ec2.model.RunInstancesResult;
+import com.amazonaws.services.ec2.model.TerminateInstancesRequest;
+import com.google.common.collect.Maps;
+import com.metamx.druid.merger.coordinator.WorkerWrapper;
+import com.metamx.druid.merger.coordinator.config.S3AutoScalingStrategyConfig;
+import com.metamx.druid.merger.worker.Worker;
+import org.easymock.EasyMock;
+import org.joda.time.DateTime;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Date;
+import java.util.Map;
+import java.util.concurrent.ConcurrentSkipListSet;
+
+/**
+ */
+public class S3AutoScalingStrategyTest
+{
+ private static final String AMI_ID = "dummy";
+ private static final String INSTANCE_ID = "theInstance";
+
+ private AmazonEC2Client amazonEC2Client;
+ private RunInstancesResult runInstancesResult;
+ private DescribeInstancesResult describeInstancesResult;
+ private Reservation reservation;
+ private Instance instance;
+ private WorkerWrapper worker;
+ private S3AutoScalingStrategy strategy;
+
+ @Before
+ public void setUp() throws Exception
+ {
+ amazonEC2Client = EasyMock.createMock(AmazonEC2Client.class);
+ runInstancesResult = EasyMock.createMock(RunInstancesResult.class);
+ describeInstancesResult = EasyMock.createMock(DescribeInstancesResult.class);
+ reservation = EasyMock.createMock(Reservation.class);
+
+ instance = new Instance().withInstanceId(INSTANCE_ID).withLaunchTime(new Date()).withImageId(AMI_ID);
+
+ worker = new WorkerWrapper(
+ new Worker("dummyHost", "dummyIP", 2, "0"),
+ new ConcurrentSkipListSet(),
+ null
+ );
+ worker.setLastCompletedTaskTime(new DateTime(0));
+ strategy = new S3AutoScalingStrategy(
+ amazonEC2Client, new S3AutoScalingStrategyConfig()
+ {
+ @Override
+ public String getAmiId()
+ {
+ return AMI_ID;
+ }
+
+ @Override
+ public String getWorkerPort()
+ {
+ return "8080";
+ }
+
+ @Override
+ public String getInstanceType()
+ {
+ return "t1.micro";
+ }
+
+ @Override
+ public long getMillisToWaitBeforeTerminating()
+ {
+ return 0;
+ }
+
+ @Override
+ public int getMinNuMWorkers()
+ {
+ return 0;
+ }
+ }
+ );
+ }
+
+ @After
+ public void tearDown() throws Exception
+ {
+ EasyMock.verify(amazonEC2Client);
+ EasyMock.verify(runInstancesResult);
+ EasyMock.verify(describeInstancesResult);
+ EasyMock.verify(reservation);
+ }
+
+ @Test
+ public void testScale()
+ {
+ EasyMock.expect(amazonEC2Client.runInstances(EasyMock.anyObject(RunInstancesRequest.class))).andReturn(
+ runInstancesResult
+ );
+ EasyMock.expect(amazonEC2Client.describeInstances(EasyMock.anyObject(DescribeInstancesRequest.class)))
+ .andReturn(describeInstancesResult);
+ EasyMock.expect(amazonEC2Client.terminateInstances(EasyMock.anyObject(TerminateInstancesRequest.class)))
+ .andReturn(null);
+ EasyMock.replay(amazonEC2Client);
+
+ EasyMock.expect(runInstancesResult.getReservation()).andReturn(reservation).atLeastOnce();
+ EasyMock.replay(runInstancesResult);
+
+ EasyMock.expect(describeInstancesResult.getReservations()).andReturn(Arrays.asList(reservation)).atLeastOnce();
+ EasyMock.replay(describeInstancesResult);
+
+ EasyMock.expect(reservation.getInstances()).andReturn(Arrays.asList(instance)).atLeastOnce();
+ EasyMock.replay(reservation);
+
+ Map zkWorkers = Maps.newHashMap();
+
+ zkWorkers.put(worker.getWorker().getHost(), worker);
+
+ worker.getRunningTasks().add("task1");
+
+ Assert.assertFalse(worker.isAtCapacity());
+
+ worker.getRunningTasks().add("task2");
+
+ Assert.assertTrue(worker.isAtCapacity());
+
+ strategy.provisionIfNeeded(zkWorkers);
+
+ worker.getRunningTasks().remove("task1");
+ worker.getRunningTasks().remove("task2");
+
+ Instance deleted = strategy.terminateIfNeeded(zkWorkers);
+
+ Assert.assertEquals(deleted.getInstanceId(), INSTANCE_ID);
+ }
+}
diff --git a/realtime/src/main/java/com/metamx/druid/realtime/S3SegmentPusherConfig.java b/realtime/src/main/java/com/metamx/druid/realtime/S3SegmentPusherConfig.java
index 8bed74b8f82..96a96eeea10 100644
--- a/realtime/src/main/java/com/metamx/druid/realtime/S3SegmentPusherConfig.java
+++ b/realtime/src/main/java/com/metamx/druid/realtime/S3SegmentPusherConfig.java
@@ -26,7 +26,6 @@ import org.skife.config.Default;
*/
public abstract class S3SegmentPusherConfig
{
-
@Config("druid.pusher.s3.bucket")
public abstract String getBucket();
diff --git a/server/src/main/java/com/metamx/druid/http/MasterMain.java b/server/src/main/java/com/metamx/druid/http/MasterMain.java
index 52de6ffad81..fe65e14b075 100644
--- a/server/src/main/java/com/metamx/druid/http/MasterMain.java
+++ b/server/src/main/java/com/metamx/druid/http/MasterMain.java
@@ -155,13 +155,13 @@ public class MasterMain
final ServiceDiscoveryConfig serviceDiscoveryConfig = configFactory.build(ServiceDiscoveryConfig.class);
CuratorFramework curatorFramework = Initialization.makeCuratorFrameworkClient(
- serviceDiscoveryConfig.getZkHosts(),
+ serviceDiscoveryConfig,
lifecycle
);
final ServiceDiscovery serviceDiscovery = Initialization.makeServiceDiscoveryClient(
curatorFramework,
- configFactory.build(ServiceDiscoveryConfig.class),
+ serviceDiscoveryConfig,
lifecycle
);