TaskQueue: Behavior tweaks, simplification

- Lock tasks on add if possible
- Detect "already added" using exception test instead of bookkeeping map
- Update task status after commitRunnable instead of before commitRunnable
This commit is contained in:
Gian Merlino 2013-02-01 12:16:06 -08:00
parent 455bae2c76
commit 393bec0539
1 changed files with 41 additions and 29 deletions

View File

@ -73,7 +73,6 @@ public class TaskQueue
{ {
private final List<Task> queue = Lists.newLinkedList(); private final List<Task> queue = Lists.newLinkedList();
private final Map<String, NavigableMap<Interval, TaskGroup>> running = Maps.newHashMap(); private final Map<String, NavigableMap<Interval, TaskGroup>> running = Maps.newHashMap();
private final Multimap<String, String> seenNextTasks = HashMultimap.create();
private final TaskStorage taskStorage; private final TaskStorage taskStorage;
@ -199,6 +198,11 @@ public class TaskQueue
queue.add(task); queue.add(task);
workMayBeAvailable.signalAll(); workMayBeAvailable.signalAll();
// Attempt to add this task to a running task group. Silently continue if this is not possible.
// The main reason this is here is so when subtasks are added, they end up in the same task group
// as their parent whenever possible.
tryLock(task);
return true; return true;
} }
finally { finally {
@ -274,26 +278,26 @@ public class TaskQueue
* Finally, if this task is not supposed to be running, this method will simply do nothing. * Finally, if this task is not supposed to be running, this method will simply do nothing.
* *
* @param task task to update * @param task task to update
* @param status new task status * @param originalStatus new task status
* @param commitRunnable operation to perform if this task is ready to commit * @param commitRunnable operation to perform if this task is ready to commit
* *
* @throws NullPointerException if task or status is null * @throws NullPointerException if task or status is null
* @throws IllegalArgumentException if the task ID does not match the status ID * @throws IllegalArgumentException if the task ID does not match the status ID
* @throws IllegalStateException if this queue is currently shut down * @throws IllegalStateException if this queue is currently shut down
*/ */
public void notify(final Task task, final TaskStatus status, final Runnable commitRunnable) public void notify(final Task task, final TaskStatus originalStatus, final Runnable commitRunnable)
{ {
giant.lock(); giant.lock();
try { try {
Preconditions.checkNotNull(task, "task"); Preconditions.checkNotNull(task, "task");
Preconditions.checkNotNull(status, "status"); Preconditions.checkNotNull(originalStatus, "status");
Preconditions.checkState(active, "Queue is not active!"); Preconditions.checkState(active, "Queue is not active!");
Preconditions.checkArgument( Preconditions.checkArgument(
task.getId().equals(status.getId()), task.getId().equals(originalStatus.getId()),
"Mismatching task ids[%s/%s]", "Mismatching task ids[%s/%s]",
task.getId(), task.getId(),
status.getId() originalStatus.getId()
); );
final TaskGroup taskGroup; final TaskGroup taskGroup;
@ -306,20 +310,13 @@ public class TaskQueue
taskGroup = maybeTaskGroup.get(); taskGroup = maybeTaskGroup.get();
} }
// Update status in DB // This is what we want to write to the DB when we're done.
// TODO: We can either do this first, in which case we run the risk of having a task marked done in the DB but // Not final, since we might need to reassign the var later if the commitRunnable fails.
// TODO: not committed here; or we can do it last, in which case we run the risk of having a task marked running TaskStatus statusToSave = originalStatus;
// TODO: in the DB but committed here. Currently, we err on the former side because we don't want a ticking time
// TODO: bomb in the DB (a task marked running that we have forgotten about, which will potentially be re-
// TODO: started much later when a coordinator bootstraps).
// TODO:
// TODO: Eventually we should have this status update enter a retry queue instead of throwing an exception
// TODO: if it fails.
taskStorage.setStatus(task.getId(), status);
// Should we commit? // Should we commit?
if (taskGroup.getCommitStyle().shouldCommit(task, status)) { if (taskGroup.getCommitStyle().shouldCommit(task, statusToSave)) {
log.info("Committing %s status for task: %s", status.getStatusCode(), task.getId()); log.info("Committing %s status for task: %s", statusToSave.getStatusCode(), task.getId());
// Add next tasks // Add next tasks
try { try {
@ -330,12 +327,10 @@ public class TaskQueue
// We want to allow tasks to submit RUNNING statuses with the same nextTasks over and over. // We want to allow tasks to submit RUNNING statuses with the same nextTasks over and over.
// So, we need to remember which ones we've already spawned and not do them again. // So, we need to remember which ones we've already spawned and not do them again.
for (final Task nextTask : status.getNextTasks()) { for (final Task nextTask : statusToSave.getNextTasks()) {
if (!seenNextTasks.containsEntry(task.getId(), nextTask.getId())) { try {
add(nextTask); add(nextTask);
tryLock(nextTask); } catch (TaskExistsException e) {
seenNextTasks.put(task.getId(), nextTask.getId());
} else {
log.info("Already added followup task %s to original task: %s", nextTask.getId(), task.getId()); log.info("Already added followup task %s to original task: %s", nextTask.getId(), task.getId());
} }
} }
@ -343,19 +338,36 @@ public class TaskQueue
catch (Exception e) { catch (Exception e) {
log.makeAlert(e, "Failed to commit task") log.makeAlert(e, "Failed to commit task")
.addData("task", task.getId()) .addData("task", task.getId())
.addData("statusCode", status.getStatusCode()) .addData("statusCode", statusToSave.getStatusCode())
.emit(); .emit();
// TODO -- If this fails, it should enter a retry queue instead of throwing an exception // Rewrite status
taskStorage.setStatus(task.getId(), TaskStatus.failure(task.getId()).withDuration(status.getDuration())); statusToSave = TaskStatus.failure(task.getId()).withDuration(statusToSave.getDuration());
} }
} else { } else {
log.info("Not committing %s status for task: %s", status.getStatusCode(), task); log.info("Not committing %s status for task: %s", statusToSave.getStatusCode(), task);
} }
if (status.isComplete()) { boolean didSetStatus = false;
try {
taskStorage.setStatus(task.getId(), statusToSave);
didSetStatus = true;
} catch(Exception e) {
// TODO: This could be a task-status-submission retry queue instead of retrying the entire task,
// TODO: which is heavy and probably not necessary.
log.warn(e, "Status could not be persisted! Reinserting task: %s", task.getId());
log.makeAlert(e, "Failed to persist task status")
.addData("task", task.getId())
.addData("statusCode", statusToSave.getStatusCode())
.emit();
queue.add(task);
}
if(didSetStatus && statusToSave.isComplete()) {
unlock(task); unlock(task);
seenNextTasks.removeAll(task.getId());
log.info("Task done: %s", task); log.info("Task done: %s", task);
} }
} }