diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java b/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java index 4ae6f89c004..0d9f6e6d60d 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java @@ -493,7 +493,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer // Nothing running this task, announce it in ZK for a worker to run it ZkWorker zkWorker = findWorkerForTask(taskRunnerWorkItem.getTask()); if (zkWorker != null) { - announceTask(zkWorker.getWorker(), taskRunnerWorkItem); + announceTask(zkWorker, taskRunnerWorkItem); } } } @@ -508,12 +508,13 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer * Creates a ZK entry under a specific path associated with a worker. The worker is responsible for * removing the task ZK entry and creating a task status ZK entry. * - * @param theWorker The worker the task is assigned to + * @param theZkWorker The worker the task is assigned to * @param taskRunnerWorkItem The task to be assigned */ - private void announceTask(Worker theWorker, RemoteTaskRunnerWorkItem taskRunnerWorkItem) throws Exception + private void announceTask(ZkWorker theZkWorker, RemoteTaskRunnerWorkItem taskRunnerWorkItem) throws Exception { final Task task = taskRunnerWorkItem.getTask(); + final Worker theWorker = theZkWorker.getWorker(); log.info("Coordinator asking Worker[%s] to add task[%s]", theWorker.getHost(), task.getId()); @@ -562,7 +563,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer config.getTaskAssignmentTimeout() ); - taskRunnerWorkItem.setResult(TaskStatus.failure(taskRunnerWorkItem.getTask().getId())); + taskComplete(taskRunnerWorkItem, theZkWorker, task.getId(), TaskStatus.failure(task.getId())); break; } } @@ -777,4 +778,4 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer zkWorker.setLastCompletedTaskTime(new DateTime()); cleanup(zkWorker.getWorker().getHost(), taskId); } -} \ No newline at end of file +} diff --git a/indexing-service/src/main/java/io/druid/indexing/worker/WorkerTaskMonitor.java b/indexing-service/src/main/java/io/druid/indexing/worker/WorkerTaskMonitor.java index 51ad98adaf2..3e31461cecf 100644 --- a/indexing-service/src/main/java/io/druid/indexing/worker/WorkerTaskMonitor.java +++ b/indexing-service/src/main/java/io/druid/indexing/worker/WorkerTaskMonitor.java @@ -108,6 +108,8 @@ public class WorkerTaskMonitor return; } + log.info("Submitting runnable for task[%s]", task.getId()); + exec.submit( new Runnable() {