From 66d468eece95fc1987b43f6d9e01847582c476ec Mon Sep 17 00:00:00 2001 From: Thomas Graves Date: Tue, 31 Jul 2012 20:53:45 +0000 Subject: [PATCH] merge -r 1367770:1367771 from trunk. FIXES: MAPREDUCE-4457 git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1367772 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-mapreduce-project/CHANGES.txt | 3 + .../mapreduce/v2/app/job/impl/JobImpl.java | 3 +- .../v2/app/job/impl/TaskAttemptImpl.java | 3 +- .../v2/app/job/impl/TestTaskAttempt.java | 67 +++++++++++++++++++ 4 files changed, 74 insertions(+), 2 deletions(-) diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 1dfde6c54d3..01fc15d0c62 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -655,6 +655,9 @@ Release 0.23.3 - UNRELEASED MAPREDUCE-4492. Configuring total queue capacity between 100.5 and 99.5 at perticular level is sucessfull (Mayank Bansal via bobby) + MAPREDUCE-4457. mr job invalid transition TA_TOO_MANY_FETCH_FAILURE at + FAILED (Robert Evans via tgraves) + Release 0.23.2 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java index 982c89c4c94..7710bc55d11 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java @@ -1370,7 +1370,8 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, } } - float failureRate = (float) fetchFailures / runningReduceTasks; + float failureRate = runningReduceTasks == 0 ? 1.0f : + (float) fetchFailures / runningReduceTasks; // declare faulty if fetch-failures >= max-allowed-failures boolean isMapFaulty = (failureRate >= MAX_ALLOWED_FETCH_FAILURES_FRACTION); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java index 66d48b69042..25f14e58e30 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java @@ -435,7 +435,8 @@ public abstract class TaskAttemptImpl implements TaskAttemptEventType.TA_CONTAINER_CLEANED, TaskAttemptEventType.TA_COMMIT_PENDING, TaskAttemptEventType.TA_DONE, - TaskAttemptEventType.TA_FAILMSG)) + TaskAttemptEventType.TA_FAILMSG, + TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURE)) // Transitions from KILLED state .addTransition(TaskAttemptState.KILLED, TaskAttemptState.KILLED, diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java index 94c4f20bac1..3eff0c1cfe2 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java @@ -565,6 +565,73 @@ public class TestTaskAttempt{ assertFalse("InternalError occurred trying to handle TA_CONTAINER_CLEANED", eventHandler.internalError); } + + @Test + public void testDoubleTooManyFetchFailure() throws Exception { + ApplicationId appId = BuilderUtils.newApplicationId(1, 2); + ApplicationAttemptId appAttemptId = + BuilderUtils.newApplicationAttemptId(appId, 0); + JobId jobId = MRBuilderUtils.newJobId(appId, 1); + TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP); + TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0); + Path jobFile = mock(Path.class); + + MockEventHandler eventHandler = new MockEventHandler(); + TaskAttemptListener taListener = mock(TaskAttemptListener.class); + when(taListener.getAddress()).thenReturn(new InetSocketAddress("localhost", 0)); + + JobConf jobConf = new JobConf(); + jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class); + jobConf.setBoolean("fs.file.impl.disable.cache", true); + jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, ""); + jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, "10"); + + TaskSplitMetaInfo splits = mock(TaskSplitMetaInfo.class); + when(splits.getLocations()).thenReturn(new String[] {"127.0.0.1"}); + + AppContext appCtx = mock(AppContext.class); + ClusterInfo clusterInfo = mock(ClusterInfo.class); + Resource resource = mock(Resource.class); + when(appCtx.getClusterInfo()).thenReturn(clusterInfo); + when(clusterInfo.getMinContainerCapability()).thenReturn(resource); + when(resource.getMemory()).thenReturn(1024); + + TaskAttemptImpl taImpl = + new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1, + splits, jobConf, taListener, + mock(OutputCommitter.class), mock(Token.class), new Credentials(), + new SystemClock(), appCtx); + + NodeId nid = BuilderUtils.newNodeId("127.0.0.1", 0); + ContainerId contId = BuilderUtils.newContainerId(appAttemptId, 3); + Container container = mock(Container.class); + when(container.getId()).thenReturn(contId); + when(container.getNodeId()).thenReturn(nid); + when(container.getNodeHttpAddress()).thenReturn("localhost:0"); + + taImpl.handle(new TaskAttemptEvent(attemptId, + TaskAttemptEventType.TA_SCHEDULE)); + taImpl.handle(new TaskAttemptContainerAssignedEvent(attemptId, + container, mock(Map.class))); + taImpl.handle(new TaskAttemptContainerLaunchedEvent(attemptId, 0)); + taImpl.handle(new TaskAttemptEvent(attemptId, + TaskAttemptEventType.TA_DONE)); + taImpl.handle(new TaskAttemptEvent(attemptId, + TaskAttemptEventType.TA_CONTAINER_CLEANED)); + + assertEquals("Task attempt is not in succeeded state", taImpl.getState(), + TaskAttemptState.SUCCEEDED); + taImpl.handle(new TaskAttemptEvent(attemptId, + TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURE)); + assertEquals("Task attempt is not in FAILED state", taImpl.getState(), + TaskAttemptState.FAILED); + taImpl.handle(new TaskAttemptEvent(attemptId, + TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURE)); + assertEquals("Task attempt is not in FAILED state, still", taImpl.getState(), + TaskAttemptState.FAILED); + assertFalse("InternalError occurred trying to handle TA_CONTAINER_CLEANED", + eventHandler.internalError); + } public static class MockEventHandler implements EventHandler { public boolean internalError;