MAPREDUCE-7205. Treat container scheduler kill exit code as a task attempt killing event.

This closes #821

Signed-off-by: Akira Ajisaka <aajisaka@apache.org>
(cherry picked from commit 67f9a7b165)
This commit is contained in:
Wanqiang Ji 2019-05-15 19:54:41 +08:00 committed by Akira Ajisaka
parent a4f425d3ae
commit e4011e6886
No known key found for this signature in database
GPG Key ID: C1EDBB9CA400FD50
2 changed files with 34 additions and 8 deletions

View File

@ -968,16 +968,20 @@ public class RMContainerAllocator extends RMContainerRequestor
@VisibleForTesting @VisibleForTesting
public TaskAttemptEvent createContainerFinishedEvent(ContainerStatus cont, public TaskAttemptEvent createContainerFinishedEvent(ContainerStatus cont,
TaskAttemptId attemptID) { TaskAttemptId attemptId) {
if (cont.getExitStatus() == ContainerExitStatus.ABORTED TaskAttemptEvent event;
|| cont.getExitStatus() == ContainerExitStatus.PREEMPTED) { switch (cont.getExitStatus()) {
// killed by framework case ContainerExitStatus.ABORTED:
return new TaskAttemptEvent(attemptID, case ContainerExitStatus.PREEMPTED:
TaskAttemptEventType.TA_KILL); case ContainerExitStatus.KILLED_BY_CONTAINER_SCHEDULER:
} else { // killed by YARN
return new TaskAttemptEvent(attemptID, event = new TaskAttemptEvent(attemptId, TaskAttemptEventType.TA_KILL);
break;
default:
event = new TaskAttemptEvent(attemptId,
TaskAttemptEventType.TA_CONTAINER_COMPLETED); TaskAttemptEventType.TA_CONTAINER_COMPLETED);
} }
return event;
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")

View File

@ -2430,6 +2430,8 @@ public class TestRMContainerAllocator {
ApplicationId applicationId = ApplicationId.newInstance(1, 1); ApplicationId applicationId = ApplicationId.newInstance(1, 1);
ApplicationAttemptId applicationAttemptId = ApplicationAttemptId applicationAttemptId =
ApplicationAttemptId.newInstance(applicationId, 1); ApplicationAttemptId.newInstance(applicationId, 1);
// ABORTED
ContainerId containerId = ContainerId containerId =
ContainerId.newContainerId(applicationAttemptId, 1); ContainerId.newContainerId(applicationAttemptId, 1);
ContainerStatus status = ContainerStatus.newInstance( ContainerStatus status = ContainerStatus.newInstance(
@ -2448,6 +2450,7 @@ public class TestRMContainerAllocator {
abortedStatus, attemptId); abortedStatus, attemptId);
Assert.assertEquals(TaskAttemptEventType.TA_KILL, abortedEvent.getType()); Assert.assertEquals(TaskAttemptEventType.TA_KILL, abortedEvent.getType());
// PREEMPTED
ContainerId containerId2 = ContainerId containerId2 =
ContainerId.newContainerId(applicationAttemptId, 2); ContainerId.newContainerId(applicationAttemptId, 2);
ContainerStatus status2 = ContainerStatus.newInstance(containerId2, ContainerStatus status2 = ContainerStatus.newInstance(containerId2,
@ -2464,6 +2467,25 @@ public class TestRMContainerAllocator {
TaskAttemptEvent abortedEvent2 = allocator.createContainerFinishedEvent( TaskAttemptEvent abortedEvent2 = allocator.createContainerFinishedEvent(
preemptedStatus, attemptId); preemptedStatus, attemptId);
Assert.assertEquals(TaskAttemptEventType.TA_KILL, abortedEvent2.getType()); Assert.assertEquals(TaskAttemptEventType.TA_KILL, abortedEvent2.getType());
// KILLED_BY_CONTAINER_SCHEDULER
ContainerId containerId3 =
ContainerId.newContainerId(applicationAttemptId, 3);
ContainerStatus status3 = ContainerStatus.newInstance(containerId3,
ContainerState.RUNNING, "", 0);
ContainerStatus killedByContainerSchedulerStatus =
ContainerStatus.newInstance(containerId3, ContainerState.RUNNING, "",
ContainerExitStatus.KILLED_BY_CONTAINER_SCHEDULER);
TaskAttemptEvent event3 = allocator.createContainerFinishedEvent(status3,
attemptId);
Assert.assertEquals(TaskAttemptEventType.TA_CONTAINER_COMPLETED,
event3.getType());
TaskAttemptEvent abortedEvent3 = allocator.createContainerFinishedEvent(
killedByContainerSchedulerStatus, attemptId);
Assert.assertEquals(TaskAttemptEventType.TA_KILL, abortedEvent3.getType());
} }
@Test @Test