mirror of
https://github.com/apache/druid.git
synced 2025-02-17 07:25:02 +00:00
Fix shutdown reason for unknown tasks in taskQueue (#9954)
* Fix shutdown reason for unknown tasks in taskQueue * unused imports
This commit is contained in:
parent
3dfd7c30c0
commit
474f6fc99b
@ -19,12 +19,9 @@
|
||||
|
||||
package org.apache.druid.indexing.overlord;
|
||||
|
||||
import com.google.common.base.Function;
|
||||
import com.google.common.base.Optional;
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.ImmutableSet;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Sets;
|
||||
import com.google.common.util.concurrent.FutureCallback;
|
||||
import com.google.common.util.concurrent.Futures;
|
||||
@ -284,30 +281,19 @@ public class TaskQueue
|
||||
}
|
||||
}
|
||||
// Kill tasks that shouldn't be running
|
||||
final Set<String> tasksToKill = Sets.difference(
|
||||
runnerTaskFutures.keySet(),
|
||||
ImmutableSet.copyOf(
|
||||
Lists.transform(
|
||||
tasks,
|
||||
new Function<Task, Object>()
|
||||
{
|
||||
@Override
|
||||
public String apply(Task task)
|
||||
{
|
||||
return task.getId();
|
||||
}
|
||||
}
|
||||
)
|
||||
)
|
||||
);
|
||||
final Set<String> knownTaskIds = tasks
|
||||
.stream()
|
||||
.map(Task::getId)
|
||||
.collect(Collectors.toSet());
|
||||
final Set<String> tasksToKill = Sets.difference(runnerTaskFutures.keySet(), knownTaskIds);
|
||||
if (!tasksToKill.isEmpty()) {
|
||||
log.info("Asking taskRunner to clean up %,d tasks.", tasksToKill.size());
|
||||
for (final String taskId : tasksToKill) {
|
||||
try {
|
||||
taskRunner.shutdown(
|
||||
taskId,
|
||||
"task is not in runnerTaskFutures[%s]",
|
||||
runnerTaskFutures.keySet()
|
||||
"task is not in knownTaskIds[%s]",
|
||||
knownTaskIds
|
||||
);
|
||||
}
|
||||
catch (Exception e) {
|
||||
|
Loading…
x
Reference in New Issue
Block a user