mirror of https://github.com/apache/druid.git
RemoteTaskRunner: Run taskComplete after a task times out
This commit is contained in:
parent
186bbd1cb6
commit
8660db93fc
|
@ -493,7 +493,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
|
|||
// Nothing running this task, announce it in ZK for a worker to run it
|
||||
ZkWorker zkWorker = findWorkerForTask(taskRunnerWorkItem.getTask());
|
||||
if (zkWorker != null) {
|
||||
announceTask(zkWorker.getWorker(), taskRunnerWorkItem);
|
||||
announceTask(zkWorker, taskRunnerWorkItem);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -508,12 +508,13 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
|
|||
* Creates a ZK entry under a specific path associated with a worker. The worker is responsible for
|
||||
* removing the task ZK entry and creating a task status ZK entry.
|
||||
*
|
||||
* @param theWorker The worker the task is assigned to
|
||||
* @param theZkWorker The worker the task is assigned to
|
||||
* @param taskRunnerWorkItem The task to be assigned
|
||||
*/
|
||||
private void announceTask(Worker theWorker, RemoteTaskRunnerWorkItem taskRunnerWorkItem) throws Exception
|
||||
private void announceTask(ZkWorker theZkWorker, RemoteTaskRunnerWorkItem taskRunnerWorkItem) throws Exception
|
||||
{
|
||||
final Task task = taskRunnerWorkItem.getTask();
|
||||
final Worker theWorker = theZkWorker.getWorker();
|
||||
|
||||
log.info("Coordinator asking Worker[%s] to add task[%s]", theWorker.getHost(), task.getId());
|
||||
|
||||
|
@ -562,7 +563,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
|
|||
config.getTaskAssignmentTimeout()
|
||||
);
|
||||
|
||||
taskRunnerWorkItem.setResult(TaskStatus.failure(taskRunnerWorkItem.getTask().getId()));
|
||||
taskComplete(taskRunnerWorkItem, theZkWorker, task.getId(), TaskStatus.failure(task.getId()));
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue