mirror of https://github.com/apache/druid.git
Fix race in TaskQueue.notifyStatus. (#12901)
* Fix race in TaskQueue.notifyStatus.

It was possible for manageInternal to relaunch a task while it was being cleaned up, due to a race that happens when notifyStatus is called to clean up a successful task:

1) In a critical section, notifyStatus removes the task from "tasks".
2) Outside a critical section, notifyStatus calls taskRunner.shutdown to let the task runner know it can clear out its data structures.
3) In a critical section, syncFromStorage adds the task back to "tasks", because it is still present in metadata storage.
4) In a critical section, manageInternalCritical notices that the task is in "tasks" and is not running in the taskRunner, so it launches it again.
5) In a (different) critical section, notifyStatus updates the metadata store to set the task status to SUCCESS.
6) The task continues running even though it should not be.

The possibility for this race was introduced in #12099, which shrunk the critical section in notifyStatus. Prior to that patch, a single critical section encompassed (1), (2), and (5), so the ordering above was not possible.

This patch does the following:

1) Fixes the race by adding a recentlyCompletedTasks set that prevents the main management loop from doing anything with tasks that are currently being cleaned up.
2) Switches the order of the critical sections in notifyStatus, so metadata store updates happen first. This is useful in case of server failures: it ensures that if the Overlord fails in the midst of notifyStatus, then completed-task statuses are still available in ZK or on MMs for the next Overlord. (Those are cleaned up by taskRunner.shutdown, which formerly ran first.) This isn't related to the race described above, but is fixed opportunistically as part of the same patch.
3) Changes the "tasks" list to a map. Many operations require retrieval or removal of individual tasks; those are now O(1) instead of O(N) in the number of running tasks.
4) Changes various log messages to use task ID instead of full task payload, to make the logs more readable.

* Fix format string.

* Update comment.
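To make the locking pattern concrete before the diff, here is a condensed, self-contained Java sketch of the guard the patch introduces. It is not the real TaskQueue: the "giant" lock, the "tasks" map, and the "recentlyCompletedTasks" set mirror the patch, but the task payloads and the work done between the two critical sections are simplified stand-ins.

import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.ReentrantLock;

// Condensed sketch of the fix, not the real TaskQueue.
class TaskQueueSketch
{
  private final ReentrantLock giant = new ReentrantLock();
  private final Map<String, String> tasks = new LinkedHashMap<>(); // task ID -> payload (simplified)
  private final Set<String> recentlyCompletedTasks = new HashSet<>();

  // Management loop body: skips tasks that notifyStatus is still cleaning up,
  // so a task re-added by syncFromStorage cannot be relaunched mid-cleanup.
  void manageInternalCritical()
  {
    giant.lock();
    try {
      for (String taskId : tasks.keySet()) {
        if (recentlyCompletedTasks.contains(taskId)) {
          continue; // notifyStatus owns this task right now; leave it alone.
        }
        // ... launch the task if the runner does not know about it ...
      }
    }
    finally {
      giant.unlock();
    }
  }

  // Cleanup path: park the task, do the slow work outside the lock, then
  // remove it from all tracking structures in a second critical section.
  void notifyStatus(String taskId)
  {
    giant.lock();
    try {
      recentlyCompletedTasks.add(taskId); // critical section 1: park the task
    }
    finally {
      giant.unlock();
    }

    // Outside the lock: persist the final status, then call taskRunner.shutdown.
    // Even if syncFromStorage re-adds the task to "tasks" here, the management
    // loop skips it because it is parked in recentlyCompletedTasks.

    giant.lock();
    try {
      tasks.remove(taskId); // critical section 2: final removal
      recentlyCompletedTasks.remove(taskId);
    }
    finally {
      giant.unlock();
    }
  }
}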
This commit is contained in:
parent 6c5a43106a
commit 28836dfa71
@@ -42,6 +42,7 @@ import org.apache.druid.indexing.common.task.batch.parallel.SinglePhaseParallelI
 import org.apache.druid.indexing.overlord.config.DefaultTaskConfig;
 import org.apache.druid.indexing.overlord.config.TaskLockConfig;
 import org.apache.druid.indexing.overlord.config.TaskQueueConfig;
+import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.concurrent.ScheduledExecutors;
 import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
@@ -56,6 +57,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -82,14 +84,21 @@ import java.util.stream.Collectors;
  */
 public class TaskQueue
 {
-  private final long MANAGEMENT_WAIT_TIMEOUT_NANOS = TimeUnit.SECONDS.toNanos(60);
-  private final long MIN_WAIT_TIME_MS = 100;
+  private static final long MANAGEMENT_WAIT_TIMEOUT_NANOS = TimeUnit.SECONDS.toNanos(60);
+  private static final long MIN_WAIT_TIME_MS = 100;
 
+  // Task ID -> Task, for tasks that are active in some way (submitted, running, or finished and to-be-cleaned-up).
   @GuardedBy("giant")
-  private final List<Task> tasks = new ArrayList<>();
+  private final LinkedHashMap<String, Task> tasks = new LinkedHashMap<>();
 
+  // Task ID -> Future from the TaskRunner
   @GuardedBy("giant")
   private final Map<String, ListenableFuture<TaskStatus>> taskFutures = new HashMap<>();
 
+  // Tasks that are in the process of being cleaned up by notifyStatus. Prevents manageInternal from re-launching them.
+  @GuardedBy("giant")
+  private final Set<String> recentlyCompletedTasks = new HashSet<>();
+
   private final TaskLockConfig lockConfig;
   private final TaskQueueConfig config;
   private final DefaultTaskConfig defaultTaskConfig;
@@ -349,11 +358,19 @@ public class TaskQueue
   {
     // Task futures available from the taskRunner
     for (final TaskRunnerWorkItem workItem : taskRunner.getKnownTasks()) {
-      runnerTaskFutures.put(workItem.getTaskId(), workItem.getResult());
+      if (!recentlyCompletedTasks.contains(workItem.getTaskId())) {
+        // Don't do anything with tasks that have recently finished; notifyStatus will handle it.
+        runnerTaskFutures.put(workItem.getTaskId(), workItem.getResult());
+      }
     }
     // Attain futures for all active tasks (assuming they are ready to run).
     // Copy tasks list, as notifyStatus may modify it.
-    for (final Task task : ImmutableList.copyOf(tasks)) {
+    for (final Task task : ImmutableList.copyOf(tasks.values())) {
+      if (recentlyCompletedTasks.contains(task.getId())) {
+        // Don't do anything with tasks that have recently finished; notifyStatus will handle it.
+        continue;
+      }
+
       knownTaskIds.add(task.getId());
 
       if (!taskFutures.containsKey(task.getId())) {
@@ -478,20 +495,32 @@ public class TaskQueue
     }
   }
 
   // Should always be called after taking giantLock
   @GuardedBy("giant")
   private void addTaskInternal(final Task task)
   {
-    tasks.add(task);
-    taskLockbox.add(task);
+    final Task existingTask = tasks.putIfAbsent(task.getId(), task);
+
+    if (existingTask == null) {
+      taskLockbox.add(task);
+    } else if (!existingTask.equals(task)) {
+      throw new ISE("Cannot add task ID [%s] with same ID as task that has already been added", task.getId());
+    }
   }
 
-  // Should always be called after taking giantLock
+  /**
+   * Removes a task from {@link #tasks} and {@link #taskLockbox}, if it exists. Returns whether the task was
+   * removed or not.
+   */
   @GuardedBy("giant")
-  private void removeTaskInternal(final Task task)
+  private boolean removeTaskInternal(final String taskId)
   {
-    taskLockbox.remove(task);
-    tasks.remove(task);
+    final Task task = tasks.remove(taskId);
+    if (task != null) {
+      taskLockbox.remove(task);
+      return true;
+    } else {
+      return false;
+    }
   }
 
   /**
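The rewritten addTaskInternal above leans on Map.putIfAbsent returning any existing mapping. A tiny standalone illustration of that contract, using made-up IDs and payload strings rather than real Druid tasks:

import java.util.LinkedHashMap;
import java.util.Map;

// Illustration of the Map.putIfAbsent contract that addTaskInternal relies on.
class PutIfAbsentDemo
{
  public static void main(String[] args)
  {
    final Map<String, String> tasks = new LinkedHashMap<>();

    // First insert: no previous mapping, so putIfAbsent returns null.
    System.out.println(tasks.putIfAbsent("task-1", "payload-a")); // null

    // Same ID again: the map is left unchanged and the existing value comes
    // back, letting the caller tell a fresh add from a duplicate submission.
    System.out.println(tasks.putIfAbsent("task-1", "payload-b")); // payload-a
    System.out.println(tasks.get("task-1"));                      // payload-a
  }
}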
@@ -506,12 +535,9 @@ public class TaskQueue
     giant.lock();
 
     try {
-      Preconditions.checkNotNull(taskId, "taskId");
-      for (final Task task : tasks) {
-        if (task.getId().equals(taskId)) {
-          notifyStatus(task, TaskStatus.failure(taskId, StringUtils.format(reasonFormat, args)), reasonFormat, args);
-          break;
-        }
+      final Task task = tasks.get(Preconditions.checkNotNull(taskId, "taskId"));
+      if (task != null) {
+        notifyStatus(task, TaskStatus.failure(taskId, StringUtils.format(reasonFormat, args)), reasonFormat, args);
       }
     }
     finally {
@@ -531,12 +557,9 @@ public class TaskQueue
     giant.lock();
 
     try {
-      Preconditions.checkNotNull(taskId, "taskId");
-      for (final Task task : tasks) {
-        if (task.getId().equals(taskId)) {
-          notifyStatus(task, TaskStatus.success(taskId), reasonFormat, args);
-          break;
-        }
+      final Task task = tasks.get(Preconditions.checkNotNull(taskId, "taskId"));
+      if (task != null) {
+        notifyStatus(task, TaskStatus.success(taskId), reasonFormat, args);
       }
     }
     finally {
@@ -568,62 +591,65 @@ public class TaskQueue
       taskStatus.getId()
     );
 
-    // Inform taskRunner that this task can be shut down
-    TaskLocation taskLocation = TaskLocation.unknown();
-    try {
-      taskLocation = taskRunner.getTaskLocation(task.getId());
-      taskRunner.shutdown(task.getId(), reasonFormat, args);
-    }
-    catch (Exception e) {
-      log.warn(e, "TaskRunner failed to cleanup task after completion: %s", task.getId());
+    if (!taskStatus.isComplete()) {
+      // Nothing to do for incomplete statuses.
+      return;
     }
 
-    int removed = 0;
-
-    ///////// critical section
+    // Critical section: add this task to recentlyCompletedTasks, so it isn't managed while being cleaned up.
     giant.lock();
     try {
-      // Remove from running tasks
-      for (int i = tasks.size() - 1; i >= 0; i--) {
-        if (tasks.get(i).getId().equals(task.getId())) {
-          removed++;
-          removeTaskInternal(tasks.get(i));
-          break;
-        }
-      }
-
-      // Remove from futures list
-      taskFutures.remove(task.getId());
+      recentlyCompletedTasks.add(task.getId());
     }
     finally {
      giant.unlock();
     }
 
-    ///////// end critical
-
-    if (removed == 0) {
-      log.warn("Unknown task completed: %s", task.getId());
-    }
+    final TaskLocation taskLocation = taskRunner.getTaskLocation(task.getId());
 
-    if (removed > 0) {
-      // If we thought this task should be running, save status to DB
-      try {
-        final Optional<TaskStatus> previousStatus = taskStorage.getStatus(task.getId());
-        if (!previousStatus.isPresent() || !previousStatus.get().isRunnable()) {
-          log.makeAlert("Ignoring notification for already-complete task").addData("task", task.getId()).emit();
-        } else {
-          taskStorage.setStatus(taskStatus.withLocation(taskLocation));
-          log.info("Task done: %s", task);
-          requestManagement();
-        }
-      }
-      catch (Exception e) {
-        log.makeAlert(e, "Failed to persist status for task")
-           .addData("task", task.getId())
-           .addData("statusCode", taskStatus.getStatusCode())
-           .emit();
-      }
+    // Save status to metadata store first, so if we crash while doing the rest of the shutdown, our successor
+    // remembers that this task has completed.
+    try {
+      final Optional<TaskStatus> previousStatus = taskStorage.getStatus(task.getId());
+      if (!previousStatus.isPresent() || !previousStatus.get().isRunnable()) {
+        log.makeAlert("Ignoring notification for already-complete task").addData("task", task.getId()).emit();
+      } else {
+        taskStorage.setStatus(taskStatus.withLocation(taskLocation));
+      }
+    }
+    catch (Throwable e) {
+      // If persist fails, even after the retries performed in taskStorage, then metadata store and actual cluster
+      // state have diverged. Send out an alert and continue with the task shutdown routine.
+      log.makeAlert(e, "Failed to persist status for task")
+         .addData("task", task.getId())
+         .addData("statusCode", taskStatus.getStatusCode())
+         .emit();
+    }
+
+    // Inform taskRunner that this task can be shut down.
+    try {
+      taskRunner.shutdown(task.getId(), reasonFormat, args);
+    }
+    catch (Throwable e) {
+      // If task runner shutdown fails, continue with the task shutdown routine. We'll come back and try to
+      // shut it down again later in manageInternalPostCritical, once it's removed from the "tasks" map.
+      log.warn(e, "TaskRunner failed to cleanup task after completion: %s", task.getId());
+    }
+
+    // Critical section: remove this task from all of our tracking data structures.
+    giant.lock();
+    try {
+      if (removeTaskInternal(task.getId())) {
+        taskFutures.remove(task.getId());
+      } else {
+        log.warn("Unknown task completed: %s", task.getId());
+      }
+
+      recentlyCompletedTasks.remove(task.getId());
+      requestManagement();
+    }
+    finally {
+      giant.unlock();
+    }
   }
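The reordering in the hunk above is the crash-safety half of the patch: the final status now reaches the metadata store before taskRunner.shutdown wipes the runner-side record in ZK or on the MiddleManagers. A minimal sketch of why that ordering matters, using simplified stand-in interfaces rather than the real TaskStorage and TaskRunner APIs:

// Simplified stand-ins, not the real TaskStorage/TaskRunner interfaces.
interface StatusStore
{
  void setStatus(String taskId, String finalStatus);
}

interface Runner
{
  void shutdown(String taskId);
}

class CompletionHandler
{
  private final StatusStore store;
  private final Runner runner;

  CompletionHandler(StatusStore store, Runner runner)
  {
    this.store = store;
    this.runner = runner;
  }

  void handleCompletion(String taskId, String finalStatus)
  {
    // 1) Persist first. If the Overlord crashes right after this line, the
    //    next Overlord sees the completed status, and the runner-side state
    //    (ZK / MiddleManagers) is still intact.
    store.setStatus(taskId, finalStatus);

    // 2) Only then discard the runner-side state. Doing this first, as the
    //    old code did, could erase the last record of the task's outcome
    //    before it was ever persisted.
    runner.shutdown(taskId);
  }
}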
@@ -684,7 +710,7 @@ public class TaskQueue
             log.info(
                 "Task %s: %s (%d run duration)",
                 status.getStatusCode(),
-                task,
+                task.getId(),
                 status.getDuration()
             );
@@ -719,7 +745,7 @@ public class TaskQueue
     if (active) {
       final Map<String, Task> newTasks = toTaskIDMap(taskStorage.getActiveTasks());
       final int tasksSynced = newTasks.size();
-      final Map<String, Task> oldTasks = toTaskIDMap(tasks);
+      final Map<String, Task> oldTasks = new HashMap<>(tasks);
 
       // Calculate differences on IDs instead of Task Objects.
       Set<String> commonIds = Sets.newHashSet(Sets.intersection(newTasks.keySet(), oldTasks.keySet()));
@@ -732,7 +758,7 @@ public class TaskQueue
 
       // Clean up removed Tasks
       for (Task task : removedTasks) {
-        removeTaskInternal(task);
+        removeTaskInternal(task.getId());
       }
 
       // Add newly Added tasks to the queue
@@ -800,7 +826,7 @@ public class TaskQueue
   {
     giant.lock();
     try {
-      return tasks.stream().collect(Collectors.toMap(Task::getId, Task::getDataSource));
+      return tasks.values().stream().collect(Collectors.toMap(Task::getId, Task::getDataSource));
     }
     finally {
       giant.unlock();
@@ -840,7 +866,7 @@ public class TaskQueue
 
     giant.lock();
     try {
-      return tasks.stream().filter(task -> !runnerKnownTaskIds.contains(task.getId()))
+      return tasks.values().stream().filter(task -> !runnerKnownTaskIds.contains(task.getId()))
                   .collect(Collectors.toMap(Task::getDataSource, task -> 1L, Long::sum));
     }
     finally {
@@ -853,7 +879,7 @@ public class TaskQueue
   {
     giant.lock();
     try {
-      return new ArrayList<Task>(tasks);
+      return new ArrayList<>(tasks.values());
     }
     finally {
       giant.unlock();