MAPREDUCE-5900. Changed to interpret the container preemption exit code as a task attempt killing event. Contributed by Mayank Bansal.
svn merge --ignore-ancestry -c 1607512 ../../trunk/ git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1607513 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
4eb15fa356
commit
38bb24538d
|
@ -131,6 +131,9 @@ Release 2.5.0 - UNRELEASED
|
|||
MAPREDUCE-5939. StartTime showing up as the epoch time in JHS UI after
|
||||
upgrade (Chen He via jlowe)
|
||||
|
||||
MAPREDUCE-5900. Changed to interpret the container preemption exit code as a
|
||||
task attempt killing event. (Mayank Bansal via zjshen)
|
||||
|
||||
Release 2.4.1 - 2014-06-23
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
|
|
@ -699,7 +699,8 @@ public class RMContainerAllocator extends RMContainerRequestor
|
|||
@VisibleForTesting
|
||||
public TaskAttemptEvent createContainerFinishedEvent(ContainerStatus cont,
|
||||
TaskAttemptId attemptID) {
|
||||
if (cont.getExitStatus() == ContainerExitStatus.ABORTED) {
|
||||
if (cont.getExitStatus() == ContainerExitStatus.ABORTED
|
||||
|| cont.getExitStatus() == ContainerExitStatus.PREEMPTED) {
|
||||
// killed by framework
|
||||
return new TaskAttemptEvent(attemptID,
|
||||
TaskAttemptEventType.TA_KILL);
|
||||
|
|
|
@ -65,6 +65,7 @@ import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
|
|||
import org.apache.hadoop.mapreduce.v2.app.job.Job;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.Task;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttemptStateInternal;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent;
|
||||
|
@ -795,6 +796,178 @@ public class TestTaskAttempt{
|
|||
finishTime, Long.valueOf(taImpl.getFinishTime()));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testContainerKillAfterAssigned() throws Exception {
|
||||
ApplicationId appId = ApplicationId.newInstance(1, 2);
|
||||
ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId,
|
||||
0);
|
||||
JobId jobId = MRBuilderUtils.newJobId(appId, 1);
|
||||
TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
|
||||
TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
|
||||
Path jobFile = mock(Path.class);
|
||||
|
||||
MockEventHandler eventHandler = new MockEventHandler();
|
||||
TaskAttemptListener taListener = mock(TaskAttemptListener.class);
|
||||
when(taListener.getAddress()).thenReturn(
|
||||
new InetSocketAddress("localhost", 0));
|
||||
|
||||
JobConf jobConf = new JobConf();
|
||||
jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
|
||||
jobConf.setBoolean("fs.file.impl.disable.cache", true);
|
||||
jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, "");
|
||||
jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, "10");
|
||||
|
||||
TaskSplitMetaInfo splits = mock(TaskSplitMetaInfo.class);
|
||||
when(splits.getLocations()).thenReturn(new String[] { "127.0.0.1" });
|
||||
|
||||
AppContext appCtx = mock(AppContext.class);
|
||||
ClusterInfo clusterInfo = mock(ClusterInfo.class);
|
||||
Resource resource = mock(Resource.class);
|
||||
when(appCtx.getClusterInfo()).thenReturn(clusterInfo);
|
||||
when(resource.getMemory()).thenReturn(1024);
|
||||
|
||||
TaskAttemptImpl taImpl = new MapTaskAttemptImpl(taskId, 1, eventHandler,
|
||||
jobFile, 1, splits, jobConf, taListener, new Token(),
|
||||
new Credentials(), new SystemClock(), appCtx);
|
||||
|
||||
NodeId nid = NodeId.newInstance("127.0.0.2", 0);
|
||||
ContainerId contId = ContainerId.newInstance(appAttemptId, 3);
|
||||
Container container = mock(Container.class);
|
||||
when(container.getId()).thenReturn(contId);
|
||||
when(container.getNodeId()).thenReturn(nid);
|
||||
when(container.getNodeHttpAddress()).thenReturn("localhost:0");
|
||||
|
||||
taImpl.handle(new TaskAttemptEvent(attemptId,
|
||||
TaskAttemptEventType.TA_SCHEDULE));
|
||||
taImpl.handle(new TaskAttemptContainerAssignedEvent(attemptId, container,
|
||||
mock(Map.class)));
|
||||
assertEquals("Task attempt is not in assinged state",
|
||||
taImpl.getInternalState(), TaskAttemptStateInternal.ASSIGNED);
|
||||
taImpl.handle(new TaskAttemptEvent(attemptId,
|
||||
TaskAttemptEventType.TA_KILL));
|
||||
assertEquals("Task should be in KILLED state",
|
||||
TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP,
|
||||
taImpl.getInternalState());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testContainerKillWhileRunning() throws Exception {
|
||||
ApplicationId appId = ApplicationId.newInstance(1, 2);
|
||||
ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId,
|
||||
0);
|
||||
JobId jobId = MRBuilderUtils.newJobId(appId, 1);
|
||||
TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
|
||||
TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
|
||||
Path jobFile = mock(Path.class);
|
||||
|
||||
MockEventHandler eventHandler = new MockEventHandler();
|
||||
TaskAttemptListener taListener = mock(TaskAttemptListener.class);
|
||||
when(taListener.getAddress()).thenReturn(
|
||||
new InetSocketAddress("localhost", 0));
|
||||
|
||||
JobConf jobConf = new JobConf();
|
||||
jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
|
||||
jobConf.setBoolean("fs.file.impl.disable.cache", true);
|
||||
jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, "");
|
||||
jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, "10");
|
||||
|
||||
TaskSplitMetaInfo splits = mock(TaskSplitMetaInfo.class);
|
||||
when(splits.getLocations()).thenReturn(new String[] { "127.0.0.1" });
|
||||
|
||||
AppContext appCtx = mock(AppContext.class);
|
||||
ClusterInfo clusterInfo = mock(ClusterInfo.class);
|
||||
Resource resource = mock(Resource.class);
|
||||
when(appCtx.getClusterInfo()).thenReturn(clusterInfo);
|
||||
when(resource.getMemory()).thenReturn(1024);
|
||||
|
||||
TaskAttemptImpl taImpl = new MapTaskAttemptImpl(taskId, 1, eventHandler,
|
||||
jobFile, 1, splits, jobConf, taListener, new Token(),
|
||||
new Credentials(), new SystemClock(), appCtx);
|
||||
|
||||
NodeId nid = NodeId.newInstance("127.0.0.2", 0);
|
||||
ContainerId contId = ContainerId.newInstance(appAttemptId, 3);
|
||||
Container container = mock(Container.class);
|
||||
when(container.getId()).thenReturn(contId);
|
||||
when(container.getNodeId()).thenReturn(nid);
|
||||
when(container.getNodeHttpAddress()).thenReturn("localhost:0");
|
||||
|
||||
taImpl.handle(new TaskAttemptEvent(attemptId,
|
||||
TaskAttemptEventType.TA_SCHEDULE));
|
||||
taImpl.handle(new TaskAttemptContainerAssignedEvent(attemptId, container,
|
||||
mock(Map.class)));
|
||||
taImpl.handle(new TaskAttemptContainerLaunchedEvent(attemptId, 0));
|
||||
assertEquals("Task attempt is not in running state", taImpl.getState(),
|
||||
TaskAttemptState.RUNNING);
|
||||
taImpl.handle(new TaskAttemptEvent(attemptId,
|
||||
TaskAttemptEventType.TA_KILL));
|
||||
assertFalse("InternalError occurred trying to handle TA_KILL",
|
||||
eventHandler.internalError);
|
||||
assertEquals("Task should be in KILLED state",
|
||||
TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP,
|
||||
taImpl.getInternalState());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testContainerKillWhileCommitPending() throws Exception {
|
||||
ApplicationId appId = ApplicationId.newInstance(1, 2);
|
||||
ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId,
|
||||
0);
|
||||
JobId jobId = MRBuilderUtils.newJobId(appId, 1);
|
||||
TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
|
||||
TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
|
||||
Path jobFile = mock(Path.class);
|
||||
|
||||
MockEventHandler eventHandler = new MockEventHandler();
|
||||
TaskAttemptListener taListener = mock(TaskAttemptListener.class);
|
||||
when(taListener.getAddress()).thenReturn(
|
||||
new InetSocketAddress("localhost", 0));
|
||||
|
||||
JobConf jobConf = new JobConf();
|
||||
jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
|
||||
jobConf.setBoolean("fs.file.impl.disable.cache", true);
|
||||
jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, "");
|
||||
jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, "10");
|
||||
|
||||
TaskSplitMetaInfo splits = mock(TaskSplitMetaInfo.class);
|
||||
when(splits.getLocations()).thenReturn(new String[] { "127.0.0.1" });
|
||||
|
||||
AppContext appCtx = mock(AppContext.class);
|
||||
ClusterInfo clusterInfo = mock(ClusterInfo.class);
|
||||
Resource resource = mock(Resource.class);
|
||||
when(appCtx.getClusterInfo()).thenReturn(clusterInfo);
|
||||
when(resource.getMemory()).thenReturn(1024);
|
||||
|
||||
TaskAttemptImpl taImpl = new MapTaskAttemptImpl(taskId, 1, eventHandler,
|
||||
jobFile, 1, splits, jobConf, taListener, new Token(),
|
||||
new Credentials(), new SystemClock(), appCtx);
|
||||
|
||||
NodeId nid = NodeId.newInstance("127.0.0.2", 0);
|
||||
ContainerId contId = ContainerId.newInstance(appAttemptId, 3);
|
||||
Container container = mock(Container.class);
|
||||
when(container.getId()).thenReturn(contId);
|
||||
when(container.getNodeId()).thenReturn(nid);
|
||||
when(container.getNodeHttpAddress()).thenReturn("localhost:0");
|
||||
|
||||
taImpl.handle(new TaskAttemptEvent(attemptId,
|
||||
TaskAttemptEventType.TA_SCHEDULE));
|
||||
taImpl.handle(new TaskAttemptContainerAssignedEvent(attemptId, container,
|
||||
mock(Map.class)));
|
||||
taImpl.handle(new TaskAttemptContainerLaunchedEvent(attemptId, 0));
|
||||
assertEquals("Task attempt is not in running state", taImpl.getState(),
|
||||
TaskAttemptState.RUNNING);
|
||||
taImpl.handle(new TaskAttemptEvent(attemptId,
|
||||
TaskAttemptEventType.TA_COMMIT_PENDING));
|
||||
assertEquals("Task should be in COMMIT_PENDING state",
|
||||
TaskAttemptStateInternal.COMMIT_PENDING, taImpl.getInternalState());
|
||||
taImpl.handle(new TaskAttemptEvent(attemptId,
|
||||
TaskAttemptEventType.TA_KILL));
|
||||
assertFalse("InternalError occurred trying to handle TA_KILL",
|
||||
eventHandler.internalError);
|
||||
assertEquals("Task should be in KILLED state",
|
||||
TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP,
|
||||
taImpl.getInternalState());
|
||||
}
|
||||
|
||||
public static class MockEventHandler implements EventHandler {
|
||||
public boolean internalError;
|
||||
|
||||
|
|
|
@ -1954,6 +1954,22 @@ public class TestRMContainerAllocator {
|
|||
TaskAttemptEvent abortedEvent = allocator.createContainerFinishedEvent(
|
||||
abortedStatus, attemptId);
|
||||
Assert.assertEquals(TaskAttemptEventType.TA_KILL, abortedEvent.getType());
|
||||
|
||||
ContainerId containerId2 = ContainerId.newInstance(applicationAttemptId, 2);
|
||||
ContainerStatus status2 = ContainerStatus.newInstance(containerId2,
|
||||
ContainerState.RUNNING, "", 0);
|
||||
|
||||
ContainerStatus preemptedStatus = ContainerStatus.newInstance(containerId2,
|
||||
ContainerState.RUNNING, "", ContainerExitStatus.PREEMPTED);
|
||||
|
||||
TaskAttemptEvent event2 = allocator.createContainerFinishedEvent(status2,
|
||||
attemptId);
|
||||
Assert.assertEquals(TaskAttemptEventType.TA_CONTAINER_COMPLETED,
|
||||
event2.getType());
|
||||
|
||||
TaskAttemptEvent abortedEvent2 = allocator.createContainerFinishedEvent(
|
||||
preemptedStatus, attemptId);
|
||||
Assert.assertEquals(TaskAttemptEventType.TA_KILL, abortedEvent2.getType());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
Loading…
Reference in New Issue