diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java b/indexing-service/src/main/java/io/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java index 36fc44a80c3..42f8d5c22e3 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java @@ -376,6 +376,7 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer // on a worker - this avoids overflowing a worker with tasks long waitMs = config.getTaskAssignmentTimeout().toStandardDuration().getMillis(); long waitStart = System.currentTimeMillis(); + boolean isTaskAssignmentTimedOut = false; synchronized (statusLock) { while (tasks.containsKey(taskId) && tasks.get(taskId).getState() == HttpRemoteTaskRunnerWorkItem.State.PENDING) { @@ -383,29 +384,38 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer if (remaining > 0) { statusLock.wait(remaining); } else { - log.makeAlert( - "Task assignment timed out on worker [%s], never ran task [%s] in timeout[%s]!", - workerHost, - taskId, - config.getTaskAssignmentTimeout() - ).emit(); - taskComplete(workItem, workerHolder, TaskStatus.failure(taskId)); - return true; + isTaskAssignmentTimedOut = true; + break; } } - return true; } + + if (isTaskAssignmentTimedOut) { + log.makeAlert( + "Task assignment timed out on worker [%s], never ran task [%s] in timeout[%s]!", + workerHost, + taskId, + config.getTaskAssignmentTimeout() + ).emit(); + taskComplete(workItem, workerHolder, TaskStatus.failure(taskId)); + } + + return true; } else { return false; } } + // CAUTION: This method calls RemoteTaskRunnerWorkItem.setResult(..) which results in TaskQueue.notifyStatus() being called + // because that is attached by TaskQueue to task result future. So, this method must not be called with "statusLock" + // held. See https://github.com/apache/incubator-druid/issues/6201 private void taskComplete( HttpRemoteTaskRunnerWorkItem taskRunnerWorkItem, WorkerHolder workerHolder, TaskStatus taskStatus ) { + Preconditions.checkState(!Thread.holdsLock(statusLock), "Current thread must not hold statusLock."); Preconditions.checkNotNull(taskRunnerWorkItem, "taskRunnerWorkItem"); Preconditions.checkNotNull(taskStatus, "taskStatus"); if (workerHolder != null) { @@ -1170,6 +1180,7 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer HttpRemoteTaskRunnerWorkItem taskItem; boolean shouldShutdownTask = false; + boolean isTaskCompleted = false; synchronized (statusLock) { taskItem = tasks.get(taskId); @@ -1293,7 +1304,7 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer TaskRunnerUtils.notifyLocationChanged(listeners, taskId, announcement.getTaskLocation()); } - taskComplete(taskItem, workerHolder, announcement.getTaskStatus()); + isTaskCompleted = true; } else { log.warn( "Worker[%s] reported completed task[%s] which is being run by another worker[%s]. Notification ignored.", @@ -1327,6 +1338,10 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer } } + if (isTaskCompleted) { + taskComplete(taskItem, workerHolder, announcement.getTaskStatus()); + } + if (shouldShutdownTask) { log.warn("Killing task[%s] on worker[%s].", taskId, worker.getHost()); workerHolder.shutdownTask(taskId);