mirror of https://github.com/apache/druid.git
Enforce logging when killing a task (#6621)
* Enforce logging when killing a task
* fix test
* address comment
* address comment
parent 8f3fe9cd02
commit d738ce4d2a
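Throughout the diff that follows, call sites that previously logged a reason and then invoked a no-argument kill or shutdown now pass that reason (a format string plus arguments) into the kill call itself, so the reason travels with the shutdown request and is logged by the task queue and the runners. A minimal, self-contained sketch of that pattern is below; the names (`TaskQueueStub`, `SupervisorSketch`, `onStatusTimeout`) are hypothetical stand-ins, not real Druid classes.

```java
import java.util.Locale;
import java.util.Optional;

// Hypothetical stand-in illustrating the shape of the change; in Druid the
// reason is forwarded from the supervisor through TaskQueue to the TaskRunner.
class TaskQueueStub
{
  void shutdown(String taskId, String reasonFormat, Object... args)
  {
    // The reason is now logged (and forwarded) together with the shutdown.
    System.out.printf("Shutdown [%s] because: [%s]%n",
        taskId, String.format(Locale.ENGLISH, reasonFormat, args));
  }
}

class SupervisorSketch
{
  private final Optional<TaskQueueStub> taskQueue = Optional.of(new TaskQueueStub());

  // Before: log.warn("Task [%s] failed ...", taskId); killTask(taskId);
  // After: the reason is passed through killTask instead of being logged locally.
  private void killTask(String taskId, String reasonFormat, Object... args)
  {
    taskQueue.ifPresent(queue -> queue.shutdown(taskId, reasonFormat, args));
  }

  void onStatusTimeout(String taskId)
  {
    killTask(taskId, "Task [%s] failed to return status, killing task", taskId);
  }

  public static void main(String[] args)
  {
    new SupervisorSketch().onStatusTimeout("index_kafka_demo_2");
  }
}
```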
@@ -346,7 +346,7 @@ public class MaterializedViewSupervisor implements Supervisor
             && !toBuildInterval.get(interval).equals(runningVersion.get(interval))
         ) {
           if (taskMaster.getTaskQueue().isPresent()) {
-            taskMaster.getTaskQueue().get().shutdown(runningTasks.get(interval).getId());
+            taskMaster.getTaskQueue().get().shutdown(runningTasks.get(interval).getId(), "version mismatch");
             runningTasks.remove(interval);
           }
         }
@@ -451,7 +451,7 @@ public class MaterializedViewSupervisor implements Supervisor
   {
     for (HadoopIndexTask task : runningTasks.values()) {
       if (taskMaster.getTaskQueue().isPresent()) {
-        taskMaster.getTaskQueue().get().shutdown(task.getId());
+        taskMaster.getTaskQueue().get().shutdown(task.getId(), "killing all tasks");
       }
     }
     runningTasks.clear();

@ -747,7 +747,7 @@ public class KafkaSupervisor implements Supervisor
|
|||
// Reset everything
|
||||
boolean result = indexerMetadataStorageCoordinator.deleteDataSourceMetadata(dataSource);
|
||||
log.info("Reset dataSource[%s] - dataSource metadata entry deleted? [%s]", dataSource, result);
|
||||
taskGroups.values().forEach(this::killTasksInGroup);
|
||||
taskGroups.values().forEach(group -> killTasksInGroup(group, "DataSourceMetadata is not found while reset"));
|
||||
taskGroups.clear();
|
||||
partitionGroups.clear();
|
||||
} else if (!(dataSourceMetadata instanceof KafkaDataSourceMetadata)) {
|
||||
|
@ -811,7 +811,7 @@ public class KafkaSupervisor implements Supervisor
|
|||
if (metadataUpdateSuccess) {
|
||||
resetKafkaMetadata.getKafkaPartitions().getPartitionOffsetMap().keySet().forEach(partition -> {
|
||||
final int groupId = getTaskGroupIdForPartition(partition);
|
||||
killTaskGroupForPartitions(ImmutableSet.of(partition));
|
||||
killTaskGroupForPartitions(ImmutableSet.of(partition), "DataSourceMetadata is updated while reset");
|
||||
taskGroups.remove(groupId);
|
||||
partitionGroups.get(groupId).replaceAll((partitionId, offset) -> NOT_SET);
|
||||
});
|
||||
|
@ -828,19 +828,18 @@ public class KafkaSupervisor implements Supervisor
|
|||
}
|
||||
}
|
||||
|
||||
private void killTaskGroupForPartitions(Set<Integer> partitions)
|
||||
private void killTaskGroupForPartitions(Set<Integer> partitions, String reasonFormat, Object... args)
|
||||
{
|
||||
for (Integer partition : partitions) {
|
||||
killTasksInGroup(taskGroups.get(getTaskGroupIdForPartition(partition)));
|
||||
killTasksInGroup(taskGroups.get(getTaskGroupIdForPartition(partition)), reasonFormat, args);
|
||||
}
|
||||
}
|
||||
|
||||
private void killTasksInGroup(TaskGroup taskGroup)
|
||||
private void killTasksInGroup(TaskGroup taskGroup, String reasonFormat, Object... args)
|
||||
{
|
||||
if (taskGroup != null) {
|
||||
for (String taskId : taskGroup.tasks.keySet()) {
|
||||
log.info("Killing task [%s] in the task group", taskId);
|
||||
killTask(taskId);
|
||||
killTask(taskId, reasonFormat, args);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@@ -855,7 +854,7 @@ public class KafkaSupervisor implements Supervisor
     for (TaskGroup taskGroup : taskGroups.values()) {
       for (Entry<String, TaskData> entry : taskGroup.tasks.entrySet()) {
         if (taskInfoProvider.getTaskLocation(entry.getKey()).equals(TaskLocation.unknown())) {
-          killTask(entry.getKey());
+          killTask(entry.getKey(), "Killing task for graceful shutdown");
         } else {
           entry.getValue().startTime = DateTimes.EPOCH;
         }
@@ -1236,8 +1235,7 @@ public class KafkaSupervisor implements Supervisor
     for (int i = 0; i < results.size(); i++) {
       if (results.get(i) == null) {
         String taskId = futureTaskIds.get(i);
-        log.warn("Task [%s] failed to return status, killing task", taskId);
-        killTask(taskId);
+        killTask(taskId, "Task [%s] failed to return status, killing task", taskId);
       }
     }
     log.debug("Found [%d] Kafka indexing tasks for dataSource [%s]", taskCount, dataSource);
@@ -1297,7 +1295,7 @@ public class KafkaSupervisor implements Supervisor
           }
           catch (Exception e) {
             log.error(e, "Problem while getting checkpoints for task [%s], killing the task", taskId);
-            killTask(taskId);
+            killTask(taskId, "Exception[%s] while getting checkpoints", e.getClass());
             taskGroup.tasks.remove(taskId);
           }
         } else if (checkpoints.isEmpty()) {
@@ -1393,7 +1391,8 @@ public class KafkaSupervisor implements Supervisor

           taskSequences.stream().filter(taskIdSequences -> tasksToKill.contains(taskIdSequences.lhs)).forEach(
               sequenceCheckpoint -> {
-                log.warn(
+                killTask(
+                    sequenceCheckpoint.lhs,
                     "Killing task [%s], as its checkpoints [%s] are not consistent with group checkpoints[%s] or latest "
                     + "persisted offsets in metadata store [%s]",
                     sequenceCheckpoint.lhs,
@@ -1401,7 +1400,6 @@ public class KafkaSupervisor implements Supervisor
                     taskGroup.sequenceOffsets,
                     latestOffsetsFromDb
                 );
-                killTask(sequenceCheckpoint.lhs);
                 taskGroup.tasks.remove(sequenceCheckpoint.lhs);
               }
           );
@@ -1505,8 +1503,7 @@ public class KafkaSupervisor implements Supervisor
         // request threw an exception so kill the task
         if (results.get(i) == null) {
           String taskId = futureTaskIds.get(i);
-          log.warn("Task [%s] failed to return start time, killing task", taskId);
-          killTask(taskId);
+          killTask(taskId, "Task [%s] failed to return start time, killing task", taskId);
         }
       }
     }
@@ -1553,13 +1550,12 @@ public class KafkaSupervisor implements Supervisor
           partitionGroups.get(groupId).put(entry.getKey(), entry.getValue());
         }
       } else {
-        log.warn(
-            "All tasks in group [%s] failed to transition to publishing state, killing tasks [%s]",
-            groupId,
-            group.taskIds()
-        );
         for (String id : group.taskIds()) {
-          killTask(id);
+          killTask(
+              id,
+              "All tasks in group [%s] failed to transition to publishing state",
+              groupId
+          );
         }
         // clear partitionGroups, so that latest offsets from db is used as start offsets not the stale ones
         // if tasks did some successful incremental handoffs
@@ -1589,7 +1585,7 @@ public class KafkaSupervisor implements Supervisor
       // metadata store (which will have advanced if we succeeded in publishing and will remain the same if
       // publishing failed and we need to re-ingest)
       return Futures.transform(
-          stopTasksInGroup(taskGroup),
+          stopTasksInGroup(taskGroup, "task[%s] succeeded in the taskGroup", task.status.getId()),
           new Function<Object, Map<Integer, Long>>()
           {
             @Nullable
@@ -1604,8 +1600,7 @@ public class KafkaSupervisor implements Supervisor

       if (task.status.isRunnable()) {
         if (taskInfoProvider.getTaskLocation(taskId).equals(TaskLocation.unknown())) {
-          log.info("Killing task [%s] which hasn't been assigned to a worker", taskId);
-          killTask(taskId);
+          killTask(taskId, "Killing task [%s] which hasn't been assigned to a worker", taskId);
           i.remove();
         }
       }
@@ -1634,8 +1629,7 @@ public class KafkaSupervisor implements Supervisor

           if (result == null || result.isEmpty()) { // kill tasks that didn't return a value
             String taskId = pauseTaskIds.get(i);
-            log.warn("Task [%s] failed to respond to [pause] in a timely manner, killing task", taskId);
-            killTask(taskId);
+            killTask(taskId, "Task [%s] failed to respond to [pause] in a timely manner, killing task", taskId);
             taskGroup.tasks.remove(taskId);

           } else { // otherwise build a map of the highest offsets seen
@@ -1683,8 +1677,11 @@ public class KafkaSupervisor implements Supervisor
         for (int i = 0; i < results.size(); i++) {
           if (results.get(i) == null || !results.get(i)) {
             String taskId = setEndOffsetTaskIds.get(i);
-            log.warn("Task [%s] failed to respond to [set end offsets] in a timely manner, killing task", taskId);
-            killTask(taskId);
+            killTask(
+                taskId,
+                "Task [%s] failed to respond to [set end offsets] in a timely manner, killing task",
+                taskId
+            );
             taskGroup.tasks.remove(taskId);
           }
         }
@@ -1730,7 +1727,12 @@ public class KafkaSupervisor implements Supervisor
       if (stopTasksInTaskGroup) {
         // One of the earlier groups that was handling the same partition set timed out before the segments were
         // published so stop any additional groups handling the same partition set that are pending completion.
-        futures.add(stopTasksInGroup(group));
+        futures.add(
+            stopTasksInGroup(
+                group,
+                "one of earlier groups that was handling the same partition set timed out before publishing segments"
+            )
+        );
         toRemove.add(group);
         continue;
       }
@@ -1755,8 +1757,9 @@ public class KafkaSupervisor implements Supervisor
         if (taskData.status.isSuccess()) {
           // If one of the pending completion tasks was successful, stop the rest of the tasks in the group as
           // we no longer need them to publish their segment.
-          log.info("Task [%s] completed successfully, stopping tasks %s", taskId, group.taskIds());
-          futures.add(stopTasksInGroup(group));
+          futures.add(
+              stopTasksInGroup(group, "Task [%s] completed successfully, stopping tasks %s", taskId, group.taskIds())
+          );
           foundSuccess = true;
           toRemove.add(group); // remove the TaskGroup from the list of pending completion task groups
           break; // skip iterating the rest of the tasks in this group as they've all been stopped now
@@ -1778,12 +1781,20 @@ public class KafkaSupervisor implements Supervisor
         // reset partitions offsets for this task group so that they will be re-read from metadata storage
         partitionGroups.get(groupId).replaceAll((partition, offset) -> NOT_SET);
         // kill all the tasks in this pending completion group
-        killTasksInGroup(group);
+        killTasksInGroup(
+            group,
+            "No task in pending completion taskGroup[%d] succeeded before completion timeout elapsed",
+            groupId
+        );
         // set a flag so the other pending completion groups for this set of partitions will also stop
         stopTasksInTaskGroup = true;

         // kill all the tasks in the currently reading task group and remove the bad task group
-        killTasksInGroup(taskGroups.remove(groupId));
+        killTasksInGroup(
+            taskGroups.remove(groupId),
+            "No task in the corresponding pending completion taskGroup[%d] succeeded before completion timeout elapsed",
+            groupId
+        );
         toRemove.add(group);
       }
     }
@@ -1837,7 +1848,7 @@ public class KafkaSupervisor implements Supervisor
       // check for successful tasks, and if we find one, stop all tasks in the group and remove the group so it can
       // be recreated with the next set of offsets
       if (taskData.status.isSuccess()) {
-        futures.add(stopTasksInGroup(taskGroup));
+        futures.add(stopTasksInGroup(taskGroup, "task[%s] succeeded in the same taskGroup", taskData.status.getId()));
         iTaskGroups.remove();
         break;
       }
@@ -2099,18 +2110,24 @@ public class KafkaSupervisor implements Supervisor
     }
   }

-  private ListenableFuture<?> stopTasksInGroup(@Nullable TaskGroup taskGroup)
+  private ListenableFuture<?> stopTasksInGroup(@Nullable TaskGroup taskGroup, String stopReasonFormat, Object... args)
   {
     if (taskGroup == null) {
       return Futures.immediateFuture(null);
     }

+    log.info(
+        "Stopping all tasks in taskGroup[%s] because: [%s]",
+        taskGroup.groupId,
+        StringUtils.format(stopReasonFormat, args)
+    );
+
     final List<ListenableFuture<Void>> futures = new ArrayList<>();
     for (Entry<String, TaskData> entry : taskGroup.tasks.entrySet()) {
       final String taskId = entry.getKey();
       final TaskData taskData = entry.getValue();
       if (taskData.status == null) {
-        killTask(taskId);
+        killTask(taskId, "Killing task since task status is not known to supervisor");
       } else if (!taskData.status.isComplete()) {
         futures.add(stopTask(taskId, false));
       }
@@ -2129,8 +2146,7 @@ public class KafkaSupervisor implements Supervisor
           public Void apply(@Nullable Boolean result)
           {
             if (result == null || !result) {
-              log.info("Task [%s] failed to stop in a timely manner, killing task", id);
-              killTask(id);
+              killTask(id, "Task [%s] failed to stop in a timely manner, killing task", id);
             }
             return null;
           }
@@ -2138,11 +2154,11 @@ public class KafkaSupervisor implements Supervisor
     );
   }

-  private void killTask(final String id)
+  private void killTask(final String id, String reasonFormat, Object... args)
   {
     Optional<TaskQueue> taskQueue = taskMaster.getTaskQueue();
     if (taskQueue.isPresent()) {
-      taskQueue.get().shutdown(id);
+      taskQueue.get().shutdown(id, reasonFormat, args);
     } else {
       log.error("Failed to get task queue because I'm not the leader!");
     }

@@ -654,7 +654,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
     expect(taskClient.stopAsync("id1", false)).andReturn(Futures.immediateFuture(true));
     expect(taskClient.stopAsync("id3", false)).andReturn(Futures.immediateFuture(false));
     taskRunner.registerListener(anyObject(TaskRunnerListener.class), anyObject(Executor.class));
-    taskQueue.shutdown("id3");
+    taskQueue.shutdown("id3", "Task [%s] failed to stop in a timely manner, killing task", "id3");

     expect(taskQueue.add(anyObject(Task.class))).andReturn(true);

@@ -763,8 +763,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
         .times(1);

     taskRunner.registerListener(anyObject(TaskRunnerListener.class), anyObject(Executor.class));
-    taskQueue.shutdown("id4");
-    taskQueue.shutdown("id5");
+    taskQueue.shutdown("id4", "Task [%s] failed to stop in a timely manner, killing task", "id4");
+    taskQueue.shutdown("id5", "Task [%s] failed to stop in a timely manner, killing task", "id5");
     replayAll();

     supervisor.start();
@@ -1464,7 +1464,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
          .andReturn(Futures.immediateFuture(KafkaIndexTask.Status.NOT_STARTED));
      expect(taskClient.getStartTimeAsync(task.getId()))
          .andReturn(Futures.immediateFailedFuture(new RuntimeException()));
-      taskQueue.shutdown(task.getId());
+      taskQueue.shutdown(task.getId(), "Task [%s] failed to return start time, killing task", task.getId());
     }
     replay(taskStorage, taskClient, taskQueue);

@@ -1535,7 +1535,11 @@ public class KafkaSupervisorTest extends EasyMockSupport
         .times(2);
     expect(taskClient.pauseAsync(EasyMock.contains("sequenceName-0")))
         .andReturn(Futures.immediateFailedFuture(new RuntimeException())).times(2);
-    taskQueue.shutdown(EasyMock.contains("sequenceName-0"));
+    taskQueue.shutdown(
+        EasyMock.contains("sequenceName-0"),
+        EasyMock.eq("Task [%s] failed to respond to [pause] in a timely manner, killing task"),
+        EasyMock.contains("sequenceName-0")
+    );
     expectLastCall().times(2);
     expect(taskQueue.add(capture(captured))).andReturn(true).times(2);

@@ -1622,7 +1626,11 @@ public class KafkaSupervisorTest extends EasyMockSupport
             EasyMock.eq(true)
         )
     ).andReturn(Futures.immediateFailedFuture(new RuntimeException())).times(2);
-    taskQueue.shutdown(EasyMock.contains("sequenceName-0"));
+    taskQueue.shutdown(
+        EasyMock.contains("sequenceName-0"),
+        EasyMock.eq("All tasks in group [%s] failed to transition to publishing state"),
+        EasyMock.eq(0)
+    );
     expectLastCall().times(2);
     expect(taskQueue.add(capture(captured))).andReturn(true).times(2);

@@ -1749,8 +1757,10 @@ public class KafkaSupervisorTest extends EasyMockSupport
         .andReturn(Futures.immediateFuture((Map<Integer, Long>) ImmutableMap.of(0, 15L, 1, 25L, 2, 30L)));
     expect(taskClient.setEndOffsetsAsync("id2", ImmutableMap.of(0, 15L, 1, 25L, 2, 30L), true))
         .andReturn(Futures.immediateFuture(true));
-    taskQueue.shutdown("id3");
-    expectLastCall().times(2);
+    taskQueue.shutdown("id3", "Killing task for graceful shutdown");
+    expectLastCall().times(1);
+    taskQueue.shutdown("id3", "Killing task [%s] which hasn't been assigned to a worker", "id3");
+    expectLastCall().times(1);

     replay(taskRunner, taskClient, taskQueue);

@@ -1950,8 +1960,8 @@ public class KafkaSupervisorTest extends EasyMockSupport

     reset(taskQueue, indexerMetadataStorageCoordinator);
     expect(indexerMetadataStorageCoordinator.deleteDataSourceMetadata(DATASOURCE)).andReturn(true);
-    taskQueue.shutdown("id2");
-    taskQueue.shutdown("id3");
+    taskQueue.shutdown("id2", "DataSourceMetadata is not found while reset");
+    taskQueue.shutdown("id3", "DataSourceMetadata is not found while reset");
     replay(taskQueue, indexerMetadataStorageCoordinator);

     supervisor.resetInternal(null);
@@ -2036,9 +2046,9 @@ public class KafkaSupervisorTest extends EasyMockSupport

     reset(taskQueue, indexerMetadataStorageCoordinator);
     expect(indexerMetadataStorageCoordinator.deleteDataSourceMetadata(DATASOURCE)).andReturn(true);
-    taskQueue.shutdown("id1");
-    taskQueue.shutdown("id2");
-    taskQueue.shutdown("id3");
+    taskQueue.shutdown("id1", "DataSourceMetadata is not found while reset");
+    taskQueue.shutdown("id2", "DataSourceMetadata is not found while reset");
+    taskQueue.shutdown("id3", "DataSourceMetadata is not found while reset");
     replay(taskQueue, indexerMetadataStorageCoordinator);

     supervisor.resetInternal(null);
@@ -2424,8 +2434,10 @@ public class KafkaSupervisorTest extends EasyMockSupport
         .andReturn(Futures.immediateFuture(ImmutableMap.of(0, 15L, 1, 25L, 2, 30L)));
     expect(taskClient.setEndOffsetsAsync("id2", ImmutableMap.of(0, 15L, 1, 25L, 2, 30L), true))
         .andReturn(Futures.immediateFuture(true));
-    taskQueue.shutdown("id3");
-    expectLastCall().times(2);
+    taskQueue.shutdown("id3", "Killing task for graceful shutdown");
+    expectLastCall().times(1);
+    taskQueue.shutdown("id3", "Killing task [%s] which hasn't been assigned to a worker", "id3");
+    expectLastCall().times(1);

     replayAll();
     supervisor.start();

@@ -0,0 +1,35 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements. See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership. The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License. You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing,
+  ~ software distributed under the License is distributed on an
+  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  ~ KIND, either express or implied. See the License for the
+  ~ specific language governing permissions and limitations
+  ~ under the License.
+  -->
+
+<Configuration status="WARN">
+  <Appenders>
+    <Console name="Console" target="SYSTEM_OUT">
+      <PatternLayout pattern="%d{ISO8601} %p [%t] %c - %m%n"/>
+    </Console>
+  </Appenders>
+  <Loggers>
+    <Root level="info">
+      <AppenderRef ref="Console"/>
+    </Root>
+    <Logger level="debug" name="org.apache.druid" additivity="false">
+      <AppenderRef ref="Console"/>
+    </Logger>
+  </Loggers>
+</Configuration>

@@ -583,8 +583,9 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer
   }

   @Override
-  public void shutdown(final String taskid)
+  public void shutdown(final String taskid, String reason)
   {
+    log.info("Shutdown [%s] because: [%s]", taskid, reason);
     final ForkingTaskRunnerWorkItem taskInfo;

     synchronized (tasks) {

@@ -541,8 +541,9 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
    * @param taskId - task id to shutdown
    */
   @Override
-  public void shutdown(final String taskId)
+  public void shutdown(final String taskId, String reason)
   {
+    log.info("Shutdown [%s] because: [%s]", taskId, reason);
     if (!lifecycleLock.awaitStarted(1, TimeUnit.SECONDS)) {
       log.info("This TaskRunner is stopped or not yet started. Ignoring shutdown command for task: %s", taskId);
     } else if (pendingTasks.remove(taskId) != null) {

@@ -278,8 +278,9 @@ public class SingleTaskBackgroundRunner implements TaskRunner, QuerySegmentWalke
    * @param taskid task ID to clean up resources for
    */
   @Override
-  public void shutdown(final String taskid)
+  public void shutdown(final String taskid, String reason)
   {
+    log.info("Shutdown [%s] because: [%s]", taskid, reason);
     if (runningItem != null && runningItem.getTask().getId().equals(taskid)) {
       runningItem.getResult().cancel(true);
     }

@@ -252,7 +252,7 @@ public class TaskQueue
         }
         catch (Exception e) {
           log.warn(e, "Exception thrown during isReady for task: %s", task.getId());
-          notifyStatus(task, TaskStatus.failure(task.getId()));
+          notifyStatus(task, TaskStatus.failure(task.getId()), "failed because of exception[%s]", e.getClass());
           continue;
         }
         if (taskIsReady) {
@@ -286,7 +286,11 @@ public class TaskQueue
       log.info("Asking taskRunner to clean up %,d tasks.", tasksToKill.size());
       for (final String taskId : tasksToKill) {
         try {
-          taskRunner.shutdown(taskId);
+          taskRunner.shutdown(
+              taskId,
+              "task is not in runnerTaskFutures[%s]",
+              runnerTaskFutures.keySet()
+          );
         }
         catch (Exception e) {
           log.warn(e, "TaskRunner failed to clean up task: %s", taskId);
@@ -356,7 +360,7 @@ public class TaskQueue
    *
    * @param taskId task to kill
    */
-  public void shutdown(final String taskId)
+  public void shutdown(final String taskId, String reasonFormat, Object... args)
   {
     giant.lock();

@@ -364,7 +368,7 @@ public class TaskQueue
       Preconditions.checkNotNull(taskId, "taskId");
       for (final Task task : tasks) {
         if (task.getId().equals(taskId)) {
-          notifyStatus(task, TaskStatus.failure(taskId));
+          notifyStatus(task, TaskStatus.failure(taskId), reasonFormat, args);
           break;
         }
       }
@@ -386,7 +390,7 @@ public class TaskQueue
    * @throws IllegalArgumentException if the task ID does not match the status ID
    * @throws IllegalStateException if this queue is currently shut down
    */
-  private void notifyStatus(final Task task, final TaskStatus taskStatus)
+  private void notifyStatus(final Task task, final TaskStatus taskStatus, String reasonFormat, Object... args)
   {
     giant.lock();

@@ -402,7 +406,7 @@ public class TaskQueue
       );
       // Inform taskRunner that this task can be shut down
       try {
-        taskRunner.shutdown(task.getId());
+        taskRunner.shutdown(task.getId(), reasonFormat, args);
       }
       catch (Exception e) {
         log.warn(e, "TaskRunner failed to cleanup task after completion: %s", task.getId());
@@ -493,7 +497,7 @@ public class TaskQueue
         return;
       }

-      notifyStatus(task, status);
+      notifyStatus(task, status, "notified status change from task");

       // Emit event and log, if the task is done
       if (status.isComplete()) {

@@ -27,6 +27,7 @@ import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexing.common.task.Task;
 import org.apache.druid.indexing.overlord.autoscaling.ScalingStats;
 import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.StringUtils;

 import javax.annotation.Nullable;
 import java.util.Collection;
@@ -81,8 +82,14 @@ public interface TaskRunner
    * currently-running tasks.
    *
    * @param taskid task ID to clean up resources for
+   * @param reason reason to kill this task
    */
-  void shutdown(String taskid);
+  void shutdown(String taskid, String reason);
+
+  default void shutdown(String taskid, String reasonFormat, Object... args)
+  {
+    shutdown(taskid, StringUtils.format(reasonFormat, args));
+  }

   /**
    * Stop this task runner. This may block until currently-running tasks can be gracefully stopped. After calling

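As a usage sketch of the interface change above: the two-argument overload takes a fully formed reason, while the default varargs overload formats the reason first and delegates (Druid's `StringUtils.format` formats with `Locale.ENGLISH`; `String.format` with an explicit locale stands in for it here). The `ShutdownAware` interface below is a hypothetical miniature of `TaskRunner`, which declares many more methods.

```java
import java.util.Locale;

// Stand-in for the pattern added to TaskRunner: one abstract method taking a
// pre-formatted reason, plus a default overload that formats varargs first.
interface ShutdownAware
{
  void shutdown(String taskId, String reason);

  default void shutdown(String taskId, String reasonFormat, Object... args)
  {
    shutdown(taskId, String.format(Locale.ENGLISH, reasonFormat, args));
  }
}

class ShutdownExample
{
  public static void main(String[] args)
  {
    ShutdownAware runner = (taskId, reason) ->
        System.out.printf("Shutdown [%s] because: [%s]%n", taskId, reason);

    // Callers that already have a reason use the two-argument form...
    runner.shutdown("index_kafka_demo_0", "Shutdown request from user");

    // ...while supervisors pass a format string plus arguments:
    runner.shutdown("index_kafka_demo_1", "Task [%s] failed to stop in a timely manner, killing task", "index_kafka_demo_1");
  }
}
```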
@@ -1041,7 +1041,7 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
   }

   @Override
-  public void shutdown(String taskId)
+  public void shutdown(String taskId, String reason)
   {
     if (!lifecycleLock.awaitStarted(1, TimeUnit.SECONDS)) {
       log.info("This TaskRunner is stopped or not yet started. Ignoring shutdown command for task: %s", taskId);
@@ -1050,6 +1050,7 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer

     WorkerHolder workerHolderRunningTask = null;
     synchronized (statusLock) {
+      log.info("Shutdown [%s] because: [%s]", taskId, reason);
       HttpRemoteTaskRunnerWorkItem taskRunnerWorkItem = tasks.remove(taskId);
       if (taskRunnerWorkItem != null) {
         if (taskRunnerWorkItem.getState() == HttpRemoteTaskRunnerWorkItem.State.RUNNING) {

@@ -330,7 +330,7 @@ public class OverlordResource
       @Override
       public Response apply(TaskQueue taskQueue)
       {
-        taskQueue.shutdown(taskid);
+        taskQueue.shutdown(taskid, "Shutdown request from user");
         return Response.ok(ImmutableMap.of("task", taskid)).build();
       }
     }
@@ -351,7 +351,7 @@ public class OverlordResource
       {
         final List<TaskInfo<Task, TaskStatus>> tasks = taskStorageQueryAdapter.getActiveTaskInfo(dataSource);
         for (final TaskInfo<Task, TaskStatus> task : tasks) {
-          taskQueue.shutdown(task.getId());
+          taskQueue.shutdown(task.getId(), "Shutdown request from user");
         }
         return Response.ok(ImmutableMap.of("dataSource", dataSource)).build();
       }

@@ -168,7 +168,7 @@ public class WorkerResource
   public Response doShutdown(@PathParam("taskid") String taskid)
   {
     try {
-      taskRunner.shutdown(taskid);
+      taskRunner.shutdown(taskid, "shut down request via HTTP endpoint");
     }
     catch (Exception e) {
       log.error(e, "Failed to issue shutdown for task: %s", taskid);

@@ -101,19 +101,19 @@ public class RemoteTaskRunnerRunPendingTasksConcurrencyTest

     if (remoteTaskRunner.getWorkersWithUnacknowledgedTask().containsValue(tasks[2].getId())
         && remoteTaskRunner.getWorkersWithUnacknowledgedTask().containsValue(tasks[3].getId())) {
-      remoteTaskRunner.shutdown("task4");
+      remoteTaskRunner.shutdown("task4", "test");
       mockWorkerRunningAndCompletionSuccessfulTasks(tasks[3], tasks[2]);
       Assert.assertEquals(TaskState.SUCCESS, results[3].get().getStatusCode());
       Assert.assertEquals(TaskState.SUCCESS, results[2].get().getStatusCode());
     } else if (remoteTaskRunner.getWorkersWithUnacknowledgedTask().containsValue(tasks[3].getId())
         && remoteTaskRunner.getWorkersWithUnacknowledgedTask().containsValue(tasks[4].getId())) {
-      remoteTaskRunner.shutdown("task2");
+      remoteTaskRunner.shutdown("task2", "test");
       mockWorkerRunningAndCompletionSuccessfulTasks(tasks[4], tasks[3]);
       Assert.assertEquals(TaskState.SUCCESS, results[4].get().getStatusCode());
       Assert.assertEquals(TaskState.SUCCESS, results[3].get().getStatusCode());
     } else if (remoteTaskRunner.getWorkersWithUnacknowledgedTask().containsValue(tasks[4].getId())
         && remoteTaskRunner.getWorkersWithUnacknowledgedTask().containsValue(tasks[2].getId())) {
-      remoteTaskRunner.shutdown("task3");
+      remoteTaskRunner.shutdown("task3", "test");
       mockWorkerRunningAndCompletionSuccessfulTasks(tasks[4], tasks[2]);
       Assert.assertEquals(TaskState.SUCCESS, results[4].get().getStatusCode());
       Assert.assertEquals(TaskState.SUCCESS, results[2].get().getStatusCode());

@@ -236,7 +236,7 @@ public class TestTaskRunner implements TaskRunner, QuerySegmentWalker
   }

   @Override
-  public void shutdown(final String taskid)
+  public void shutdown(final String taskid, String reason)
   {
     for (final TaskRunnerWorkItem runningItem : runningItems) {
       if (runningItem.getTaskId().equals(taskid)) {

@@ -408,7 +408,7 @@ public class OverlordTest
       }

       @Override
-      public void shutdown(String taskid) {}
+      public void shutdown(String taskid, String reason) {}

       @Override
       public synchronized Collection<? extends TaskRunnerWorkItem> getRunningTasks()