From 393bec0539e3e747cc7b9ecc1950cbce6fa15491 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Fri, 1 Feb 2013 12:16:06 -0800 Subject: [PATCH] TaskQueue: Behavior tweaks, simplification - Lock tasks on add if possible - Detect "already added" using exception test instead of bookkeeping map - Update task status after commitRunnable instead of before commitRunnable --- .../druid/merger/coordinator/TaskQueue.java | 70 +++++++++++-------- 1 file changed, 41 insertions(+), 29 deletions(-) diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/TaskQueue.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/TaskQueue.java index 6c7058475f1..e228b401025 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/TaskQueue.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/TaskQueue.java @@ -73,7 +73,6 @@ public class TaskQueue { private final List queue = Lists.newLinkedList(); private final Map> running = Maps.newHashMap(); - private final Multimap seenNextTasks = HashMultimap.create(); private final TaskStorage taskStorage; @@ -199,6 +198,11 @@ public class TaskQueue queue.add(task); workMayBeAvailable.signalAll(); + // Attempt to add this task to a running task group. Silently continue if this is not possible. + // The main reason this is here is so when subtasks are added, they end up in the same task group + // as their parent whenever possible. + tryLock(task); + return true; } finally { @@ -274,26 +278,26 @@ public class TaskQueue * Finally, if this task is not supposed to be running, this method will simply do nothing. * * @param task task to update - * @param status new task status + * @param originalStatus new task status * @param commitRunnable operation to perform if this task is ready to commit * * @throws NullPointerException if task or status is null * @throws IllegalArgumentException if the task ID does not match the status ID * @throws IllegalStateException if this queue is currently shut down */ - public void notify(final Task task, final TaskStatus status, final Runnable commitRunnable) + public void notify(final Task task, final TaskStatus originalStatus, final Runnable commitRunnable) { giant.lock(); try { Preconditions.checkNotNull(task, "task"); - Preconditions.checkNotNull(status, "status"); + Preconditions.checkNotNull(originalStatus, "status"); Preconditions.checkState(active, "Queue is not active!"); Preconditions.checkArgument( - task.getId().equals(status.getId()), + task.getId().equals(originalStatus.getId()), "Mismatching task ids[%s/%s]", task.getId(), - status.getId() + originalStatus.getId() ); final TaskGroup taskGroup; @@ -306,20 +310,13 @@ public class TaskQueue taskGroup = maybeTaskGroup.get(); } - // Update status in DB - // TODO: We can either do this first, in which case we run the risk of having a task marked done in the DB but - // TODO: not committed here; or we can do it last, in which case we run the risk of having a task marked running - // TODO: in the DB but committed here. Currently, we err on the former side because we don't want a ticking time - // TODO: bomb in the DB (a task marked running that we have forgotten about, which will potentially be re- - // TODO: started much later when a coordinator bootstraps). - // TODO: - // TODO: Eventually we should have this status update enter a retry queue instead of throwing an exception - // TODO: if it fails. - taskStorage.setStatus(task.getId(), status); + // This is what we want to write to the DB when we're done. + // Not final, since we might need to reassign the var later if the commitRunnable fails. + TaskStatus statusToSave = originalStatus; // Should we commit? - if (taskGroup.getCommitStyle().shouldCommit(task, status)) { - log.info("Committing %s status for task: %s", status.getStatusCode(), task.getId()); + if (taskGroup.getCommitStyle().shouldCommit(task, statusToSave)) { + log.info("Committing %s status for task: %s", statusToSave.getStatusCode(), task.getId()); // Add next tasks try { @@ -330,12 +327,10 @@ public class TaskQueue // We want to allow tasks to submit RUNNING statuses with the same nextTasks over and over. // So, we need to remember which ones we've already spawned and not do them again. - for (final Task nextTask : status.getNextTasks()) { - if (!seenNextTasks.containsEntry(task.getId(), nextTask.getId())) { + for (final Task nextTask : statusToSave.getNextTasks()) { + try { add(nextTask); - tryLock(nextTask); - seenNextTasks.put(task.getId(), nextTask.getId()); - } else { + } catch (TaskExistsException e) { log.info("Already added followup task %s to original task: %s", nextTask.getId(), task.getId()); } } @@ -343,19 +338,36 @@ public class TaskQueue catch (Exception e) { log.makeAlert(e, "Failed to commit task") .addData("task", task.getId()) - .addData("statusCode", status.getStatusCode()) + .addData("statusCode", statusToSave.getStatusCode()) .emit(); - // TODO -- If this fails, it should enter a retry queue instead of throwing an exception - taskStorage.setStatus(task.getId(), TaskStatus.failure(task.getId()).withDuration(status.getDuration())); + // Rewrite status + statusToSave = TaskStatus.failure(task.getId()).withDuration(statusToSave.getDuration()); } } else { - log.info("Not committing %s status for task: %s", status.getStatusCode(), task); + log.info("Not committing %s status for task: %s", statusToSave.getStatusCode(), task); } - if (status.isComplete()) { + boolean didSetStatus = false; + + try { + taskStorage.setStatus(task.getId(), statusToSave); + didSetStatus = true; + } catch(Exception e) { + // TODO: This could be a task-status-submission retry queue instead of retrying the entire task, + // TODO: which is heavy and probably not necessary. + log.warn(e, "Status could not be persisted! Reinserting task: %s", task.getId()); + + log.makeAlert(e, "Failed to persist task status") + .addData("task", task.getId()) + .addData("statusCode", statusToSave.getStatusCode()) + .emit(); + + queue.add(task); + } + + if(didSetStatus && statusToSave.isComplete()) { unlock(task); - seenNextTasks.removeAll(task.getId()); log.info("Task done: %s", task); } }