From 8660db93fc01975b07abe18ca17fb166656222de Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Mon, 4 Nov 2013 10:56:26 -0800 Subject: [PATCH] RemoteTaskRunner: Run taskComplete after a task times out --- .../io/druid/indexing/overlord/RemoteTaskRunner.java | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java b/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java index 4ae6f89c004..0d9f6e6d60d 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java @@ -493,7 +493,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer // Nothing running this task, announce it in ZK for a worker to run it ZkWorker zkWorker = findWorkerForTask(taskRunnerWorkItem.getTask()); if (zkWorker != null) { - announceTask(zkWorker.getWorker(), taskRunnerWorkItem); + announceTask(zkWorker, taskRunnerWorkItem); } } } @@ -508,12 +508,13 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer * Creates a ZK entry under a specific path associated with a worker. The worker is responsible for * removing the task ZK entry and creating a task status ZK entry. * - * @param theWorker The worker the task is assigned to + * @param theZkWorker The worker the task is assigned to * @param taskRunnerWorkItem The task to be assigned */ - private void announceTask(Worker theWorker, RemoteTaskRunnerWorkItem taskRunnerWorkItem) throws Exception + private void announceTask(ZkWorker theZkWorker, RemoteTaskRunnerWorkItem taskRunnerWorkItem) throws Exception { final Task task = taskRunnerWorkItem.getTask(); + final Worker theWorker = theZkWorker.getWorker(); log.info("Coordinator asking Worker[%s] to add task[%s]", theWorker.getHost(), task.getId()); @@ -562,7 +563,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer config.getTaskAssignmentTimeout() ); - taskRunnerWorkItem.setResult(TaskStatus.failure(taskRunnerWorkItem.getTask().getId())); + taskComplete(taskRunnerWorkItem, theZkWorker, task.getId(), TaskStatus.failure(task.getId())); break; } } @@ -777,4 +778,4 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer zkWorker.setLastCompletedTaskTime(new DateTime()); cleanup(zkWorker.getWorker().getHost(), taskId); } -} \ No newline at end of file +}