diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index ac85210aa49..fa2b9218229 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -98,6 +98,9 @@ Release 2.4.0 - UNRELEASED
     MAPREDUCE-5769. Unregistration to RM should not be called if AM is crashed
     before registering with RM (Rohith via jlowe)
 
+    MAPREDUCE-5570. Map task attempt with fetch failure has incorrect attempt
+    finish time (Rushabh S Shah via jlowe)
+
 Release 2.3.1 - UNRELEASED
 
   INCOMPATIBLE CHANGES
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
index a16eaffdfad..e13aaea756c 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
@@ -1770,8 +1770,6 @@ public void transition(TaskAttemptImpl taskAttempt, TaskAttemptEvent event) {
           .checkArgument(taskAttempt.getID().getTaskId().getTaskType() == TaskType.MAP);
       //add to diagnostic
       taskAttempt.addDiagnosticInfo("Too Many fetch failures.Failing the attempt");
-      //set the finish time
-      taskAttempt.setFinishTime();
 
       if (taskAttempt.getLaunchTime() != 0) {
         taskAttempt.eventHandler
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java
index 28babaa2d66..9dc738d858f 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java
@@ -20,6 +20,7 @@
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
@@ -91,7 +92,7 @@
 @SuppressWarnings({"unchecked", "rawtypes"})
 public class TestTaskAttempt{
-  
+
   static public class StubbedFS extends RawLocalFileSystem {
     @Override
     public FileStatus getFileStatus(Path f) throws IOException {
@@ -725,6 +726,74 @@ public void testAppDiognosticEventOnNewTask() throws Exception {
         eventHandler.internalError);
   }
 
+  @Test
+  public void testFetchFailureAttemptFinishTime() throws Exception{
+    ApplicationId appId = ApplicationId.newInstance(1, 2);
+    ApplicationAttemptId appAttemptId =
+        ApplicationAttemptId.newInstance(appId, 0);
+    JobId jobId = MRBuilderUtils.newJobId(appId, 1);
+    TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
+    TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
+    Path jobFile = mock(Path.class);
+
+    MockEventHandler eventHandler = new MockEventHandler();
+    TaskAttemptListener taListener = mock(TaskAttemptListener.class);
+    when(taListener.getAddress()).thenReturn(
+        new InetSocketAddress("localhost", 0));
+
+    JobConf jobConf = new JobConf();
+    jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
+    jobConf.setBoolean("fs.file.impl.disable.cache", true);
+    jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, "");
+    jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, "10");
+
+    TaskSplitMetaInfo splits = mock(TaskSplitMetaInfo.class);
+    when(splits.getLocations()).thenReturn(new String[] {"127.0.0.1"});
+
+    AppContext appCtx = mock(AppContext.class);
+    ClusterInfo clusterInfo = mock(ClusterInfo.class);
+    when(appCtx.getClusterInfo()).thenReturn(clusterInfo);
+
+    TaskAttemptImpl taImpl =
+        new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
+            splits, jobConf, taListener,mock(Token.class), new Credentials(),
+            new SystemClock(), appCtx);
+
+    NodeId nid = NodeId.newInstance("127.0.0.1", 0);
+    ContainerId contId = ContainerId.newInstance(appAttemptId, 3);
+    Container container = mock(Container.class);
+    when(container.getId()).thenReturn(contId);
+    when(container.getNodeId()).thenReturn(nid);
+    when(container.getNodeHttpAddress()).thenReturn("localhost:0");
+
+    taImpl.handle(new TaskAttemptEvent(attemptId,
+        TaskAttemptEventType.TA_SCHEDULE));
+    taImpl.handle(new TaskAttemptContainerAssignedEvent(attemptId,
+        container, mock(Map.class)));
+    taImpl.handle(new TaskAttemptContainerLaunchedEvent(attemptId, 0));
+    taImpl.handle(new TaskAttemptEvent(attemptId,
+        TaskAttemptEventType.TA_DONE));
+    taImpl.handle(new TaskAttemptEvent(attemptId,
+        TaskAttemptEventType.TA_CONTAINER_CLEANED));
+
+    assertEquals("Task attempt is not in succeeded state", taImpl.getState(),
+        TaskAttemptState.SUCCEEDED);
+
+    assertTrue("Task Attempt finish time is not greater than 0",
+        taImpl.getFinishTime() > 0);
+
+    Long finishTime = taImpl.getFinishTime();
+    Thread.sleep(5);
+    taImpl.handle(new TaskAttemptEvent(attemptId,
+        TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURE));
+
+    assertEquals("Task attempt is not in Too Many Fetch Failure state",
+        taImpl.getState(), TaskAttemptState.FAILED);
+
+    assertEquals("After TA_TOO_MANY_FETCH_FAILURE,"
+        + " Task attempt finish time is not the same ",
+        finishTime, Long.valueOf(taImpl.getFinishTime()));
+  }
+
   public static class MockEventHandler implements EventHandler {
     public boolean internalError;