MAPREDUCE-7205. Treat container scheduler kill exit code as a task attempt killing event.

This closes #821

Signed-off-by: Akira Ajisaka <aajisaka@apache.org>
This commit is contained in:
Wanqiang Ji 2019-05-15 19:54:41 +08:00 committed by Akira Ajisaka
parent 9dff6eff81
commit 67f9a7b165
No known key found for this signature in database
GPG Key ID: C1EDBB9CA400FD50
2 changed files with 34 additions and 8 deletions

View File

@ -968,16 +968,20 @@ public class RMContainerAllocator extends RMContainerRequestor
@VisibleForTesting
public TaskAttemptEvent createContainerFinishedEvent(ContainerStatus cont,
TaskAttemptId attemptID) {
if (cont.getExitStatus() == ContainerExitStatus.ABORTED
|| cont.getExitStatus() == ContainerExitStatus.PREEMPTED) {
// killed by framework
return new TaskAttemptEvent(attemptID,
TaskAttemptEventType.TA_KILL);
} else {
return new TaskAttemptEvent(attemptID,
TaskAttemptId attemptId) {
TaskAttemptEvent event;
switch (cont.getExitStatus()) {
case ContainerExitStatus.ABORTED:
case ContainerExitStatus.PREEMPTED:
case ContainerExitStatus.KILLED_BY_CONTAINER_SCHEDULER:
// killed by YARN
event = new TaskAttemptEvent(attemptId, TaskAttemptEventType.TA_KILL);
break;
default:
event = new TaskAttemptEvent(attemptId,
TaskAttemptEventType.TA_CONTAINER_COMPLETED);
}
return event;
}
@SuppressWarnings("unchecked")

View File

@ -2430,6 +2430,8 @@ public class TestRMContainerAllocator {
ApplicationId applicationId = ApplicationId.newInstance(1, 1);
ApplicationAttemptId applicationAttemptId =
ApplicationAttemptId.newInstance(applicationId, 1);
// ABORTED
ContainerId containerId =
ContainerId.newContainerId(applicationAttemptId, 1);
ContainerStatus status = ContainerStatus.newInstance(
@ -2448,6 +2450,7 @@ public class TestRMContainerAllocator {
abortedStatus, attemptId);
Assert.assertEquals(TaskAttemptEventType.TA_KILL, abortedEvent.getType());
// PREEMPTED
ContainerId containerId2 =
ContainerId.newContainerId(applicationAttemptId, 2);
ContainerStatus status2 = ContainerStatus.newInstance(containerId2,
@ -2464,6 +2467,25 @@ public class TestRMContainerAllocator {
TaskAttemptEvent abortedEvent2 = allocator.createContainerFinishedEvent(
preemptedStatus, attemptId);
Assert.assertEquals(TaskAttemptEventType.TA_KILL, abortedEvent2.getType());
// KILLED_BY_CONTAINER_SCHEDULER
ContainerId containerId3 =
ContainerId.newContainerId(applicationAttemptId, 3);
ContainerStatus status3 = ContainerStatus.newInstance(containerId3,
ContainerState.RUNNING, "", 0);
ContainerStatus killedByContainerSchedulerStatus =
ContainerStatus.newInstance(containerId3, ContainerState.RUNNING, "",
ContainerExitStatus.KILLED_BY_CONTAINER_SCHEDULER);
TaskAttemptEvent event3 = allocator.createContainerFinishedEvent(status3,
attemptId);
Assert.assertEquals(TaskAttemptEventType.TA_CONTAINER_COMPLETED,
event3.getType());
TaskAttemptEvent abortedEvent3 = allocator.createContainerFinishedEvent(
killedByContainerSchedulerStatus, attemptId);
Assert.assertEquals(TaskAttemptEventType.TA_KILL, abortedEvent3.getType());
}
@Test