diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index a6ace4c3380..51170c86bc2 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -131,6 +131,9 @@ Release 2.5.0 - UNRELEASED MAPREDUCE-5939. StartTime showing up as the epoch time in JHS UI after upgrade (Chen He via jlowe) + MAPREDUCE-5900. Changed to the interpret container preemption exit code as a + task attempt killing event. (Mayank Bansal via zjshen) + Release 2.4.1 - 2014-06-23 INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java index 6851584c525..9b028e47b43 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java @@ -699,7 +699,8 @@ public class RMContainerAllocator extends RMContainerRequestor @VisibleForTesting public TaskAttemptEvent createContainerFinishedEvent(ContainerStatus cont, TaskAttemptId attemptID) { - if (cont.getExitStatus() == ContainerExitStatus.ABORTED) { + if (cont.getExitStatus() == ContainerExitStatus.ABORTED + || cont.getExitStatus() == ContainerExitStatus.PREEMPTED) { // killed by framework return new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_KILL); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java index 0cd0b000736..4aa11ed3ecf 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java @@ -65,6 +65,7 @@ import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener; import org.apache.hadoop.mapreduce.v2.app.job.Job; import org.apache.hadoop.mapreduce.v2.app.job.Task; import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt; +import org.apache.hadoop.mapreduce.v2.app.job.TaskAttemptStateInternal; import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent; @@ -795,6 +796,178 @@ public class TestTaskAttempt{ finishTime, Long.valueOf(taImpl.getFinishTime())); } + @Test + public void testContainerKillAfterAssigned() throws Exception { + ApplicationId appId = ApplicationId.newInstance(1, 2); + ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, + 0); + JobId jobId = MRBuilderUtils.newJobId(appId, 1); + TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP); + TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0); + Path jobFile = mock(Path.class); + + MockEventHandler eventHandler = new MockEventHandler(); + TaskAttemptListener taListener = mock(TaskAttemptListener.class); + when(taListener.getAddress()).thenReturn( + new InetSocketAddress("localhost", 0)); + + JobConf jobConf = new JobConf(); + jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class); + jobConf.setBoolean("fs.file.impl.disable.cache", true); + jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, ""); + jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, "10"); + + TaskSplitMetaInfo splits = mock(TaskSplitMetaInfo.class); + when(splits.getLocations()).thenReturn(new String[] { "127.0.0.1" }); + + AppContext appCtx = mock(AppContext.class); + ClusterInfo clusterInfo = mock(ClusterInfo.class); + Resource resource = mock(Resource.class); + when(appCtx.getClusterInfo()).thenReturn(clusterInfo); + when(resource.getMemory()).thenReturn(1024); + + TaskAttemptImpl taImpl = new MapTaskAttemptImpl(taskId, 1, eventHandler, + jobFile, 1, splits, jobConf, taListener, new Token(), + new Credentials(), new SystemClock(), appCtx); + + NodeId nid = NodeId.newInstance("127.0.0.2", 0); + ContainerId contId = ContainerId.newInstance(appAttemptId, 3); + Container container = mock(Container.class); + when(container.getId()).thenReturn(contId); + when(container.getNodeId()).thenReturn(nid); + when(container.getNodeHttpAddress()).thenReturn("localhost:0"); + + taImpl.handle(new TaskAttemptEvent(attemptId, + TaskAttemptEventType.TA_SCHEDULE)); + taImpl.handle(new TaskAttemptContainerAssignedEvent(attemptId, container, + mock(Map.class))); + assertEquals("Task attempt is not in assinged state", + taImpl.getInternalState(), TaskAttemptStateInternal.ASSIGNED); + taImpl.handle(new TaskAttemptEvent(attemptId, + TaskAttemptEventType.TA_KILL)); + assertEquals("Task should be in KILLED state", + TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP, + taImpl.getInternalState()); + } + + @Test + public void testContainerKillWhileRunning() throws Exception { + ApplicationId appId = ApplicationId.newInstance(1, 2); + ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, + 0); + JobId jobId = MRBuilderUtils.newJobId(appId, 1); + TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP); + TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0); + Path jobFile = mock(Path.class); + + MockEventHandler eventHandler = new MockEventHandler(); + TaskAttemptListener taListener = mock(TaskAttemptListener.class); + when(taListener.getAddress()).thenReturn( + new InetSocketAddress("localhost", 0)); + + JobConf jobConf = new JobConf(); + jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class); + jobConf.setBoolean("fs.file.impl.disable.cache", true); + jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, ""); + jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, "10"); + + TaskSplitMetaInfo splits = mock(TaskSplitMetaInfo.class); + when(splits.getLocations()).thenReturn(new String[] { "127.0.0.1" }); + + AppContext appCtx = mock(AppContext.class); + ClusterInfo clusterInfo = mock(ClusterInfo.class); + Resource resource = mock(Resource.class); + when(appCtx.getClusterInfo()).thenReturn(clusterInfo); + when(resource.getMemory()).thenReturn(1024); + + TaskAttemptImpl taImpl = new MapTaskAttemptImpl(taskId, 1, eventHandler, + jobFile, 1, splits, jobConf, taListener, new Token(), + new Credentials(), new SystemClock(), appCtx); + + NodeId nid = NodeId.newInstance("127.0.0.2", 0); + ContainerId contId = ContainerId.newInstance(appAttemptId, 3); + Container container = mock(Container.class); + when(container.getId()).thenReturn(contId); + when(container.getNodeId()).thenReturn(nid); + when(container.getNodeHttpAddress()).thenReturn("localhost:0"); + + taImpl.handle(new TaskAttemptEvent(attemptId, + TaskAttemptEventType.TA_SCHEDULE)); + taImpl.handle(new TaskAttemptContainerAssignedEvent(attemptId, container, + mock(Map.class))); + taImpl.handle(new TaskAttemptContainerLaunchedEvent(attemptId, 0)); + assertEquals("Task attempt is not in running state", taImpl.getState(), + TaskAttemptState.RUNNING); + taImpl.handle(new TaskAttemptEvent(attemptId, + TaskAttemptEventType.TA_KILL)); + assertFalse("InternalError occurred trying to handle TA_KILL", + eventHandler.internalError); + assertEquals("Task should be in KILLED state", + TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP, + taImpl.getInternalState()); + } + + @Test + public void testContainerKillWhileCommitPending() throws Exception { + ApplicationId appId = ApplicationId.newInstance(1, 2); + ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, + 0); + JobId jobId = MRBuilderUtils.newJobId(appId, 1); + TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP); + TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0); + Path jobFile = mock(Path.class); + + MockEventHandler eventHandler = new MockEventHandler(); + TaskAttemptListener taListener = mock(TaskAttemptListener.class); + when(taListener.getAddress()).thenReturn( + new InetSocketAddress("localhost", 0)); + + JobConf jobConf = new JobConf(); + jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class); + jobConf.setBoolean("fs.file.impl.disable.cache", true); + jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, ""); + jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, "10"); + + TaskSplitMetaInfo splits = mock(TaskSplitMetaInfo.class); + when(splits.getLocations()).thenReturn(new String[] { "127.0.0.1" }); + + AppContext appCtx = mock(AppContext.class); + ClusterInfo clusterInfo = mock(ClusterInfo.class); + Resource resource = mock(Resource.class); + when(appCtx.getClusterInfo()).thenReturn(clusterInfo); + when(resource.getMemory()).thenReturn(1024); + + TaskAttemptImpl taImpl = new MapTaskAttemptImpl(taskId, 1, eventHandler, + jobFile, 1, splits, jobConf, taListener, new Token(), + new Credentials(), new SystemClock(), appCtx); + + NodeId nid = NodeId.newInstance("127.0.0.2", 0); + ContainerId contId = ContainerId.newInstance(appAttemptId, 3); + Container container = mock(Container.class); + when(container.getId()).thenReturn(contId); + when(container.getNodeId()).thenReturn(nid); + when(container.getNodeHttpAddress()).thenReturn("localhost:0"); + + taImpl.handle(new TaskAttemptEvent(attemptId, + TaskAttemptEventType.TA_SCHEDULE)); + taImpl.handle(new TaskAttemptContainerAssignedEvent(attemptId, container, + mock(Map.class))); + taImpl.handle(new TaskAttemptContainerLaunchedEvent(attemptId, 0)); + assertEquals("Task attempt is not in running state", taImpl.getState(), + TaskAttemptState.RUNNING); + taImpl.handle(new TaskAttemptEvent(attemptId, + TaskAttemptEventType.TA_COMMIT_PENDING)); + assertEquals("Task should be in COMMIT_PENDING state", + TaskAttemptStateInternal.COMMIT_PENDING, taImpl.getInternalState()); + taImpl.handle(new TaskAttemptEvent(attemptId, + TaskAttemptEventType.TA_KILL)); + assertFalse("InternalError occurred trying to handle TA_KILL", + eventHandler.internalError); + assertEquals("Task should be in KILLED state", + TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP, + taImpl.getInternalState()); + } + public static class MockEventHandler implements EventHandler { public boolean internalError; diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java index f669a079c13..baecc7e8d33 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java @@ -1954,6 +1954,22 @@ public class TestRMContainerAllocator { TaskAttemptEvent abortedEvent = allocator.createContainerFinishedEvent( abortedStatus, attemptId); Assert.assertEquals(TaskAttemptEventType.TA_KILL, abortedEvent.getType()); + + ContainerId containerId2 = ContainerId.newInstance(applicationAttemptId, 2); + ContainerStatus status2 = ContainerStatus.newInstance(containerId2, + ContainerState.RUNNING, "", 0); + + ContainerStatus preemptedStatus = ContainerStatus.newInstance(containerId2, + ContainerState.RUNNING, "", ContainerExitStatus.PREEMPTED); + + TaskAttemptEvent event2 = allocator.createContainerFinishedEvent(status2, + attemptId); + Assert.assertEquals(TaskAttemptEventType.TA_CONTAINER_COMPLETED, + event2.getType()); + + TaskAttemptEvent abortedEvent2 = allocator.createContainerFinishedEvent( + preemptedStatus, attemptId); + Assert.assertEquals(TaskAttemptEventType.TA_KILL, abortedEvent2.getType()); } @Test