From 0f2ce45aeb0ea391b5f2b357afc4493cf7fa9bc3 Mon Sep 17 00:00:00 2001 From: Jason Darrell Lowe Date: Thu, 5 Dec 2013 16:30:12 +0000 Subject: [PATCH] MAPREDUCE-5409. MRAppMaster throws InvalidStateTransitonException: Invalid event: TA_TOO_MANY_FETCH_FAILURE at KILLED for TaskAttemptImpl. Contributed by Gera Shegalov git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1548197 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-mapreduce-project/CHANGES.txt | 4 ++ .../v2/app/job/impl/TaskAttemptImpl.java | 40 +++++------ .../v2/app/job/impl/TestTaskAttempt.java | 68 +++++++++++++++++++ 3 files changed, 89 insertions(+), 23 deletions(-) diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index fba63583089..42810733e7b 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -280,6 +280,10 @@ Release 2.3.0 - UNRELEASED MAPREDUCE-5451. MR uses LD_LIBRARY_PATH which doesn't mean anything in Windows. (Yingda Chen via cnauroth) + MAPREDUCE-5409. MRAppMaster throws InvalidStateTransitonException: Invalid + event: TA_TOO_MANY_FETCH_FAILURE at KILLED for TaskAttemptImpl (Gera + Shegalov via jlowe) + Release 2.2.0 - 2013-10-13 INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java index ce880f8a699..f3c62a48d5e 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java @@ -192,6 +192,21 @@ public abstract class TaskAttemptImpl implements DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION = new DiagnosticInformationUpdater(); + private static final EnumSet + FAILED_KILLED_STATE_IGNORED_EVENTS = EnumSet.of( + TaskAttemptEventType.TA_KILL, + TaskAttemptEventType.TA_ASSIGNED, + TaskAttemptEventType.TA_CONTAINER_COMPLETED, + TaskAttemptEventType.TA_UPDATE, + // Container launch events can arrive late + TaskAttemptEventType.TA_CONTAINER_LAUNCHED, + TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED, + TaskAttemptEventType.TA_CONTAINER_CLEANED, + TaskAttemptEventType.TA_COMMIT_PENDING, + TaskAttemptEventType.TA_DONE, + TaskAttemptEventType.TA_FAILMSG, + TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURE); + private static final StateMachineFactory stateMachineFactory @@ -452,18 +467,7 @@ public abstract class TaskAttemptImpl implements DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION) // Ignore-able events for FAILED state .addTransition(TaskAttemptStateInternal.FAILED, TaskAttemptStateInternal.FAILED, - EnumSet.of(TaskAttemptEventType.TA_KILL, - TaskAttemptEventType.TA_ASSIGNED, - TaskAttemptEventType.TA_CONTAINER_COMPLETED, - TaskAttemptEventType.TA_UPDATE, - // Container launch events can arrive late - TaskAttemptEventType.TA_CONTAINER_LAUNCHED, - TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED, - TaskAttemptEventType.TA_CONTAINER_CLEANED, - TaskAttemptEventType.TA_COMMIT_PENDING, - TaskAttemptEventType.TA_DONE, - TaskAttemptEventType.TA_FAILMSG, - TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURE)) + FAILED_KILLED_STATE_IGNORED_EVENTS) // Transitions from KILLED state .addTransition(TaskAttemptStateInternal.KILLED, TaskAttemptStateInternal.KILLED, @@ -471,17 +475,7 @@ public abstract class TaskAttemptImpl implements DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION) // Ignore-able events for KILLED state .addTransition(TaskAttemptStateInternal.KILLED, TaskAttemptStateInternal.KILLED, - EnumSet.of(TaskAttemptEventType.TA_KILL, - TaskAttemptEventType.TA_ASSIGNED, - TaskAttemptEventType.TA_CONTAINER_COMPLETED, - TaskAttemptEventType.TA_UPDATE, - // Container launch events can arrive late - TaskAttemptEventType.TA_CONTAINER_LAUNCHED, - TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED, - TaskAttemptEventType.TA_CONTAINER_CLEANED, - TaskAttemptEventType.TA_COMMIT_PENDING, - TaskAttemptEventType.TA_DONE, - TaskAttemptEventType.TA_FAILMSG)) + FAILED_KILLED_STATE_IGNORED_EVENTS) // create the topology tables .installTopology(); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java index 1129c2fcfc4..5858136d485 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java @@ -550,6 +550,8 @@ public class TestTaskAttempt{ eventHandler.internalError); } + + @Test public void testAppDiognosticEventOnUnassignedTask() throws Exception { ApplicationId appId = ApplicationId.newInstance(1, 2); @@ -599,6 +601,72 @@ public class TestTaskAttempt{ eventHandler.internalError); } + @Test + public void testTooManyFetchFailureAfterKill() throws Exception { + ApplicationId appId = ApplicationId.newInstance(1, 2); + ApplicationAttemptId appAttemptId = + ApplicationAttemptId.newInstance(appId, 0); + JobId jobId = MRBuilderUtils.newJobId(appId, 1); + TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP); + TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0); + Path jobFile = mock(Path.class); + + MockEventHandler eventHandler = new MockEventHandler(); + TaskAttemptListener taListener = mock(TaskAttemptListener.class); + when(taListener.getAddress()).thenReturn(new InetSocketAddress("localhost", 0)); + + JobConf jobConf = new JobConf(); + jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class); + jobConf.setBoolean("fs.file.impl.disable.cache", true); + jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, ""); + jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, "10"); + + TaskSplitMetaInfo splits = mock(TaskSplitMetaInfo.class); + when(splits.getLocations()).thenReturn(new String[] {"127.0.0.1"}); + + AppContext appCtx = mock(AppContext.class); + ClusterInfo clusterInfo = mock(ClusterInfo.class); + Resource resource = mock(Resource.class); + when(appCtx.getClusterInfo()).thenReturn(clusterInfo); + when(resource.getMemory()).thenReturn(1024); + + TaskAttemptImpl taImpl = + new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1, + splits, jobConf, taListener, + mock(Token.class), new Credentials(), + new SystemClock(), appCtx); + + NodeId nid = NodeId.newInstance("127.0.0.1", 0); + ContainerId contId = ContainerId.newInstance(appAttemptId, 3); + Container container = mock(Container.class); + when(container.getId()).thenReturn(contId); + when(container.getNodeId()).thenReturn(nid); + when(container.getNodeHttpAddress()).thenReturn("localhost:0"); + + taImpl.handle(new TaskAttemptEvent(attemptId, + TaskAttemptEventType.TA_SCHEDULE)); + taImpl.handle(new TaskAttemptContainerAssignedEvent(attemptId, + container, mock(Map.class))); + taImpl.handle(new TaskAttemptContainerLaunchedEvent(attemptId, 0)); + taImpl.handle(new TaskAttemptEvent(attemptId, + TaskAttemptEventType.TA_DONE)); + taImpl.handle(new TaskAttemptEvent(attemptId, + TaskAttemptEventType.TA_CONTAINER_CLEANED)); + + assertEquals("Task attempt is not in succeeded state", taImpl.getState(), + TaskAttemptState.SUCCEEDED); + taImpl.handle(new TaskAttemptEvent(attemptId, + TaskAttemptEventType.TA_KILL)); + assertEquals("Task attempt is not in KILLED state", taImpl.getState(), + TaskAttemptState.KILLED); + taImpl.handle(new TaskAttemptEvent(attemptId, + TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURE)); + assertEquals("Task attempt is not in KILLED state, still", taImpl.getState(), + TaskAttemptState.KILLED); + assertFalse("InternalError occurred trying to handle TA_CONTAINER_CLEANED", + eventHandler.internalError); + } + @Test public void testAppDiognosticEventOnNewTask() throws Exception { ApplicationId appId = ApplicationId.newInstance(1, 2);