RemoteTaskRunner changes to make bootstrapping actually work.

- Workers are not added to zkWorkers until caches have been initialized.
- Worker status we haven't heard about will be added to runningTasks or
  completeTasks as appropriate. 
- TaskRunnerWorkItem now only needs a taskId, not the entire Task. This makes
  it possible to create them from TaskStatus objects, if that's all we have.
- Also remove some dead code.
This commit is contained in:
Gian Merlino 2013-12-12 10:44:46 -08:00
parent d92b88718c
commit 0129ea99cf
12 changed files with 160 additions and 157 deletions

View File

@ -109,7 +109,7 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer
tasks.put( tasks.put(
task.getId(), task.getId(),
new ForkingTaskRunnerWorkItem( new ForkingTaskRunnerWorkItem(
task, task.getId(),
exec.submit( exec.submit(
new Callable<TaskStatus>() new Callable<TaskStatus>()
{ {
@ -427,11 +427,11 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer
private volatile ProcessHolder processHolder = null; private volatile ProcessHolder processHolder = null;
private ForkingTaskRunnerWorkItem( private ForkingTaskRunnerWorkItem(
Task task, String taskId,
ListenableFuture<TaskStatus> statusFuture ListenableFuture<TaskStatus> statusFuture
) )
{ {
super(task, statusFuture); super(taskId, statusFuture);
} }
} }

View File

@ -52,7 +52,6 @@ import io.druid.indexing.worker.Worker;
import io.druid.server.initialization.ZkPathsConfig; import io.druid.server.initialization.ZkPathsConfig;
import io.druid.tasklogs.TaskLogStreamer; import io.druid.tasklogs.TaskLogStreamer;
import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.PathChildrenCache; import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
@ -72,6 +71,8 @@ import java.util.Map;
import java.util.TreeSet; import java.util.TreeSet;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
@ -108,11 +109,13 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
private final HttpClient httpClient; private final HttpClient httpClient;
// all workers that exist in ZK // all workers that exist in ZK
private final Map<String, ZkWorker> zkWorkers = new ConcurrentHashMap<String, ZkWorker>(); private final ConcurrentMap<String, ZkWorker> zkWorkers = new ConcurrentHashMap<>();
// payloads of pending tasks, which we remember just long enough to assign to workers
private final ConcurrentMap<String, Task> pendingTaskPayloads = new ConcurrentHashMap<>();
// tasks that have not yet been assigned to a worker
private final RemoteTaskRunnerWorkQueue pendingTasks = new RemoteTaskRunnerWorkQueue();
// all tasks that have been assigned to a worker // all tasks that have been assigned to a worker
private final RemoteTaskRunnerWorkQueue runningTasks = new RemoteTaskRunnerWorkQueue(); private final RemoteTaskRunnerWorkQueue runningTasks = new RemoteTaskRunnerWorkQueue();
// tasks that have not yet run
private final RemoteTaskRunnerWorkQueue pendingTasks = new RemoteTaskRunnerWorkQueue();
// tasks that are complete but not cleaned up yet // tasks that are complete but not cleaned up yet
private final RemoteTaskRunnerWorkQueue completeTasks = new RemoteTaskRunnerWorkQueue(); private final RemoteTaskRunnerWorkQueue completeTasks = new RemoteTaskRunnerWorkQueue();
@ -150,6 +153,8 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
return; return;
} }
final CountDownLatch initialized = new CountDownLatch(1);
// Add listener for creation/deletion of workers // Add listener for creation/deletion of workers
workerPathCache.getListenable().addListener( workerPathCache.getListenable().addListener(
new PathChildrenCacheListener() new PathChildrenCacheListener()
@ -164,7 +169,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
event.getData().getData(), event.getData().getData(),
Worker.class Worker.class
); );
addWorker(worker, PathChildrenCache.StartMode.NORMAL); addWorker(worker);
break; break;
case CHILD_REMOVED: case CHILD_REMOVED:
worker = jsonMapper.readValue( worker = jsonMapper.readValue(
@ -173,22 +178,17 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
); );
removeWorker(worker); removeWorker(worker);
break; break;
case INITIALIZED:
initialized.countDown();
default: default:
break; break;
} }
} }
} }
); );
workerPathCache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
for (ChildData childData : workerPathCache.getCurrentData()) {
final Worker worker = jsonMapper.readValue(
childData.getData(),
Worker.class
);
addWorker(worker, PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
}
workerPathCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
initialized.await();
started = true; started = true;
} }
catch (Exception e) { catch (Exception e) {
@ -249,11 +249,10 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
return null; return null;
} }
public boolean isWorkerRunningTask(Worker worker, Task task) public boolean isWorkerRunningTask(Worker worker, String taskId)
{ {
ZkWorker zkWorker = zkWorkers.get(worker.getHost()); ZkWorker zkWorker = zkWorkers.get(worker.getHost());
return (zkWorker != null && zkWorker.isRunningTask(taskId));
return (zkWorker != null && zkWorker.isRunningTask(task.getId()));
} }
/** /**
@ -264,7 +263,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
@Override @Override
public ListenableFuture<TaskStatus> run(final Task task) public ListenableFuture<TaskStatus> run(final Task task)
{ {
RemoteTaskRunnerWorkItem runningTask = runningTasks.get(task.getId()); final RemoteTaskRunnerWorkItem runningTask = runningTasks.get(task.getId());
if (runningTask != null) { if (runningTask != null) {
ZkWorker zkWorker = findWorkerRunningTask(task.getId()); ZkWorker zkWorker = findWorkerRunningTask(task.getId());
if (zkWorker == null) { if (zkWorker == null) {
@ -285,14 +284,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
log.info("Assigned a task[%s] that is already pending, not doing anything", task.getId()); log.info("Assigned a task[%s] that is already pending, not doing anything", task.getId());
return pendingTask.getResult(); return pendingTask.getResult();
} }
return addPendingTask(task).getResult();
RemoteTaskRunnerWorkItem taskRunnerWorkItem = new RemoteTaskRunnerWorkItem(
task,
SettableFuture.<TaskStatus>create(),
null
);
addPendingTask(taskRunnerWorkItem);
return taskRunnerWorkItem.getResult();
} }
/** /**
@ -391,12 +383,18 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
/** /**
* Adds a task to the pending queue * Adds a task to the pending queue
*/ */
private void addPendingTask(final RemoteTaskRunnerWorkItem taskRunnerWorkItem) private RemoteTaskRunnerWorkItem addPendingTask(final Task task)
{ {
log.info("Added pending task %s", taskRunnerWorkItem.getTask().getId()); log.info("Added pending task %s", task.getId());
final RemoteTaskRunnerWorkItem taskRunnerWorkItem = new RemoteTaskRunnerWorkItem(
pendingTasks.put(taskRunnerWorkItem.getTask().getId(), taskRunnerWorkItem); task.getId(),
SettableFuture.<TaskStatus>create(),
null
);
pendingTaskPayloads.put(task.getId(), task);
pendingTasks.put(task.getId(), taskRunnerWorkItem);
runPendingTasks(); runPendingTasks();
return taskRunnerWorkItem;
} }
/** /**
@ -413,11 +411,14 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
public Void call() throws Exception public Void call() throws Exception
{ {
try { try {
// make a copy of the pending tasks because assignTask may delete tasks from pending and move them // make a copy of the pending tasks because tryAssignTask may delete tasks from pending and move them
// into running status // into running status
List<RemoteTaskRunnerWorkItem> copy = Lists.newArrayList(pendingTasks.values()); List<RemoteTaskRunnerWorkItem> copy = Lists.newArrayList(pendingTasks.values());
for (RemoteTaskRunnerWorkItem taskWrapper : copy) { for (RemoteTaskRunnerWorkItem taskRunnerWorkItem : copy) {
assignTask(taskWrapper); String taskId = taskRunnerWorkItem.getTaskId();
if (tryAssignTask(pendingTaskPayloads.get(taskId), taskRunnerWorkItem)) {
pendingTaskPayloads.remove(taskId);
}
} }
} }
catch (Exception e) { catch (Exception e) {
@ -464,26 +465,34 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
* needs to bootstrap after a restart. * needs to bootstrap after a restart.
* *
* @param taskRunnerWorkItem - the task to assign * @param taskRunnerWorkItem - the task to assign
* @return true iff the task is now assigned
*/ */
private void assignTask(RemoteTaskRunnerWorkItem taskRunnerWorkItem) private boolean tryAssignTask(final Task task, final RemoteTaskRunnerWorkItem taskRunnerWorkItem)
{ {
try { try {
final String taskId = taskRunnerWorkItem.getTask().getId(); Preconditions.checkNotNull(task, "task");
Preconditions.checkNotNull(taskRunnerWorkItem, "taskRunnerWorkItem");
Preconditions.checkArgument(task.getId().equals(taskRunnerWorkItem.getTaskId()), "task id != workItem id");
if (runningTasks.containsKey(taskId) || findWorkerRunningTask(taskId) != null) { if (runningTasks.containsKey(task.getId()) || findWorkerRunningTask(task.getId()) != null) {
log.info("Task[%s] already running.", taskId); log.info("Task[%s] already running.", task.getId());
return true;
} else { } else {
// Nothing running this task, announce it in ZK for a worker to run it // Nothing running this task, announce it in ZK for a worker to run it
ZkWorker zkWorker = findWorkerForTask(taskRunnerWorkItem.getTask()); ZkWorker zkWorker = findWorkerForTask(task);
if (zkWorker != null) { if (zkWorker != null) {
announceTask(zkWorker, taskRunnerWorkItem); announceTask(task, zkWorker, taskRunnerWorkItem);
return true;
} else {
return false;
} }
} }
} }
catch (Exception e) { catch (Exception e) {
log.makeAlert(e, "Exception while trying to run task") log.makeAlert(e, "Exception while trying to run task")
.addData("taskId", taskRunnerWorkItem.getTask().getId()) .addData("taskId", taskRunnerWorkItem.getTaskId())
.emit(); .emit();
return false;
} }
} }
@ -494,9 +503,13 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
* @param theZkWorker The worker the task is assigned to * @param theZkWorker The worker the task is assigned to
* @param taskRunnerWorkItem The task to be assigned * @param taskRunnerWorkItem The task to be assigned
*/ */
private void announceTask(ZkWorker theZkWorker, RemoteTaskRunnerWorkItem taskRunnerWorkItem) throws Exception private void announceTask(
final Task task,
final ZkWorker theZkWorker,
final RemoteTaskRunnerWorkItem taskRunnerWorkItem
) throws Exception
{ {
final Task task = taskRunnerWorkItem.getTask(); Preconditions.checkArgument(task.getId().equals(taskRunnerWorkItem.getTaskId()), "task id != workItem id");
final Worker theWorker = theZkWorker.getWorker(); final Worker theWorker = theZkWorker.getWorker();
log.info("Coordinator asking Worker[%s] to add task[%s]", theWorker.getHost(), task.getId()); log.info("Coordinator asking Worker[%s] to add task[%s]", theWorker.getHost(), task.getId());
@ -533,7 +546,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
Stopwatch timeoutStopwatch = new Stopwatch(); Stopwatch timeoutStopwatch = new Stopwatch();
timeoutStopwatch.start(); timeoutStopwatch.start();
synchronized (statusLock) { synchronized (statusLock) {
while (!isWorkerRunningTask(theWorker, task)) { while (!isWorkerRunningTask(theWorker, task.getId())) {
final long waitMs = config.getTaskAssignmentTimeout().toStandardDuration().getMillis(); final long waitMs = config.getTaskAssignmentTimeout().toStandardDuration().getMillis();
statusLock.wait(waitMs); statusLock.wait(waitMs);
long elapsed = timeoutStopwatch.elapsed(TimeUnit.MILLISECONDS); long elapsed = timeoutStopwatch.elapsed(TimeUnit.MILLISECONDS);
@ -558,9 +571,9 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
* the worker. Status changes indicate the creation or completion of a task. * the worker. Status changes indicate the creation or completion of a task.
* The RemoteTaskRunner updates state according to these changes. * The RemoteTaskRunner updates state according to these changes.
* *
* @param worker - contains metadata for a worker that has appeared in ZK * @param worker contains metadata for a worker that has appeared in ZK
*/ */
private ZkWorker addWorker(final Worker worker, PathChildrenCache.StartMode startMode) private ZkWorker addWorker(final Worker worker)
{ {
log.info("Worker[%s] reportin' for duty!", worker.getHost()); log.info("Worker[%s] reportin' for duty!", worker.getHost());
@ -580,8 +593,8 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
@Override @Override
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception
{ {
String taskId; final String taskId;
RemoteTaskRunnerWorkItem taskRunnerWorkItem; final RemoteTaskRunnerWorkItem taskRunnerWorkItem;
synchronized (statusLock) { synchronized (statusLock) {
try { try {
switch (event.getType()) { switch (event.getType()) {
@ -600,15 +613,23 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
); );
// Synchronizing state with ZK // Synchronizing state with ZK
statusLock.notify(); statusLock.notifyAll();
taskRunnerWorkItem = runningTasks.get(taskId); final RemoteTaskRunnerWorkItem tmp;
if (taskRunnerWorkItem == null) { if ((tmp = runningTasks.get(taskId)) != null) {
taskRunnerWorkItem = tmp;
} else {
log.warn( log.warn(
"WTF?! Worker[%s] announcing a status for a task I didn't know about: %s", "Worker[%s] announced a status for a task I didn't know about, adding to runningTasks: %s",
zkWorker.getWorker().getHost(), zkWorker.getWorker().getHost(),
taskId taskId
); );
taskRunnerWorkItem = new RemoteTaskRunnerWorkItem(
taskId,
SettableFuture.<TaskStatus>create(),
zkWorker.getWorker()
);
runningTasks.put(taskId, taskRunnerWorkItem);
} }
if (taskStatus.isComplete()) { if (taskStatus.isComplete()) {
@ -621,11 +642,19 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
taskRunnerWorkItem = runningTasks.remove(taskId); taskRunnerWorkItem = runningTasks.remove(taskId);
if (taskRunnerWorkItem != null) { if (taskRunnerWorkItem != null) {
log.info("Task[%s] just disappeared!", taskId); log.info("Task[%s] just disappeared!", taskId);
taskRunnerWorkItem.setResult(TaskStatus.failure(taskRunnerWorkItem.getTask().getId())); taskRunnerWorkItem.setResult(TaskStatus.failure(taskRunnerWorkItem.getTaskId()));
} else { } else {
log.info("Task[%s] went bye bye.", taskId); log.info("Task[%s] went bye bye.", taskId);
} }
break; break;
case INITIALIZED:
if (zkWorkers.putIfAbsent(worker.getHost(), zkWorker) != null) {
log.makeAlert("WTF?! Tried to add already-existing worker[%s]", worker.getHost())
.addData("workerHost", worker.getHost())
.addData("workerIp", worker.getIp())
.emit();
}
runPendingTasks();
} }
} }
catch (Exception e) { catch (Exception e) {
@ -638,12 +667,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
} }
} }
); );
zkWorker.start();
zkWorker.start(startMode);
zkWorkers.put(worker.getHost(), zkWorker);
runPendingTasks();
return zkWorker; return zkWorker;
} }
catch (Exception e) { catch (Exception e) {
@ -690,7 +714,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
} }
log.info("Failing task[%s]", assignedTask); log.info("Failing task[%s]", assignedTask);
taskRunnerWorkItem.setResult(TaskStatus.failure(taskRunnerWorkItem.getTask().getId())); taskRunnerWorkItem.setResult(TaskStatus.failure(taskRunnerWorkItem.getTaskId()));
} else { } else {
log.warn("RemoteTaskRunner has no knowledge of task[%s]", assignedTask); log.warn("RemoteTaskRunner has no knowledge of task[%s]", assignedTask);
} }
@ -749,19 +773,18 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
TaskStatus taskStatus TaskStatus taskStatus
) )
{ {
Preconditions.checkNotNull(taskRunnerWorkItem, "taskRunnerWorkItem");
Preconditions.checkNotNull(zkWorker, "zkWorker");
Preconditions.checkNotNull(taskStatus, "taskStatus");
// Worker is done with this task // Worker is done with this task
zkWorker.setLastCompletedTaskTime(new DateTime()); zkWorker.setLastCompletedTaskTime(new DateTime());
// Move from running -> complete // Move from running -> complete
if (taskRunnerWorkItem != null) { completeTasks.put(taskStatus.getId(), taskRunnerWorkItem);
completeTasks.put(taskStatus.getId(), taskRunnerWorkItem);
}
runningTasks.remove(taskStatus.getId()); runningTasks.remove(taskStatus.getId());
// Notify interested parties // Notify interested parties
if (taskRunnerWorkItem != null) { final ListenableFuture<TaskStatus> result = taskRunnerWorkItem.getResult();
final ListenableFuture<TaskStatus> result = taskRunnerWorkItem.getResult(); if (result != null) {
if (result != null) { ((SettableFuture<TaskStatus>) result).set(taskStatus);
((SettableFuture<TaskStatus>) result).set(taskStatus);
}
} }
} }
} }

View File

@ -21,7 +21,6 @@ package io.druid.indexing.overlord;
import com.google.common.util.concurrent.SettableFuture; import com.google.common.util.concurrent.SettableFuture;
import io.druid.indexing.common.TaskStatus; import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.worker.Worker; import io.druid.indexing.worker.Worker;
import org.joda.time.DateTime; import org.joda.time.DateTime;
@ -33,25 +32,25 @@ public class RemoteTaskRunnerWorkItem extends TaskRunnerWorkItem
private final Worker worker; private final Worker worker;
public RemoteTaskRunnerWorkItem( public RemoteTaskRunnerWorkItem(
Task task, String taskId,
SettableFuture<TaskStatus> result, SettableFuture<TaskStatus> result,
Worker worker Worker worker
) )
{ {
super(task, result); super(taskId, result);
this.result = result; this.result = result;
this.worker = worker; this.worker = worker;
} }
public RemoteTaskRunnerWorkItem( public RemoteTaskRunnerWorkItem(
Task task, String taskId,
SettableFuture<TaskStatus> result, SettableFuture<TaskStatus> result,
DateTime createdTime, DateTime createdTime,
DateTime queueInsertionTime, DateTime queueInsertionTime,
Worker worker Worker worker
) )
{ {
super(task, result, createdTime, queueInsertionTime); super(taskId, result, createdTime, queueInsertionTime);
this.result = result; this.result = result;
this.worker = worker; this.worker = worker;
} }
@ -69,11 +68,11 @@ public class RemoteTaskRunnerWorkItem extends TaskRunnerWorkItem
@Override @Override
public RemoteTaskRunnerWorkItem withQueueInsertionTime(DateTime time) public RemoteTaskRunnerWorkItem withQueueInsertionTime(DateTime time)
{ {
return new RemoteTaskRunnerWorkItem(getTask(), result, getCreatedTime(), time, worker); return new RemoteTaskRunnerWorkItem(getTaskId(), result, getCreatedTime(), time, worker);
} }
public RemoteTaskRunnerWorkItem withWorker(Worker theWorker) public RemoteTaskRunnerWorkItem withWorker(Worker theWorker)
{ {
return new RemoteTaskRunnerWorkItem(getTask(), result, getCreatedTime(), getQueueInsertionTime(), theWorker); return new RemoteTaskRunnerWorkItem(getTaskId(), result, getCreatedTime(), getQueueInsertionTime(), theWorker);
} }
} }

View File

@ -393,20 +393,6 @@ public class TaskLockbox
} }
} }
/**
* Removes all locks from this lockbox.
*/
public void clear()
{
giant.lock();
try {
running.clear();
} finally {
giant.unlock();
}
}
/** /**
* Return the currently-active lock posses for some task. * Return the currently-active lock posses for some task.
* *

View File

@ -117,6 +117,14 @@ public class TaskMaster
.emit(); .emit();
} }
leaderLifecycle.addManagedInstance(taskRunner); leaderLifecycle.addManagedInstance(taskRunner);
if (taskRunner instanceof RemoteTaskRunner) {
final ScheduledExecutorFactory executorFactory = ScheduledExecutors.createFactory(leaderLifecycle);
resourceManagementScheduler = managementSchedulerFactory.build(
(RemoteTaskRunner) taskRunner,
executorFactory
);
leaderLifecycle.addManagedInstance(resourceManagementScheduler);
}
leaderLifecycle.addManagedInstance(taskQueue); leaderLifecycle.addManagedInstance(taskQueue);
leaderLifecycle.addHandler( leaderLifecycle.addHandler(
new Lifecycle.Handler() new Lifecycle.Handler()
@ -134,14 +142,6 @@ public class TaskMaster
} }
} }
); );
if (taskRunner instanceof RemoteTaskRunner) {
final ScheduledExecutorFactory executorFactory = ScheduledExecutors.createFactory(leaderLifecycle);
resourceManagementScheduler = managementSchedulerFactory.build(
(RemoteTaskRunner) taskRunner,
executorFactory
);
leaderLifecycle.addManagedInstance(resourceManagementScheduler);
}
try { try {
leaderLifecycle.start(); leaderLifecycle.start();
leading = true; leading = true;

View File

@ -221,7 +221,7 @@ public class TaskQueue
// Task futures available from the taskRunner // Task futures available from the taskRunner
final Map<String, ListenableFuture<TaskStatus>> runnerTaskFutures = Maps.newHashMap(); final Map<String, ListenableFuture<TaskStatus>> runnerTaskFutures = Maps.newHashMap();
for (final TaskRunnerWorkItem workItem : taskRunner.getKnownTasks()) { for (final TaskRunnerWorkItem workItem : taskRunner.getKnownTasks()) {
runnerTaskFutures.put(workItem.getTask().getId(), workItem.getResult()); runnerTaskFutures.put(workItem.getTaskId(), workItem.getResult());
} }
// Attain futures for all active tasks (assuming they are ready to run). // Attain futures for all active tasks (assuming they are ready to run).
for (final Task task : tasks) { for (final Task task : tasks) {

View File

@ -19,11 +19,9 @@
package io.druid.indexing.overlord; package io.druid.indexing.overlord;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ComparisonChain; import com.google.common.collect.ComparisonChain;
import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListenableFuture;
import io.druid.indexing.common.TaskStatus; import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.task.Task;
import org.joda.time.DateTime; import org.joda.time.DateTime;
import org.joda.time.DateTimeComparator; import org.joda.time.DateTimeComparator;
@ -32,36 +30,35 @@ import org.joda.time.DateTimeComparator;
*/ */
public class TaskRunnerWorkItem implements Comparable<TaskRunnerWorkItem> public class TaskRunnerWorkItem implements Comparable<TaskRunnerWorkItem>
{ {
private final Task task; private final String taskId;
private final ListenableFuture<TaskStatus> result; private final ListenableFuture<TaskStatus> result;
private final DateTime createdTime; private final DateTime createdTime;
private final DateTime queueInsertionTime; private final DateTime queueInsertionTime;
public TaskRunnerWorkItem( public TaskRunnerWorkItem(
Task task, String taskId,
ListenableFuture<TaskStatus> result ListenableFuture<TaskStatus> result
) )
{ {
this(task, result, new DateTime(), new DateTime()); this(taskId, result, new DateTime(), new DateTime());
} }
public TaskRunnerWorkItem( public TaskRunnerWorkItem(
Task task, String taskId,
ListenableFuture<TaskStatus> result, ListenableFuture<TaskStatus> result,
DateTime createdTime, DateTime createdTime,
DateTime queueInsertionTime DateTime queueInsertionTime
) )
{ {
this.task = task; this.taskId = taskId;
this.result = result; this.result = result;
this.createdTime = createdTime; this.createdTime = createdTime;
this.queueInsertionTime = queueInsertionTime; this.queueInsertionTime = queueInsertionTime;
} }
@JsonProperty public String getTaskId()
public Task getTask()
{ {
return task; return taskId;
} }
public ListenableFuture<TaskStatus> getResult() public ListenableFuture<TaskStatus> getResult()
@ -69,13 +66,11 @@ public class TaskRunnerWorkItem implements Comparable<TaskRunnerWorkItem>
return result; return result;
} }
@JsonProperty
public DateTime getCreatedTime() public DateTime getCreatedTime()
{ {
return createdTime; return createdTime;
} }
@JsonProperty
public DateTime getQueueInsertionTime() public DateTime getQueueInsertionTime()
{ {
return queueInsertionTime; return queueInsertionTime;
@ -83,7 +78,7 @@ public class TaskRunnerWorkItem implements Comparable<TaskRunnerWorkItem>
public TaskRunnerWorkItem withQueueInsertionTime(DateTime time) public TaskRunnerWorkItem withQueueInsertionTime(DateTime time)
{ {
return new TaskRunnerWorkItem(task, result, createdTime, time); return new TaskRunnerWorkItem(taskId, result, createdTime, time);
} }
@Override @Override
@ -91,7 +86,7 @@ public class TaskRunnerWorkItem implements Comparable<TaskRunnerWorkItem>
{ {
return ComparisonChain.start() return ComparisonChain.start()
.compare(createdTime, taskRunnerWorkItem.getCreatedTime(), DateTimeComparator.getInstance()) .compare(createdTime, taskRunnerWorkItem.getCreatedTime(), DateTimeComparator.getInstance())
.compare(task.getId(), taskRunnerWorkItem.getTask().getId()) .compare(taskId, taskRunnerWorkItem.getTaskId())
.result(); .result();
} }
@ -99,9 +94,10 @@ public class TaskRunnerWorkItem implements Comparable<TaskRunnerWorkItem>
public String toString() public String toString()
{ {
return "TaskRunnerWorkItem{" + return "TaskRunnerWorkItem{" +
"task=" + task + "taskId='" + taskId + '\'' +
", result=" + result + ", result=" + result +
", createdTime=" + createdTime + ", createdTime=" + createdTime +
", queueInsertionTime=" + queueInsertionTime +
'}'; '}';
} }
} }

View File

@ -19,7 +19,6 @@
package io.druid.indexing.overlord; package io.druid.indexing.overlord;
import com.google.common.base.Function;
import com.google.common.base.Throwables; import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
@ -46,7 +45,6 @@ import org.joda.time.Interval;
import java.io.File; import java.io.File;
import java.util.Collection; import java.util.Collection;
import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.ConcurrentSkipListSet;
@ -58,7 +56,7 @@ public class ThreadPoolTaskRunner implements TaskRunner, QuerySegmentWalker
{ {
private final TaskToolboxFactory toolboxFactory; private final TaskToolboxFactory toolboxFactory;
private final ListeningExecutorService exec; private final ListeningExecutorService exec;
private final Set<TaskRunnerWorkItem> runningItems = new ConcurrentSkipListSet<TaskRunnerWorkItem>(); private final Set<ThreadPoolTaskRunnerWorkItem> runningItems = new ConcurrentSkipListSet<>();
private static final EmittingLogger log = new EmittingLogger(ThreadPoolTaskRunner.class); private static final EmittingLogger log = new EmittingLogger(ThreadPoolTaskRunner.class);
@ -82,8 +80,7 @@ public class ThreadPoolTaskRunner implements TaskRunner, QuerySegmentWalker
{ {
final TaskToolbox toolbox = toolboxFactory.build(task); final TaskToolbox toolbox = toolboxFactory.build(task);
final ListenableFuture<TaskStatus> statusFuture = exec.submit(new ThreadPoolTaskRunnerCallable(task, toolbox)); final ListenableFuture<TaskStatus> statusFuture = exec.submit(new ThreadPoolTaskRunnerCallable(task, toolbox));
final ThreadPoolTaskRunnerWorkItem taskRunnerWorkItem = new ThreadPoolTaskRunnerWorkItem(task, statusFuture);
final TaskRunnerWorkItem taskRunnerWorkItem = new TaskRunnerWorkItem(task, statusFuture);
runningItems.add(taskRunnerWorkItem); runningItems.add(taskRunnerWorkItem);
Futures.addCallback( Futures.addCallback(
statusFuture, new FutureCallback<TaskStatus>() statusFuture, new FutureCallback<TaskStatus>()
@ -109,7 +106,7 @@ public class ThreadPoolTaskRunner implements TaskRunner, QuerySegmentWalker
public void shutdown(final String taskid) public void shutdown(final String taskid)
{ {
for (final TaskRunnerWorkItem runningItem : runningItems) { for (final TaskRunnerWorkItem runningItem : runningItems) {
if (runningItem.getTask().getId().equals(taskid)) { if (runningItem.getTaskId().equals(taskid)) {
runningItem.getResult().cancel(true); runningItem.getResult().cancel(true);
} }
} }
@ -118,7 +115,7 @@ public class ThreadPoolTaskRunner implements TaskRunner, QuerySegmentWalker
@Override @Override
public Collection<TaskRunnerWorkItem> getRunningTasks() public Collection<TaskRunnerWorkItem> getRunningTasks()
{ {
return ImmutableList.copyOf(runningItems); return ImmutableList.<TaskRunnerWorkItem>copyOf(runningItems);
} }
@Override @Override
@ -130,7 +127,7 @@ public class ThreadPoolTaskRunner implements TaskRunner, QuerySegmentWalker
@Override @Override
public Collection<TaskRunnerWorkItem> getKnownTasks() public Collection<TaskRunnerWorkItem> getKnownTasks()
{ {
return ImmutableList.copyOf(runningItems); return ImmutableList.<TaskRunnerWorkItem>copyOf(runningItems);
} }
@Override @Override
@ -155,18 +152,8 @@ public class ThreadPoolTaskRunner implements TaskRunner, QuerySegmentWalker
{ {
QueryRunner<T> queryRunner = null; QueryRunner<T> queryRunner = null;
final List<Task> runningTasks = Lists.transform( for (final ThreadPoolTaskRunnerWorkItem taskRunnerWorkItem : ImmutableList.copyOf(runningItems)) {
ImmutableList.copyOf(getRunningTasks()), new Function<TaskRunnerWorkItem, Task>() final Task task = taskRunnerWorkItem.getTask();
{
@Override
public Task apply(TaskRunnerWorkItem o)
{
return o.getTask();
}
}
);
for (final Task task : runningTasks) {
if (task.getDataSource().equals(query.getDataSource())) { if (task.getDataSource().equals(query.getDataSource())) {
final QueryRunner<T> taskQueryRunner = task.getQueryRunner(query); final QueryRunner<T> taskQueryRunner = task.getQueryRunner(query);
@ -185,6 +172,25 @@ public class ThreadPoolTaskRunner implements TaskRunner, QuerySegmentWalker
return queryRunner == null ? new NoopQueryRunner<T>() : queryRunner; return queryRunner == null ? new NoopQueryRunner<T>() : queryRunner;
} }
private static class ThreadPoolTaskRunnerWorkItem extends TaskRunnerWorkItem
{
private final Task task;
private ThreadPoolTaskRunnerWorkItem(
Task task,
ListenableFuture<TaskStatus> result
)
{
super(task.getId(), result);
this.task = task;
}
public Task getTask()
{
return task;
}
}
private static class ThreadPoolTaskRunnerCallable implements Callable<TaskStatus> private static class ThreadPoolTaskRunnerCallable implements Callable<TaskStatus>
{ {
private final Task task; private final Task task;
@ -242,10 +248,5 @@ public class ThreadPoolTaskRunner implements TaskRunner, QuerySegmentWalker
throw Throwables.propagate(e); throw Throwables.propagate(e);
} }
} }
public TaskRunnerWorkItem getTaskRunnerWorkItem()
{
return new TaskRunnerWorkItem(task, null);
}
} }
} }

View File

@ -71,9 +71,9 @@ public class ZkWorker implements Closeable
}; };
} }
public void start(PathChildrenCache.StartMode startMode) throws Exception public void start() throws Exception
{ {
statusCache.start(startMode); statusCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
} }
public void addListener(PathChildrenCacheListener listener) public void addListener(PathChildrenCacheListener listener)

View File

@ -70,9 +70,7 @@ public class OverlordResource
public Map<String, Object> apply(TaskRunnerWorkItem input) public Map<String, Object> apply(TaskRunnerWorkItem input)
{ {
return new ImmutableMap.Builder<String, Object>() return new ImmutableMap.Builder<String, Object>()
.put("id", input.getTask().getId()) .put("id", input.getTaskId())
.put("dataSource", input.getTask().getDataSource())
.put("nodeType", input.getTask().getNodeType() == null ? "" : input.getTask().getNodeType())
.put("createdTime", input.getCreatedTime()) .put("createdTime", input.getCreatedTime())
.put("queueInsertionTime", input.getQueueInsertionTime()) .put("queueInsertionTime", input.getQueueInsertionTime())
.build(); .build();

View File

@ -219,7 +219,7 @@ public class RemoteTaskRunnerTest
) )
); );
Assert.assertTrue(remoteTaskRunner.getPendingTasks().iterator().next().getTask().getId().equals("rt2")); Assert.assertTrue(remoteTaskRunner.getPendingTasks().iterator().next().getTaskId().equals("rt2"));
} }
@Test @Test
@ -266,7 +266,7 @@ public class RemoteTaskRunnerTest
) )
); );
Assert.assertTrue(remoteTaskRunner.getPendingTasks().iterator().next().getTask().getId().equals("rt2")); Assert.assertTrue(remoteTaskRunner.getPendingTasks().iterator().next().getTaskId().equals("rt2"));
} }
@Test @Test
@ -280,7 +280,7 @@ public class RemoteTaskRunnerTest
Assert.assertTrue(workerRunningTask(task.getId())); Assert.assertTrue(workerRunningTask(task.getId()));
Assert.assertTrue(remoteTaskRunner.getRunningTasks().iterator().next().getTask().getId().equals("task")); Assert.assertTrue(remoteTaskRunner.getRunningTasks().iterator().next().getTaskId().equals("task"));
cf.delete().forPath(joiner.join(statusPath, task.getId())); cf.delete().forPath(joiner.join(statusPath, task.getId()));
@ -320,7 +320,7 @@ public class RemoteTaskRunnerTest
@Override @Override
public String apply(RemoteTaskRunnerWorkItem input) public String apply(RemoteTaskRunnerWorkItem input)
{ {
return input.getTask().getId(); return input.getTaskId();
} }
} }
) )

View File

@ -111,7 +111,7 @@ public class SimpleResourceManagementStrategyTest
boolean provisionedSomething = simpleResourceManagementStrategy.doProvision( boolean provisionedSomething = simpleResourceManagementStrategy.doProvision(
Arrays.<RemoteTaskRunnerWorkItem>asList( Arrays.<RemoteTaskRunnerWorkItem>asList(
new RemoteTaskRunnerWorkItem(testTask, null, null).withQueueInsertionTime(new DateTime()) new RemoteTaskRunnerWorkItem(testTask.getId(), null, null).withQueueInsertionTime(new DateTime())
), ),
Arrays.<ZkWorker>asList( Arrays.<ZkWorker>asList(
new TestZkWorker(testTask) new TestZkWorker(testTask)
@ -139,7 +139,7 @@ public class SimpleResourceManagementStrategyTest
boolean provisionedSomething = simpleResourceManagementStrategy.doProvision( boolean provisionedSomething = simpleResourceManagementStrategy.doProvision(
Arrays.<RemoteTaskRunnerWorkItem>asList( Arrays.<RemoteTaskRunnerWorkItem>asList(
new RemoteTaskRunnerWorkItem(testTask, null, null).withQueueInsertionTime(new DateTime()) new RemoteTaskRunnerWorkItem(testTask.getId(), null, null).withQueueInsertionTime(new DateTime())
), ),
Arrays.<ZkWorker>asList( Arrays.<ZkWorker>asList(
new TestZkWorker(testTask) new TestZkWorker(testTask)
@ -155,7 +155,7 @@ public class SimpleResourceManagementStrategyTest
provisionedSomething = simpleResourceManagementStrategy.doProvision( provisionedSomething = simpleResourceManagementStrategy.doProvision(
Arrays.<RemoteTaskRunnerWorkItem>asList( Arrays.<RemoteTaskRunnerWorkItem>asList(
new RemoteTaskRunnerWorkItem(testTask, null, null).withQueueInsertionTime(new DateTime()) new RemoteTaskRunnerWorkItem(testTask.getId(), null, null).withQueueInsertionTime(new DateTime())
), ),
Arrays.<ZkWorker>asList( Arrays.<ZkWorker>asList(
new TestZkWorker(testTask) new TestZkWorker(testTask)
@ -196,7 +196,7 @@ public class SimpleResourceManagementStrategyTest
boolean provisionedSomething = simpleResourceManagementStrategy.doProvision( boolean provisionedSomething = simpleResourceManagementStrategy.doProvision(
Arrays.<RemoteTaskRunnerWorkItem>asList( Arrays.<RemoteTaskRunnerWorkItem>asList(
new RemoteTaskRunnerWorkItem(testTask, null, null).withQueueInsertionTime(new DateTime()) new RemoteTaskRunnerWorkItem(testTask.getId(), null, null).withQueueInsertionTime(new DateTime())
), ),
Arrays.<ZkWorker>asList( Arrays.<ZkWorker>asList(
new TestZkWorker(testTask) new TestZkWorker(testTask)
@ -214,7 +214,7 @@ public class SimpleResourceManagementStrategyTest
provisionedSomething = simpleResourceManagementStrategy.doProvision( provisionedSomething = simpleResourceManagementStrategy.doProvision(
Arrays.<RemoteTaskRunnerWorkItem>asList( Arrays.<RemoteTaskRunnerWorkItem>asList(
new RemoteTaskRunnerWorkItem(testTask, null, null).withQueueInsertionTime(new DateTime()) new RemoteTaskRunnerWorkItem(testTask.getId(), null, null).withQueueInsertionTime(new DateTime())
), ),
Arrays.<ZkWorker>asList( Arrays.<ZkWorker>asList(
new TestZkWorker(testTask) new TestZkWorker(testTask)
@ -248,7 +248,7 @@ public class SimpleResourceManagementStrategyTest
boolean terminatedSomething = simpleResourceManagementStrategy.doTerminate( boolean terminatedSomething = simpleResourceManagementStrategy.doTerminate(
Arrays.<RemoteTaskRunnerWorkItem>asList( Arrays.<RemoteTaskRunnerWorkItem>asList(
new RemoteTaskRunnerWorkItem(testTask, null, null).withQueueInsertionTime(new DateTime()) new RemoteTaskRunnerWorkItem(testTask.getId(), null, null).withQueueInsertionTime(new DateTime())
), ),
Arrays.<ZkWorker>asList( Arrays.<ZkWorker>asList(
new TestZkWorker(null) new TestZkWorker(null)
@ -278,7 +278,7 @@ public class SimpleResourceManagementStrategyTest
boolean terminatedSomething = simpleResourceManagementStrategy.doTerminate( boolean terminatedSomething = simpleResourceManagementStrategy.doTerminate(
Arrays.<RemoteTaskRunnerWorkItem>asList( Arrays.<RemoteTaskRunnerWorkItem>asList(
new RemoteTaskRunnerWorkItem(testTask, null, null).withQueueInsertionTime(new DateTime()) new RemoteTaskRunnerWorkItem(testTask.getId(), null, null).withQueueInsertionTime(new DateTime())
), ),
Arrays.<ZkWorker>asList( Arrays.<ZkWorker>asList(
new TestZkWorker(null) new TestZkWorker(null)
@ -293,7 +293,7 @@ public class SimpleResourceManagementStrategyTest
terminatedSomething = simpleResourceManagementStrategy.doTerminate( terminatedSomething = simpleResourceManagementStrategy.doTerminate(
Arrays.<RemoteTaskRunnerWorkItem>asList( Arrays.<RemoteTaskRunnerWorkItem>asList(
new RemoteTaskRunnerWorkItem(testTask, null, null).withQueueInsertionTime(new DateTime()) new RemoteTaskRunnerWorkItem(testTask.getId(), null, null).withQueueInsertionTime(new DateTime())
), ),
Arrays.<ZkWorker>asList( Arrays.<ZkWorker>asList(
new TestZkWorker(null) new TestZkWorker(null)