mirror of https://github.com/apache/druid.git
fix TaskQueue-HRTR deadlock (#6212)
* fix TaskQueue-HRTR deadlock causing https://github.com/apache/incubator-druid/issues/6201
* address review comments
This commit is contained in:
parent
28e6ae3664
commit
c3aaf8122d
|
@ -376,6 +376,7 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
|
||||||
// on a worker - this avoids overflowing a worker with tasks
|
// on a worker - this avoids overflowing a worker with tasks
|
||||||
long waitMs = config.getTaskAssignmentTimeout().toStandardDuration().getMillis();
|
long waitMs = config.getTaskAssignmentTimeout().toStandardDuration().getMillis();
|
||||||
long waitStart = System.currentTimeMillis();
|
long waitStart = System.currentTimeMillis();
|
||||||
|
boolean isTaskAssignmentTimedOut = false;
|
||||||
synchronized (statusLock) {
|
synchronized (statusLock) {
|
||||||
while (tasks.containsKey(taskId)
|
while (tasks.containsKey(taskId)
|
||||||
&& tasks.get(taskId).getState() == HttpRemoteTaskRunnerWorkItem.State.PENDING) {
|
&& tasks.get(taskId).getState() == HttpRemoteTaskRunnerWorkItem.State.PENDING) {
|
||||||
|
@ -383,29 +384,38 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
|
||||||
if (remaining > 0) {
|
if (remaining > 0) {
|
||||||
statusLock.wait(remaining);
|
statusLock.wait(remaining);
|
||||||
} else {
|
} else {
|
||||||
log.makeAlert(
|
isTaskAssignmentTimedOut = true;
|
||||||
"Task assignment timed out on worker [%s], never ran task [%s] in timeout[%s]!",
|
break;
|
||||||
workerHost,
|
|
||||||
taskId,
|
|
||||||
config.getTaskAssignmentTimeout()
|
|
||||||
).emit();
|
|
||||||
taskComplete(workItem, workerHolder, TaskStatus.failure(taskId));
|
|
||||||
return true;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return true;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (isTaskAssignmentTimedOut) {
|
||||||
|
log.makeAlert(
|
||||||
|
"Task assignment timed out on worker [%s], never ran task [%s] in timeout[%s]!",
|
||||||
|
workerHost,
|
||||||
|
taskId,
|
||||||
|
config.getTaskAssignmentTimeout()
|
||||||
|
).emit();
|
||||||
|
taskComplete(workItem, workerHolder, TaskStatus.failure(taskId));
|
||||||
|
}
|
||||||
|
|
||||||
|
return true;
|
||||||
} else {
|
} else {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// CAUTION: This method calls RemoteTaskRunnerWorkItem.setResult(..) which results in TaskQueue.notifyStatus() being called
|
||||||
|
// because that is attached by TaskQueue to task result future. So, this method must not be called with "statusLock"
|
||||||
|
// held. See https://github.com/apache/incubator-druid/issues/6201
|
||||||
private void taskComplete(
|
private void taskComplete(
|
||||||
HttpRemoteTaskRunnerWorkItem taskRunnerWorkItem,
|
HttpRemoteTaskRunnerWorkItem taskRunnerWorkItem,
|
||||||
WorkerHolder workerHolder,
|
WorkerHolder workerHolder,
|
||||||
TaskStatus taskStatus
|
TaskStatus taskStatus
|
||||||
)
|
)
|
||||||
{
|
{
|
||||||
|
Preconditions.checkState(!Thread.holdsLock(statusLock), "Current thread must not hold statusLock.");
|
||||||
Preconditions.checkNotNull(taskRunnerWorkItem, "taskRunnerWorkItem");
|
Preconditions.checkNotNull(taskRunnerWorkItem, "taskRunnerWorkItem");
|
||||||
Preconditions.checkNotNull(taskStatus, "taskStatus");
|
Preconditions.checkNotNull(taskStatus, "taskStatus");
|
||||||
if (workerHolder != null) {
|
if (workerHolder != null) {
|
||||||
|
@ -1170,6 +1180,7 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
|
||||||
|
|
||||||
HttpRemoteTaskRunnerWorkItem taskItem;
|
HttpRemoteTaskRunnerWorkItem taskItem;
|
||||||
boolean shouldShutdownTask = false;
|
boolean shouldShutdownTask = false;
|
||||||
|
boolean isTaskCompleted = false;
|
||||||
|
|
||||||
synchronized (statusLock) {
|
synchronized (statusLock) {
|
||||||
taskItem = tasks.get(taskId);
|
taskItem = tasks.get(taskId);
|
||||||
|
@ -1293,7 +1304,7 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
|
||||||
TaskRunnerUtils.notifyLocationChanged(listeners, taskId, announcement.getTaskLocation());
|
TaskRunnerUtils.notifyLocationChanged(listeners, taskId, announcement.getTaskLocation());
|
||||||
}
|
}
|
||||||
|
|
||||||
taskComplete(taskItem, workerHolder, announcement.getTaskStatus());
|
isTaskCompleted = true;
|
||||||
} else {
|
} else {
|
||||||
log.warn(
|
log.warn(
|
||||||
"Worker[%s] reported completed task[%s] which is being run by another worker[%s]. Notification ignored.",
|
"Worker[%s] reported completed task[%s] which is being run by another worker[%s]. Notification ignored.",
|
||||||
|
@ -1327,6 +1338,10 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (isTaskCompleted) {
|
||||||
|
taskComplete(taskItem, workerHolder, announcement.getTaskStatus());
|
||||||
|
}
|
||||||
|
|
||||||
if (shouldShutdownTask) {
|
if (shouldShutdownTask) {
|
||||||
log.warn("Killing task[%s] on worker[%s].", taskId, worker.getHost());
|
log.warn("Killing task[%s] on worker[%s].", taskId, worker.getHost());
|
||||||
workerHolder.shutdownTask(taskId);
|
workerHolder.shutdownTask(taskId);
|
||||||
|
|
Loading…
Reference in New Issue