bug fixes for RTR

Fangjin Yang 2013-02-13 14:19:11 -08:00
parent b27a459f71
commit c7b4973700
20 changed files with 356 additions and 231 deletions

LocalTaskRunner.java

@@ -112,13 +112,13 @@ public class LocalTaskRunner implements TaskRunner
}
@Override
public Collection<TaskWrapper> getRunningTasks()
public Collection<TaskRunnerWorkItem> getRunningTasks()
{
return Lists.newArrayList();
}
@Override
public Collection<TaskWrapper> getPendingTasks()
public Collection<TaskRunnerWorkItem> getPendingTasks()
{
return Lists.newArrayList();
}

RemoteTaskRunner.java

@@ -55,7 +55,6 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -71,7 +70,7 @@ import java.util.concurrent.TimeUnit;
* <p/>
* The RemoteTaskRunner will assign tasks to a node until the node hits capacity. At that point, task assignment will
* fail. The RemoteTaskRunner depends on another component to create additional worker resources.
* For example, {@link com.metamx.druid.merger.coordinator.scaling.ResourceManagmentScheduler} can take care of these duties.
* For example, {@link com.metamx.druid.merger.coordinator.scaling.ResourceManagementScheduler} can take care of these duties.
* <p/>
* If a worker node becomes inexplicably disconnected from Zk, the RemoteTaskRunner will automatically retry any tasks
* that were associated with the node.
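In outline, the lifecycle this class manages after these changes (a summary of the hunks below, not code from the commit):

// run(task): reject duplicate ids, wrap the task in a TaskRunnerWorkItem, add it to pendingTasks
// runPendingTasks(): a single-threaded executor drains a copy of pendingTasks through assignTask()
// assignTask(item): if some worker already runs the task, sync its status; otherwise announce it
// announceTask(): write the task to the chosen worker's ZK task path, move it pending -> running,
//   then wait on statusLock until a worker actually reports the task
// status listener: on completion, notify the callback and clean up ZK paths; on a bogus status or a
//   CHILD_REMOVED event, retryTask() schedules the item back into pendingTasks after the retry delay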
@@ -92,9 +91,9 @@ public class RemoteTaskRunner implements TaskRunner
// all workers that exist in ZK
private final Map<String, WorkerWrapper> zkWorkers = new ConcurrentHashMap<String, WorkerWrapper>();
// all tasks that have been assigned to a worker
private final ConcurrentSkipListMap<String, TaskWrapper> runningTasks = new ConcurrentSkipListMap<String, TaskWrapper>();
private final TaskRunnerWorkQueue runningTasks = new TaskRunnerWorkQueue();
// tasks that have not yet run
private final ConcurrentSkipListMap<String, TaskWrapper> pendingTasks = new ConcurrentSkipListMap<String, TaskWrapper>();
private final TaskRunnerWorkQueue pendingTasks = new TaskRunnerWorkQueue();
private final ExecutorService runPendingTasksExec = Executors.newSingleThreadExecutor();
@@ -190,20 +189,25 @@ public class RemoteTaskRunner implements TaskRunner
}
@Override
public Collection<TaskWrapper> getRunningTasks()
public Collection<TaskRunnerWorkItem> getRunningTasks()
{
return runningTasks.values();
}
@Override
public Collection<TaskWrapper> getPendingTasks()
public Collection<TaskRunnerWorkItem> getPendingTasks()
{
return pendingTasks.values();
}
public boolean isTaskRunning(String taskId)
public WorkerWrapper findWorkerRunningTask(String taskId)
{
return runningTasks.containsKey(taskId);
for (WorkerWrapper workerWrapper : zkWorkers.values()) {
if (workerWrapper.getRunningTasks().contains(taskId)) {
return workerWrapper;
}
}
return null;
}
/**
@@ -217,23 +221,25 @@ public class RemoteTaskRunner implements TaskRunner
public void run(Task task, TaskContext context, TaskCallback callback)
{
if (runningTasks.containsKey(task.getId()) || pendingTasks.containsKey(task.getId())) {
throw new ISE("Assigned a task[%s] that is already running, WTF is happening?!", task.getId());
throw new ISE("Assigned a task[%s] that is already running or pending, WTF is happening?!", task.getId());
}
TaskWrapper taskWrapper = new TaskWrapper(
task, context, callback, retryPolicyFactory.makeRetryPolicy(), System.currentTimeMillis()
TaskRunnerWorkItem taskRunnerWorkItem = new TaskRunnerWorkItem(
task, context, callback, retryPolicyFactory.makeRetryPolicy(), new DateTime()
);
addPendingTask(taskWrapper);
addPendingTask(taskRunnerWorkItem);
}
private void addPendingTask(final TaskWrapper taskWrapper)
private void addPendingTask(final TaskRunnerWorkItem taskRunnerWorkItem)
{
pendingTasks.put(taskWrapper.getTask().getId(), taskWrapper);
log.info("Added pending task %s", taskRunnerWorkItem.getTask().getId());
pendingTasks.put(taskRunnerWorkItem.getTask().getId(), taskRunnerWorkItem);
runPendingTasks();
}
/**
* This method uses a single threaded executor to extract all pending tasks and attempt to run them. Any tasks that
* are successfully started by a worker will be moved from pendingTasks to runningTasks. This method is thread-safe.
* are successfully assigned to a worker will be moved from pendingTasks to runningTasks. This method is thread-safe.
* This method should be run each time there is new worker capacity or if new tasks are assigned.
*/
private void runPendingTasks()
@@ -244,9 +250,10 @@ public class RemoteTaskRunner implements TaskRunner
@Override
public Void call() throws Exception
{
// make a copy of the pending tasks because assignTask may delete tasks from pending and move them into moving
List<TaskWrapper> copy = Lists.newArrayList(pendingTasks.values());
for (TaskWrapper taskWrapper : copy) {
// make a copy of the pending tasks because assignTask may delete tasks from pending and move them
// into running status
List<TaskRunnerWorkItem> copy = Lists.newArrayList(pendingTasks.values());
for (TaskRunnerWorkItem taskWrapper : copy) {
assignTask(taskWrapper);
}
@@ -259,76 +266,88 @@ public class RemoteTaskRunner implements TaskRunner
future.get();
}
catch (InterruptedException e) {
log.error(e, "InterruptedException in runPendingTasks()");
throw Throwables.propagate(e);
}
catch (ExecutionException e) {
log.error(e, "ExecutionException in runPendingTasks()");
throw Throwables.propagate(e.getCause());
}
}
private void retryTask(final TaskWrapper taskWrapper, final String workerId, final String taskId)
/**
* Retries a task by inserting it back into the pending queue after a given delay.
* This method will also clean up any status paths that were associated with the task.
*
* @param taskRunnerWorkItem - the task to retry
* @param workerId - the worker that was previously running this task
*/
private void retryTask(final TaskRunnerWorkItem taskRunnerWorkItem, final String workerId)
{
final String taskId = taskRunnerWorkItem.getTask().getId();
log.info("Retry scheduled in %s for %s", taskRunnerWorkItem.getRetryPolicy().getRetryDelay(), taskId);
scheduledExec.schedule(
new Runnable()
{
@Override
public void run()
{
runningTasks.remove(taskId);
cleanup(workerId, taskId);
addPendingTask(taskWrapper);
addPendingTask(taskRunnerWorkItem);
}
},
taskWrapper.getRetryPolicy().getAndIncrementRetryDelay().getMillis(),
taskRunnerWorkItem.getRetryPolicy().getAndIncrementRetryDelay().getMillis(),
TimeUnit.MILLISECONDS
);
}
/**
* Removes a task from the running queue and clears out the ZK status path of the task.
*
* @param workerId - the worker that was previously running the task
* @param taskId - the task to cleanup
*/
private void cleanup(final String workerId, final String taskId)
{
try {
runningTasks.remove(taskId);
final String statusPath = JOINER.join(config.getStatusPath(), workerId, taskId);
try {
cf.delete().guaranteed().forPath(statusPath);
}
catch (Exception e) {
log.warn("Tried to delete a status path that didn't exist! Must've gone away already?");
}
try {
final String taskPath = JOINER.join(config.getTaskPath(), workerId, taskId);
cf.delete().guaranteed().forPath(taskPath);
}
catch (Exception e) {
log.warn("Tried to delete a task path that didn't exist! Must've gone away already?");
log.info("Tried to delete status path[%s] that didn't exist! Must've gone away already?", statusPath);
}
}
/**
* Ensures no workers are already running a task before assigning the task to a worker.
* It is possible that a worker is running a task that the RTR has no knowledge of. This is common when the RTR
* It is possible that a worker is running a task that the RTR has no knowledge of. This is possible when the RTR
* needs to bootstrap after a restart.
*
* @param taskWrapper - a wrapper containing task metadata
* @param taskRunnerWorkItem - the task to assign
*/
private void assignTask(TaskWrapper taskWrapper)
private void assignTask(TaskRunnerWorkItem taskRunnerWorkItem)
{
try {
WorkerWrapper workerWrapper = findWorkerRunningTask(taskWrapper);
WorkerWrapper workerWrapper = findWorkerRunningTask(taskRunnerWorkItem.getTask().getId());
// If a worker is already running this task, we don't need to announce it
if (workerWrapper != null) {
final Worker worker = workerWrapper.getWorker();
log.info("Worker[%s] is already running task[%s].", worker.getHost(), taskWrapper.getTask().getId());
runningTasks.put(taskWrapper.getTask().getId(), pendingTasks.remove(taskWrapper.getTask().getId()));
log.info("Worker[%s] is already running task[%s].", worker.getHost(), taskRunnerWorkItem.getTask().getId());
runningTasks.put(
taskRunnerWorkItem.getTask().getId(),
pendingTasks.remove(taskRunnerWorkItem.getTask().getId())
);
log.info("Task %s switched from pending to running", taskRunnerWorkItem.getTask().getId());
final ChildData statusData = workerWrapper.getStatusCache()
.getCurrentData(
JOINER.join(
config.getStatusPath(),
worker.getHost(),
taskWrapper.getTask().getId()
taskRunnerWorkItem.getTask().getId()
)
);
@@ -337,19 +356,18 @@ public class RemoteTaskRunner implements TaskRunner
TaskStatus.class
);
TaskCallback callback = taskWrapper.getCallback();
if (taskStatus.isComplete()) {
TaskCallback callback = taskRunnerWorkItem.getCallback();
if (callback != null) {
callback.notify(taskStatus);
}
if (taskStatus.isComplete()) {
cleanup(worker.getHost(), taskWrapper.getTask().getId());
cleanup(worker.getHost(), taskRunnerWorkItem.getTask().getId());
}
} else {
// Announce the task or retry if there is not enough capacity
// Nothing running this task, announce it in ZK for a worker to run it
workerWrapper = findWorkerForTask();
if (workerWrapper != null) {
announceTask(workerWrapper.getWorker(), taskWrapper);
announceTask(workerWrapper.getWorker(), taskRunnerWorkItem);
}
}
}
@@ -363,13 +381,12 @@ public class RemoteTaskRunner implements TaskRunner
* removing the task ZK entry and creating a task status ZK entry.
*
* @param theWorker The worker the task is assigned to
* @param taskWrapper The task to be assigned
* @param taskRunnerWorkItem The task to be assigned
*/
private void announceTask(Worker theWorker, TaskWrapper taskWrapper) throws Exception
private void announceTask(Worker theWorker, TaskRunnerWorkItem taskRunnerWorkItem) throws Exception
{
final Task task = taskWrapper.getTask();
final TaskContext taskContext = taskWrapper.getTaskContext();
final Task task = taskRunnerWorkItem.getTask();
final TaskContext taskContext = taskRunnerWorkItem.getTaskContext();
log.info("Coordinator asking Worker[%s] to add task[%s]", theWorker.getHost(), task.getId());
@@ -390,17 +407,21 @@ public class RemoteTaskRunner implements TaskRunner
jsonMapper.writeValueAsBytes(new TaskHolder(task, taskContext))
);
runningTasks.put(task.getId(), pendingTasks.remove(task.getId()));
log.info("Task %s switched from pending to running", task.getId());
// Syncing state with Zookeeper - don't assign new tasks until the task we just assigned is actually running
// on a worker - this avoids overflowing a worker with tasks
synchronized (statusLock) {
// Syncing state with Zookeeper - wait for task to go into running queue
while (pendingTasks.containsKey(task.getId())) {
while (findWorkerRunningTask(task.getId()) == null) {
statusLock.wait(config.getTaskAssignmentTimeoutDuration().getMillis());
}
}
}
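Note the handshake: announceTask() blocks on statusLock until the status listener below observes the task under some worker and calls statusLock.notify(), waking to re-check at least once per getTaskAssignmentTimeoutDuration().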
/**
* When a new worker appears, listeners are registered for status changes.
* Status changes indicate the creation or completion of a task.
* When a new worker appears, listeners are registered for status changes associated with tasks assigned to
* the worker. Status changes indicate the creation or completion of a task.
* The RemoteTaskRunner updates state according to these changes.
*
* @param worker - contains metadata for a worker that has appeared in ZK
@@ -448,7 +469,7 @@ public class RemoteTaskRunner implements TaskRunner
}
catch (Exception e) {
log.warn(e, "Worker[%s] wrote bogus status for task: %s", worker.getHost(), taskId);
retryTask(runningTasks.get(taskId), worker.getHost(), taskId);
retryTask(runningTasks.get(taskId), worker.getHost());
throw Throwables.propagate(e);
}
@@ -459,16 +480,13 @@ public class RemoteTaskRunner implements TaskRunner
taskId
);
if (pendingTasks.containsKey(taskId)) {
runningTasks.put(taskId, pendingTasks.remove(taskId));
}
// Synchronizing state with ZK
synchronized (statusLock) {
statusLock.notify();
}
final TaskWrapper taskWrapper = runningTasks.get(taskId);
if (taskWrapper == null) {
final TaskRunnerWorkItem taskRunnerWorkItem = runningTasks.get(taskId);
if (taskRunnerWorkItem == null) {
log.warn(
"WTF?! Worker[%s] announcing a status for a task I didn't know about: %s",
worker.getHost(),
@@ -477,10 +495,8 @@ public class RemoteTaskRunner implements TaskRunner
}
if (taskStatus.isComplete()) {
if (taskWrapper != null) {
final TaskCallback callback = taskWrapper.getCallback();
// Cleanup
if (taskRunnerWorkItem != null) {
final TaskCallback callback = taskRunnerWorkItem.getCallback();
if (callback != null) {
callback.notify(taskStatus);
}
@@ -488,10 +504,15 @@ public class RemoteTaskRunner implements TaskRunner
// Worker is done with this task
workerWrapper.setLastCompletedTaskTime(new DateTime());
cf.delete().guaranteed().inBackground().forPath(event.getData().getPath());
runningTasks.remove(taskId);
cleanup(worker.getHost(), taskId);
runPendingTasks();
}
} else if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)) {
final String taskId = ZKPaths.getNodeFromPath(event.getData().getPath());
if (runningTasks.containsKey(taskId)) {
log.info("Task %s just disappeared!", taskId);
retryTask(runningTasks.get(taskId), worker.getHost());
}
}
}
catch (Exception e) {
@@ -508,20 +529,15 @@ public class RemoteTaskRunner implements TaskRunner
runPendingTasks();
}
catch (
Exception e
)
{
catch (Exception e) {
throw Throwables.propagate(e);
}
}
/**
* When an ephemeral worker node disappears from ZK, we have to make sure there are no tasks still assigned
* to the worker. If tasks remain, they are retried.
* When an ephemeral worker node disappears from ZK, incomplete running tasks will be retried by
* the logic in the status listener. We still have to make sure there are no tasks still assigned
* to the worker.
*
* @param worker - the removed worker
*/
@@ -530,24 +546,32 @@ public class RemoteTaskRunner implements TaskRunner
WorkerWrapper workerWrapper = zkWorkers.get(worker.getHost());
if (workerWrapper != null) {
try {
Set<String> tasksToRetry = Sets.newHashSet(workerWrapper.getRunningTasks());
tasksToRetry.addAll(cf.getChildren().forPath(JOINER.join(config.getTaskPath(), worker.getHost())));
Set<String> tasksPending = Sets.newHashSet(
cf.getChildren()
.forPath(JOINER.join(config.getTaskPath(), worker.getHost()))
);
log.info("%s had %d tasks pending", worker.getHost(), tasksPending.size());
for (String taskId : tasksToRetry) {
TaskWrapper taskWrapper = runningTasks.get(taskId);
if (taskWrapper != null) {
retryTask(runningTasks.get(taskId), worker.getHost(), taskId);
for (String taskId : tasksPending) {
TaskRunnerWorkItem taskRunnerWorkItem = pendingTasks.get(taskId);
if (taskRunnerWorkItem != null) {
cf.delete().guaranteed().forPath(JOINER.join(config.getTaskPath(), worker.getHost(), taskId));
retryTask(taskRunnerWorkItem, worker.getHost());
} else {
log.info("RemoteTaskRunner has no knowledge of pending task %s", taskId);
}
}
workerWrapper.getStatusCache().close();
}
catch (Exception e) {
log.error(e, "Failed to cleanly remove worker[%s]");
}
throw Throwables.propagate(e);
}
finally {
zkWorkers.remove(worker.getHost());
}
}
}
private WorkerWrapper findWorkerForTask()
{
@@ -588,14 +612,4 @@ public class RemoteTaskRunner implements TaskRunner
throw Throwables.propagate(e);
}
}
private WorkerWrapper findWorkerRunningTask(TaskWrapper taskWrapper)
{
for (WorkerWrapper workerWrapper : zkWorkers.values()) {
if (workerWrapper.getRunningTasks().contains(taskWrapper.getTask().getId())) {
return workerWrapper;
}
}
return null;
}
}

RetryPolicy.java

@@ -44,6 +44,11 @@ public class RetryPolicy
this.retryCount = 0;
}
public Duration getRetryDelay()
{
return currRetryDelay;
}
public Duration getAndIncrementRetryDelay()
{
Duration retVal = new Duration(currRetryDelay);
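The hunk cuts off inside getAndIncrementRetryDelay(); a minimal sketch of how the two accessors likely relate, assuming an exponential backoff (the doubling is an assumption, not shown in this commit):

public Duration getAndIncrementRetryDelay()
{
  // snapshot the current delay for this attempt...
  Duration retVal = new Duration(currRetryDelay);
  // ...then grow it for the next attempt (multiplier assumed)
  currRetryDelay = currRetryDelay.multipliedBy(2);
  retryCount++;
  return retVal;
}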

TaskMaster.java

@@ -27,6 +27,8 @@ import com.metamx.druid.initialization.Initialization;
import com.metamx.druid.initialization.ServiceDiscoveryConfig;
import com.metamx.druid.merger.coordinator.config.IndexerCoordinatorConfig;
import com.metamx.druid.merger.coordinator.exec.TaskConsumer;
import com.metamx.druid.merger.coordinator.scaling.ResourceManagementScheduler;
import com.metamx.druid.merger.coordinator.scaling.ResourceManagementSchedulerFactory;
import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.service.ServiceEmitter;
import com.netflix.curator.framework.CuratorFramework;
@@ -56,6 +58,7 @@ public class TaskMaster
final ServiceDiscoveryConfig serviceDiscoveryConfig,
final MergerDBCoordinator mergerDBCoordinator,
final TaskRunnerFactory runnerFactory,
final ResourceManagementSchedulerFactory managementSchedulerFactory,
final CuratorFramework curator,
final ServiceEmitter emitter
)
@@ -72,11 +75,13 @@ public class TaskMaster
log.info("By the power of Grayskull, I have the power!");
final TaskRunner runner = runnerFactory.build();
final ResourceManagementScheduler scheduler = managementSchedulerFactory.build(runner);
final TaskConsumer consumer = new TaskConsumer(queue, runner, mergerDBCoordinator, emitter);
// Sensible order to start stuff:
final Lifecycle leaderLifecycle = new Lifecycle();
leaderLifecycle.addManagedInstance(queue);
leaderLifecycle.addManagedInstance(scheduler);
leaderLifecycle.addManagedInstance(runner);
Initialization.makeServiceDiscoveryClient(curator, serviceDiscoveryConfig, leaderLifecycle);
leaderLifecycle.addManagedInstance(consumer);

TaskRunner.java

@@ -40,9 +40,9 @@ public interface TaskRunner
*/
public void run(Task task, TaskContext context, TaskCallback callback);
public Collection<TaskWrapper> getRunningTasks();
public Collection<TaskRunnerWorkItem> getRunningTasks();
public Collection<TaskWrapper> getPendingTasks();
public Collection<TaskRunnerWorkItem> getPendingTasks();
public Collection<WorkerWrapper> getWorkers();
}

TaskRunnerWorkItem.java (renamed from TaskWrapper.java)

@@ -19,26 +19,30 @@
package com.metamx.druid.merger.coordinator;
import com.google.common.primitives.Longs;
import com.metamx.druid.merger.common.TaskCallback;
import com.metamx.druid.merger.common.task.Task;
import org.joda.time.DateTime;
import org.joda.time.DateTimeComparator;
/**
* A holder for a task and different components associated with the task
*/
public class TaskWrapper implements Comparable<TaskWrapper>
public class TaskRunnerWorkItem implements Comparable<TaskRunnerWorkItem>
{
private final Task task;
private final TaskContext taskContext;
private final TaskCallback callback;
private final RetryPolicy retryPolicy;
private final long createdTime;
private final DateTime createdTime;
public TaskWrapper(
private volatile DateTime queueInsertionTime;
public TaskRunnerWorkItem(
Task task,
TaskContext taskContext,
TaskCallback callback,
RetryPolicy retryPolicy,
long createdTime
DateTime createdTime
)
{
this.task = task;
@@ -68,15 +72,26 @@ public class TaskWrapper implements Comparable<TaskWrapper>
return retryPolicy;
}
public long getCreatedTime()
public DateTime getCreatedTime()
{
return createdTime;
}
@Override
public int compareTo(TaskWrapper taskWrapper)
public DateTime getQueueInsertionTime()
{
return Longs.compare(createdTime, taskWrapper.getCreatedTime());
return queueInsertionTime;
}
public TaskRunnerWorkItem withQueueInsertionTime(DateTime time)
{
this.queueInsertionTime = time;
return this;
}
@Override
public int compareTo(TaskRunnerWorkItem taskRunnerWorkItem)
{
return DateTimeComparator.getInstance().compare(createdTime, taskRunnerWorkItem.getCreatedTime());
}
@Override
@@ -89,7 +104,7 @@ public class TaskWrapper implements Comparable<TaskWrapper>
return false;
}
TaskWrapper that = (TaskWrapper) o;
TaskRunnerWorkItem that = (TaskRunnerWorkItem) o;
if (callback != null ? !callback.equals(that.callback) : that.callback != null) {
return false;
@@ -120,7 +135,7 @@ public class TaskWrapper implements Comparable<TaskWrapper>
@Override
public String toString()
{
return "TaskWrapper{" +
return "TaskRunnerWorkItem{" +
"task=" + task +
", taskContext=" + taskContext +
", callback=" + callback +

TaskRunnerWorkQueue.java (new file)

@@ -0,0 +1,35 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.merger.coordinator;
import org.joda.time.DateTime;
import java.util.concurrent.ConcurrentSkipListMap;
/**
*/
public class TaskRunnerWorkQueue extends ConcurrentSkipListMap<String, TaskRunnerWorkItem>
{
@Override
public TaskRunnerWorkItem put(String s, TaskRunnerWorkItem taskRunnerWorkItem)
{
return super.put(s, taskRunnerWorkItem.withQueueInsertionTime(new DateTime()));
}
}
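A short usage sketch (the id and work item are hypothetical) of why the override matters: every insertion path stamps a fresh queueInsertionTime, which SimpleResourceManagementStrategy later compares against maxPendingTaskDuration:

TaskRunnerWorkQueue queue = new TaskRunnerWorkQueue();
// put() stamps the insertion time before delegating to ConcurrentSkipListMap.put()
queue.put("someTaskId", workItem); // "someTaskId" and workItem are hypothetical
DateTime insertedAt = queue.get("someTaskId").getQueueInsertionTime();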

IndexerCoordinatorNode.java

@@ -48,9 +48,9 @@ import com.metamx.druid.initialization.ServerConfig;
import com.metamx.druid.initialization.ServiceDiscoveryConfig;
import com.metamx.druid.jackson.DefaultObjectMapper;
import com.metamx.druid.loading.S3SegmentKiller;
import com.metamx.druid.loading.SegmentKiller;
import com.metamx.druid.loading.S3SegmentPusher;
import com.metamx.druid.loading.S3SegmentPusherConfig;
import com.metamx.druid.loading.SegmentKiller;
import com.metamx.druid.loading.SegmentPusher;
import com.metamx.druid.merger.common.TaskToolbox;
import com.metamx.druid.merger.common.config.IndexerZkConfig;
@@ -74,11 +74,12 @@ import com.metamx.druid.merger.coordinator.config.IndexerDbConnectorConfig;
import com.metamx.druid.merger.coordinator.config.RemoteTaskRunnerConfig;
import com.metamx.druid.merger.coordinator.config.RetryPolicyConfig;
import com.metamx.druid.merger.coordinator.config.WorkerSetupManagerConfig;
import com.metamx.druid.merger.coordinator.scaling.AutoScalingStrategy;
import com.metamx.druid.merger.coordinator.scaling.EC2AutoScalingStrategy;
import com.metamx.druid.merger.coordinator.scaling.NoopAutoScalingStrategy;
import com.metamx.druid.merger.coordinator.scaling.ResourceManagementScheduler;
import com.metamx.druid.merger.coordinator.scaling.ResourceManagementSchedulerConfig;
import com.metamx.druid.merger.coordinator.scaling.ResourceManagmentScheduler;
import com.metamx.druid.merger.coordinator.scaling.AutoScalingStrategy;
import com.metamx.druid.merger.coordinator.scaling.ResourceManagementSchedulerFactory;
import com.metamx.druid.merger.coordinator.scaling.SimpleResourceManagementStrategy;
import com.metamx.druid.merger.coordinator.scaling.SimpleResourceManagmentConfig;
import com.metamx.druid.merger.coordinator.setup.WorkerSetupManager;
@@ -149,6 +150,7 @@ public class IndexerCoordinatorNode extends RegisteringNode
private IndexerZkConfig indexerZkConfig;
private WorkerSetupManager workerSetupManager = null;
private TaskRunnerFactory taskRunnerFactory = null;
private ResourceManagementSchedulerFactory resourceManagementSchedulerFactory = null;
private TaskMaster taskMaster = null;
private Server server = null;
@@ -211,6 +213,12 @@ public class IndexerCoordinatorNode extends RegisteringNode
return this;
}
public IndexerCoordinatorNode setResourceManagementSchedulerFactory(ResourceManagementSchedulerFactory resourceManagementSchedulerFactory)
{
this.resourceManagementSchedulerFactory = resourceManagementSchedulerFactory;
return this;
}
public void init() throws Exception
{
scheduledExecutorFactory = ScheduledExecutors.createFactory(lifecycle);
@@ -230,6 +238,7 @@ public class IndexerCoordinatorNode extends RegisteringNode
initializeIndexerZkConfig();
initializeWorkerSetupManager();
initializeTaskRunnerFactory();
initializeResourceManagement();
initializeTaskMaster();
initializeServer();
@@ -305,6 +314,7 @@ public class IndexerCoordinatorNode extends RegisteringNode
serviceDiscoveryConfig,
mergerDBCoordinator,
taskRunnerFactory,
resourceManagementSchedulerFactory,
curatorFramework,
emitter
);
@@ -543,8 +553,6 @@ public class IndexerCoordinatorNode extends RegisteringNode
workerSetupManager
);
initializeResourceManagement(remoteTaskRunner);
return remoteTaskRunner;
}
};
@@ -565,7 +573,13 @@ public class IndexerCoordinatorNode extends RegisteringNode
}
}
private void initializeResourceManagement(TaskRunner taskRunner)
private void initializeResourceManagement()
{
if (resourceManagementSchedulerFactory == null) {
resourceManagementSchedulerFactory = new ResourceManagementSchedulerFactory()
{
@Override
public ResourceManagementScheduler build(TaskRunner runner)
{
final ScheduledExecutorService scalingScheduledExec = Executors.newScheduledThreadPool(
1,
@@ -594,8 +608,8 @@ public class IndexerCoordinatorNode extends RegisteringNode
throw new ISE("Invalid strategy implementation: %s", config.getStrategyImpl());
}
ResourceManagmentScheduler resourceManagmentScheduler = new ResourceManagmentScheduler(
taskRunner,
return new ResourceManagementScheduler(
runner,
new SimpleResourceManagementStrategy(
strategy,
configFactory.build(SimpleResourceManagmentConfig.class),
@@ -604,7 +618,9 @@ public class IndexerCoordinatorNode extends RegisteringNode
configFactory.build(ResourceManagementSchedulerConfig.class),
scalingScheduledExec
);
lifecycle.addManagedInstance(resourceManagmentScheduler);
}
};
}
}
public static class Builder

EC2AutoScalingStrategy.java

@@ -72,7 +72,6 @@ public class EC2AutoScalingStrategy implements AutoScalingStrategy<Instance>
WorkerSetupData setupData = workerSetupManager.getWorkerSetupData();
EC2NodeData workerConfig = setupData.getNodeData();
log.info("Creating new instance(s)...");
RunInstancesResult result = amazonEC2Client.runInstances(
new RunInstancesRequest(
workerConfig.getAmiId(),

ResourceManagementScheduler.java (renamed from ResourceManagmentScheduler.java)

@@ -32,15 +32,15 @@ import org.joda.time.Period;
import java.util.concurrent.ScheduledExecutorService;
/**
* The ResourceManagmentScheduler schedules a check for when worker nodes should potentially be created or destroyed.
* The ResourceManagementScheduler schedules a check for when worker nodes should potentially be created or destroyed.
* It uses a {@link TaskRunner} to return all pending tasks and the status of the worker nodes in the system.
* The ResourceManagmentScheduler does not contain the logic to decide whether provisioning or termination should actually
* The ResourceManagementScheduler does not contain the logic to decide whether provisioning or termination should actually
* occur. That decision is made in the {@link ResourceManagementStrategy}.
*/
public class ResourceManagmentScheduler
public class ResourceManagementScheduler
{
private static final Logger log = new Logger(ResourceManagmentScheduler.class);
private static final Logger log = new Logger(ResourceManagementScheduler.class);
private final TaskRunner taskRunner;
private final ResourceManagementStrategy resourceManagementStrategy;
@@ -50,7 +50,7 @@ public class ResourceManagmentScheduler
private final Object lock = new Object();
private volatile boolean started = false;
public ResourceManagmentScheduler(
public ResourceManagementScheduler(
TaskRunner taskRunner,
ResourceManagementStrategy resourceManagementStrategy,
ResourceManagementSchedulerConfig config,
@@ -124,6 +124,7 @@ public class ResourceManagmentScheduler
return;
}
exec.shutdown();
started = false;
}
}
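The start() body is not part of this hunk; a minimal sketch of the polling it presumably sets up, feeding the runner's state to the strategy on the provision period (the exact scheduling call is an assumption):

// sketch: inside start(), repeat every getProvisionResourcesDuration()
exec.scheduleWithFixedDelay(
    new Runnable()
    {
      @Override
      public void run()
      {
        // the scheduler only supplies state; the strategy decides whether to act
        resourceManagementStrategy.doProvision(taskRunner.getPendingTasks(), taskRunner.getWorkers());
      }
    },
    0,
    config.getProvisionResourcesDuration().getMillis(),
    java.util.concurrent.TimeUnit.MILLISECONDS
);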

ResourceManagementSchedulerConfig.java

@@ -29,7 +29,7 @@ import org.skife.config.Default;
public abstract class ResourceManagementSchedulerConfig
{
@Config("druid.indexer.provisionResources.duration")
@Default("PT1H")
@Default("PT1M")
public abstract Duration getProvisionResourcesDuration();
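This is an ISO-8601 period; since it is @Config-bound, it can be overridden in runtime properties, e.g. druid.indexer.provisionResources.duration=PT5M (value illustrative). The PT1H -> PT1M change above makes the provisioning check run every minute by default.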
@Config("druid.indexer.terminateResources.duration")

ResourceManagementSchedulerFactory.java (new file)

@@ -0,0 +1,29 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.merger.coordinator.scaling;
import com.metamx.druid.merger.coordinator.TaskRunner;
/**
*/
public interface ResourceManagementSchedulerFactory
{
public ResourceManagementScheduler build(TaskRunner runner);
}

ResourceManagementStrategy.java

@@ -19,7 +19,7 @@
package com.metamx.druid.merger.coordinator.scaling;
import com.metamx.druid.merger.coordinator.TaskWrapper;
import com.metamx.druid.merger.coordinator.TaskRunnerWorkItem;
import com.metamx.druid.merger.coordinator.WorkerWrapper;
import java.util.Collection;
@@ -30,9 +30,9 @@ import java.util.Collection;
*/
public interface ResourceManagementStrategy
{
public boolean doProvision(Collection<TaskWrapper> runningTasks, Collection<WorkerWrapper> workerWrappers);
public boolean doProvision(Collection<TaskRunnerWorkItem> runningTasks, Collection<WorkerWrapper> workerWrappers);
public boolean doTerminate(Collection<TaskWrapper> runningTasks, Collection<WorkerWrapper> workerWrappers);
public boolean doTerminate(Collection<TaskRunnerWorkItem> runningTasks, Collection<WorkerWrapper> workerWrappers);
public ScalingStats getStats();
}

SimpleResourceManagementStrategy.java

@@ -24,7 +24,7 @@ import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.metamx.common.guava.FunctionalIterable;
import com.metamx.druid.merger.coordinator.TaskWrapper;
import com.metamx.druid.merger.coordinator.TaskRunnerWorkItem;
import com.metamx.druid.merger.coordinator.WorkerWrapper;
import com.metamx.druid.merger.coordinator.setup.WorkerSetupManager;
import com.metamx.emitter.EmittingLogger;
@@ -65,7 +65,7 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
}
@Override
public boolean doProvision(Collection<TaskWrapper> pendingTasks, Collection<WorkerWrapper> workerWrappers)
public boolean doProvision(Collection<TaskRunnerWorkItem> pendingTasks, Collection<WorkerWrapper> workerWrappers)
{
List<String> workerNodeIds = autoScalingStrategy.ipToIdLookup(
Lists.newArrayList(
@@ -89,7 +89,8 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
boolean nothingProvisioning = currentlyProvisioning.isEmpty();
if (nothingProvisioning && hasTaskPendingBeyondThreshold(pendingTasks)) {
if (nothingProvisioning) {
if (hasTaskPendingBeyondThreshold(pendingTasks)) {
AutoScalingData provisioned = autoScalingStrategy.provision();
if (provisioned != null) {
@@ -99,6 +100,7 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
return true;
}
}
} else {
Duration durSinceLastProvision = new Duration(new DateTime(), lastProvisionTime);
if (durSinceLastProvision.isLongerThan(config.getMaxScalingDuration())) {
@@ -118,7 +120,7 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
}
@Override
public boolean doTerminate(Collection<TaskWrapper> pendingTasks, Collection<WorkerWrapper> workerWrappers)
public boolean doTerminate(Collection<TaskRunnerWorkItem> pendingTasks, Collection<WorkerWrapper> workerWrappers)
{
List<String> workerNodeIds = autoScalingStrategy.ipToIdLookup(
Lists.newArrayList(
@@ -214,12 +216,15 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
return scalingStats;
}
private boolean hasTaskPendingBeyondThreshold(Collection<TaskWrapper> pendingTasks)
private boolean hasTaskPendingBeyondThreshold(Collection<TaskRunnerWorkItem> pendingTasks)
{
long now = System.currentTimeMillis();
for (TaskWrapper pendingTask : pendingTasks) {
if (new Duration(pendingTask.getCreatedTime(), now).isEqual(config.getMaxPendingTaskDuration()) ||
new Duration(pendingTask.getCreatedTime(), now).isLongerThan(config.getMaxPendingTaskDuration())) {
for (TaskRunnerWorkItem pendingTask : pendingTasks) {
if (new Duration(pendingTask.getQueueInsertionTime().getMillis(), now).isEqual(config.getMaxPendingTaskDuration())
||
new Duration(
pendingTask.getQueueInsertionTime().getMillis(), now
).isLongerThan(config.getMaxPendingTaskDuration())) {
return true;
}
}
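The two Duration constructions in the condition above compute the same value; an equivalent, tighter form of the check (a sketch, not part of the commit):

Duration pendingDuration = new Duration(pendingTask.getQueueInsertionTime().getMillis(), now);
// pending for at least maxPendingTaskDuration?
if (!pendingDuration.isShorterThan(config.getMaxPendingTaskDuration())) {
  return true;
}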

SimpleResourceManagmentConfig.java

@@ -32,14 +32,14 @@ public abstract class SimpleResourceManagmentConfig
public abstract int getMaxWorkerIdleTimeMillisBeforeDeletion();
@Config("druid.indexer.maxScalingDuration")
@Default("PT1H")
@Default("PT15M")
public abstract Duration getMaxScalingDuration();
@Config("druid.indexer.numEventsToTrack")
@Default("20")
@Default("50")
public abstract int getNumEventsToTrack();
@Config("druid.indexer.maxPendingTaskDuration")
@Default("20")
@Default("PT30S")
public abstract Duration getMaxPendingTaskDuration();
}

TaskMonitor.java

@@ -93,6 +93,8 @@ public class TaskMonitor
final TaskContext taskContext = taskHolder.getTaskContext();
if (workerCuratorCoordinator.statusExists(task.getId())) {
log.warn("Got task %s that I am already running...", task.getId());
workerCuratorCoordinator.unannounceTask(task.getId());
return;
}

WorkerConfig.java

@@ -39,6 +39,7 @@ public abstract class WorkerConfig
@Config("druid.worker.version")
public abstract String getVersion();
@Config("druid.worker.capacity")
public int getCapacity()
{
return Runtime.getRuntime().availableProcessors() - 1;
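With the new @Config binding, the default capacity (one fewer than the machine's available processors) can be overridden via runtime properties, e.g. druid.worker.capacity=4 (value illustrative).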

RemoteTaskRunnerTest.java

@@ -164,11 +164,6 @@ public class RemoteTaskRunnerTest
}
catch (ISE expected) {
}
finally {
cf.delete().guaranteed().forPath(
String.format("%s/worker1/task1", statusPath)
);
}
}
@Test
@@ -231,7 +226,7 @@ public class RemoteTaskRunnerTest
// Really don't like this way of waiting for the task to appear
int count = 0;
while (!remoteTaskRunner.isTaskRunning(task1.getId())) {
while (remoteTaskRunner.findWorkerRunningTask(task1.getId()) == null) {
Thread.sleep(500);
if (count > 10) {
throw new ISE("WTF?! Task still not announced in ZK?");
@@ -249,7 +244,7 @@ public class RemoteTaskRunnerTest
// Really don't like this way of waiting for the task to disappear
count = 0;
while (remoteTaskRunner.isTaskRunning(task1.getId())) {
while (remoteTaskRunner.findWorkerRunningTask(task1.getId()) != null) {
Thread.sleep(500);
if (count > 10) {
throw new ISE("WTF?! Task still not announced in ZK?");

SimpleResourceManagementStrategyTest.java

@@ -26,7 +26,7 @@ import com.metamx.druid.client.DataSegment;
import com.metamx.druid.merger.TestTask;
import com.metamx.druid.merger.common.TaskStatus;
import com.metamx.druid.merger.common.task.Task;
import com.metamx.druid.merger.coordinator.TaskWrapper;
import com.metamx.druid.merger.coordinator.TaskRunnerWorkItem;
import com.metamx.druid.merger.coordinator.WorkerWrapper;
import com.metamx.druid.merger.coordinator.setup.WorkerSetupData;
import com.metamx.druid.merger.coordinator.setup.WorkerSetupManager;
@@ -120,8 +120,8 @@ public class SimpleResourceManagementStrategyTest
EasyMock.replay(autoScalingStrategy);
boolean provisionedSomething = simpleResourceManagementStrategy.doProvision(
Arrays.<TaskWrapper>asList(
new TaskWrapper(testTask, null, null, null, System.currentTimeMillis())
Arrays.<TaskRunnerWorkItem>asList(
new TaskRunnerWorkItem(testTask, null, null, null, null).withQueueInsertionTime(new DateTime())
),
Arrays.<WorkerWrapper>asList(
new TestWorkerWrapper(testTask)
@@ -148,8 +148,8 @@ public class SimpleResourceManagementStrategyTest
EasyMock.replay(autoScalingStrategy);
boolean provisionedSomething = simpleResourceManagementStrategy.doProvision(
Arrays.<TaskWrapper>asList(
new TaskWrapper(testTask, null, null, null, System.currentTimeMillis())
Arrays.<TaskRunnerWorkItem>asList(
new TaskRunnerWorkItem(testTask, null, null, null, null).withQueueInsertionTime(new DateTime())
),
Arrays.<WorkerWrapper>asList(
new TestWorkerWrapper(testTask)
@@ -164,8 +164,8 @@ public class SimpleResourceManagementStrategyTest
);
provisionedSomething = simpleResourceManagementStrategy.doProvision(
Arrays.<TaskWrapper>asList(
new TaskWrapper(testTask, null, null, null, System.currentTimeMillis())
Arrays.<TaskRunnerWorkItem>asList(
new TaskRunnerWorkItem(testTask, null, null, null, null).withQueueInsertionTime(new DateTime())
),
Arrays.<WorkerWrapper>asList(
new TestWorkerWrapper(testTask)
@@ -198,8 +198,8 @@ public class SimpleResourceManagementStrategyTest
EasyMock.replay(autoScalingStrategy);
boolean terminatedSomething = simpleResourceManagementStrategy.doTerminate(
Arrays.<TaskWrapper>asList(
new TaskWrapper(testTask, null, null, null, System.currentTimeMillis())
Arrays.<TaskRunnerWorkItem>asList(
new TaskRunnerWorkItem(testTask, null, null, null, null).withQueueInsertionTime(new DateTime())
),
Arrays.<WorkerWrapper>asList(
new TestWorkerWrapper(null)
@@ -230,8 +230,8 @@ public class SimpleResourceManagementStrategyTest
EasyMock.replay(autoScalingStrategy);
boolean terminatedSomething = simpleResourceManagementStrategy.doTerminate(
Arrays.<TaskWrapper>asList(
new TaskWrapper(testTask, null, null, null, System.currentTimeMillis())
Arrays.<TaskRunnerWorkItem>asList(
new TaskRunnerWorkItem(testTask, null, null, null, null).withQueueInsertionTime(new DateTime())
),
Arrays.<WorkerWrapper>asList(
new TestWorkerWrapper(null)
@@ -245,8 +245,8 @@ public class SimpleResourceManagementStrategyTest
);
terminatedSomething = simpleResourceManagementStrategy.doTerminate(
Arrays.<TaskWrapper>asList(
new TaskWrapper(testTask, null, null, null, System.currentTimeMillis())
Arrays.<TaskRunnerWorkItem>asList(
new TaskRunnerWorkItem(testTask, null, null, null, null).withQueueInsertionTime(new DateTime())
),
Arrays.<WorkerWrapper>asList(
new TestWorkerWrapper(null)

DruidMaster.java

@@ -142,44 +142,47 @@ public class DruidMaster
public Map<String, Double> getLoadStatus()
{
Map<String, Integer> availableSegmentMap = Maps.newHashMap();
for (DataSegment segment : getAvailableDataSegments()) {
Integer count = availableSegmentMap.get(segment.getDataSource());
int newCount = (count == null) ? 0 : count.intValue();
availableSegmentMap.put(segment.getDataSource(), ++newCount);
// find available segments
Map<String, Set<DataSegment>> availableSegments = Maps.newHashMap();
for (DataSegment dataSegment : getAvailableDataSegments()) {
Set<DataSegment> segments = availableSegments.get(dataSegment.getDataSource());
if (segments == null) {
segments = Sets.newHashSet();
availableSegments.put(dataSegment.getDataSource(), segments);
}
segments.add(dataSegment);
}
Map<String, Set<DataSegment>> loadedDataSources = Maps.newHashMap();
for (DruidServer server : serverInventoryManager.getInventory()) {
for (DruidDataSource dataSource : server.getDataSources()) {
if (!loadedDataSources.containsKey(dataSource.getName())) {
TreeSet<DataSegment> setToAdd = Sets.newTreeSet(DataSegment.bucketMonthComparator());
setToAdd.addAll(dataSource.getSegments());
loadedDataSources.put(dataSource.getName(), setToAdd);
} else {
loadedDataSources.get(dataSource.getName()).addAll(dataSource.getSegments());
// find segments currently loaded
Map<String, Set<DataSegment>> segmentsInCluster = Maps.newHashMap();
for (DruidServer druidServer : serverInventoryManager.getInventory()) {
for (DruidDataSource druidDataSource : druidServer.getDataSources()) {
Set<DataSegment> segments = segmentsInCluster.get(druidDataSource.getName());
if (segments == null) {
segments = Sets.newHashSet();
segmentsInCluster.put(druidDataSource.getName(), segments);
}
segments.addAll(druidDataSource.getSegments());
}
}
Map<String, Integer> loadedSegmentMap = Maps.newHashMap();
for (Map.Entry<String, Set<DataSegment>> entry : loadedDataSources.entrySet()) {
loadedSegmentMap.put(entry.getKey(), entry.getValue().size());
// compare available segments with currently loaded
Map<String, Double> loadStatus = Maps.newHashMap();
for (Map.Entry<String, Set<DataSegment>> entry : availableSegments.entrySet()) {
String dataSource = entry.getKey();
Set<DataSegment> segmentsAvailable = entry.getValue();
Set<DataSegment> loadedSegments = segmentsInCluster.get(dataSource);
if (loadedSegments == null) {
loadedSegments = Sets.newHashSet();
}
Set<DataSegment> unloadedSegments = Sets.difference(segmentsAvailable, loadedSegments);
loadStatus.put(
dataSource,
100 * ((double) (segmentsAvailable.size() - unloadedSegments.size()) / (double) segmentsAvailable.size())
);
}
Map<String, Double> retVal = Maps.newHashMap();
for (Map.Entry<String, Integer> entry : availableSegmentMap.entrySet()) {
String key = entry.getKey();
if (!loadedSegmentMap.containsKey(key) || entry.getValue().doubleValue() == 0.0) {
retVal.put(key, 0.0);
} else {
retVal.put(key, 100 * loadedSegmentMap.get(key).doubleValue() / entry.getValue().doubleValue());
}
}
return retVal;
return loadStatus;
}
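A worked example of the new per-datasource figure (numbers hypothetical): a datasource with 10 available segments, 2 of which are loaded nowhere in the cluster, reports 100 * (10 - 2) / 10 = 80.0.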
public int lookupSegmentLifetime(DataSegment segment)