From ed20ba2e498bc732a8e4adb4c40fdf1c4671dbd7 Mon Sep 17 00:00:00 2001 From: Robert Joseph Evans Date: Wed, 30 May 2012 14:52:39 +0000 Subject: [PATCH] svn merge -c 1344283. FIXES: MAPREDUCE-4152. map task left hanging after AM dies trying to connect to RM (Tom Graves via bobby) git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1344288 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-mapreduce-project/CHANGES.txt | 3 + .../v2/app/job/impl/TaskAttemptImpl.java | 13 ++ .../app/launcher/ContainerLauncherImpl.java | 63 ++++++---- .../v2/app/job/impl/TestTaskAttempt.java | 116 ++++++++++++++++++ .../launcher/TestContainerLauncherImpl.java | 54 ++++++++ 5 files changed, 226 insertions(+), 23 deletions(-) diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index dae19eea528..91bfb3fc09f 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -428,6 +428,9 @@ Release 0.23.3 - UNRELEASED MAPREDUCE-3870. Invalid App Metrics (Bhallamudi Venkata Siva Kamesh via tgraves). + MAPREDUCE-4152. map task left hanging after AM dies trying to connect to RM + (Tom Graves via bobby) + Release 0.23.2 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java index 7ac334c8ffc..cafff92920e 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java @@ -253,6 +253,10 @@ public abstract class TaskAttemptImpl implements .addTransition(TaskAttemptState.RUNNING, TaskAttemptState.FAIL_CONTAINER_CLEANUP, TaskAttemptEventType.TA_TIMED_OUT, CLEANUP_CONTAINER_TRANSITION) + // if container killed by AM shutting down + .addTransition(TaskAttemptState.RUNNING, + TaskAttemptState.KILLED, + TaskAttemptEventType.TA_CONTAINER_CLEANED, new KilledTransition()) // Kill handling .addTransition(TaskAttemptState.RUNNING, TaskAttemptState.KILL_CONTAINER_CLEANUP, TaskAttemptEventType.TA_KILL, @@ -272,6 +276,10 @@ public abstract class TaskAttemptImpl implements .addTransition(TaskAttemptState.COMMIT_PENDING, TaskAttemptState.KILL_CONTAINER_CLEANUP, TaskAttemptEventType.TA_KILL, CLEANUP_CONTAINER_TRANSITION) + // if container killed by AM shutting down + .addTransition(TaskAttemptState.COMMIT_PENDING, + TaskAttemptState.KILLED, + TaskAttemptEventType.TA_CONTAINER_CLEANED, new KilledTransition()) .addTransition(TaskAttemptState.COMMIT_PENDING, TaskAttemptState.FAIL_CONTAINER_CLEANUP, TaskAttemptEventType.TA_FAILMSG, CLEANUP_CONTAINER_TRANSITION) @@ -363,6 +371,7 @@ public abstract class TaskAttemptImpl implements TaskAttemptEventType.TA_COMMIT_PENDING, TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_FAILMSG, + TaskAttemptEventType.TA_CONTAINER_CLEANED, // Container launch events can arrive late TaskAttemptEventType.TA_CONTAINER_LAUNCHED, TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED)) @@ -384,6 +393,7 @@ public abstract class TaskAttemptImpl implements TaskAttemptEventType.TA_COMMIT_PENDING, TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_FAILMSG, + TaskAttemptEventType.TA_CONTAINER_CLEANED, // Container launch events can arrive late TaskAttemptEventType.TA_CONTAINER_LAUNCHED, TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED)) @@ -402,6 +412,7 @@ public abstract class TaskAttemptImpl implements TaskAttemptState.SUCCEEDED, EnumSet.of(TaskAttemptEventType.TA_KILL, TaskAttemptEventType.TA_FAILMSG, + TaskAttemptEventType.TA_CONTAINER_CLEANED, TaskAttemptEventType.TA_CONTAINER_COMPLETED)) // Transitions from FAILED state @@ -417,6 +428,7 @@ public abstract class TaskAttemptImpl implements // Container launch events can arrive late TaskAttemptEventType.TA_CONTAINER_LAUNCHED, TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED, + TaskAttemptEventType.TA_CONTAINER_CLEANED, TaskAttemptEventType.TA_COMMIT_PENDING, TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_FAILMSG)) @@ -434,6 +446,7 @@ public abstract class TaskAttemptImpl implements // Container launch events can arrive late TaskAttemptEventType.TA_CONTAINER_LAUNCHED, TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED, + TaskAttemptEventType.TA_CONTAINER_CLEANED, TaskAttemptEventType.TA_COMMIT_PENDING, TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_FAILMSG)) diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java index 44dd16daa05..3144ab179c9 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java @@ -82,10 +82,12 @@ public class ContainerLauncherImpl extends AbstractService implements new LinkedBlockingQueue(); YarnRPC rpc; - private Container getContainer(ContainerId id) { + private Container getContainer(ContainerLauncherEvent event) { + ContainerId id = event.getContainerID(); Container c = containers.get(id); if(c == null) { - c = new Container(); + c = new Container(event.getTaskAttemptID(), event.getContainerID(), + event.getContainerMgrAddress(), event.getContainerToken()); Container old = containers.putIfAbsent(id, c); if(old != null) { c = old; @@ -107,9 +109,19 @@ public class ContainerLauncherImpl extends AbstractService implements private class Container { private ContainerState state; + // store enough information to be able to cleanup the container + private TaskAttemptId taskAttemptID; + private ContainerId containerID; + final private String containerMgrAddress; + private ContainerToken containerToken; - public Container() { + public Container(TaskAttemptId taId, ContainerId containerID, + String containerMgrAddress, ContainerToken containerToken) { this.state = ContainerState.PREP; + this.taskAttemptID = taId; + this.containerMgrAddress = containerMgrAddress; + this.containerID = containerID; + this.containerToken = containerToken; } public synchronized boolean isCompletelyDone() { @@ -118,7 +130,6 @@ public class ContainerLauncherImpl extends AbstractService implements @SuppressWarnings("unchecked") public synchronized void launch(ContainerRemoteLaunchEvent event) { - TaskAttemptId taskAttemptID = event.getTaskAttemptID(); LOG.info("Launching " + taskAttemptID); if(this.state == ContainerState.KILLED_BEFORE_LAUNCH) { state = ContainerState.DONE; @@ -127,15 +138,10 @@ public class ContainerLauncherImpl extends AbstractService implements return; } - - final String containerManagerBindAddr = event.getContainerMgrAddress(); - ContainerId containerID = event.getContainerID(); - ContainerToken containerToken = event.getContainerToken(); - ContainerManager proxy = null; try { - proxy = getCMProxy(containerID, containerManagerBindAddr, + proxy = getCMProxy(containerID, containerMgrAddress, containerToken); // Construct the actual Container @@ -181,35 +187,35 @@ public class ContainerLauncherImpl extends AbstractService implements } @SuppressWarnings("unchecked") - public synchronized void kill(ContainerLauncherEvent event) { + public synchronized void kill() { + + if(isCompletelyDone()) { + return; + } if(this.state == ContainerState.PREP) { this.state = ContainerState.KILLED_BEFORE_LAUNCH; } else { - final String containerManagerBindAddr = event.getContainerMgrAddress(); - ContainerId containerID = event.getContainerID(); - ContainerToken containerToken = event.getContainerToken(); - TaskAttemptId taskAttemptID = event.getTaskAttemptID(); LOG.info("KILLING " + taskAttemptID); ContainerManager proxy = null; try { - proxy = getCMProxy(containerID, containerManagerBindAddr, - containerToken); + proxy = getCMProxy(this.containerID, this.containerMgrAddress, + this.containerToken); // kill the remote container if already launched StopContainerRequest stopRequest = Records .newRecord(StopContainerRequest.class); - stopRequest.setContainerId(event.getContainerID()); + stopRequest.setContainerId(this.containerID); proxy.stopContainer(stopRequest); } catch (Throwable t) { // ignore the cleanup failure String message = "cleanup failed for container " - + event.getContainerID() + " : " + + this.containerID + " : " + StringUtils.stringifyException(t); context.getEventHandler().handle( - new TaskAttemptDiagnosticsUpdateEvent(taskAttemptID, message)); + new TaskAttemptDiagnosticsUpdateEvent(this.taskAttemptID, message)); LOG.warn(message); } finally { if (proxy != null) { @@ -220,10 +226,11 @@ public class ContainerLauncherImpl extends AbstractService implements } // after killing, send killed event to task attempt context.getEventHandler().handle( - new TaskAttemptEvent(event.getTaskAttemptID(), + new TaskAttemptEvent(this.taskAttemptID, TaskAttemptEventType.TA_CONTAINER_CLEANED)); } } + // To track numNodes. Set allNodes = new HashSet(); @@ -308,7 +315,17 @@ public class ContainerLauncherImpl extends AbstractService implements super.start(); } + private void shutdownAllContainers() { + for (Container ct : this.containers.values()) { + if (ct != null) { + ct.kill(); + } + } + } + public void stop() { + // shutdown any containers that might be left running + shutdownAllContainers(); eventHandlingThread.interrupt(); launcherPool.shutdownNow(); super.stop(); @@ -364,7 +381,7 @@ public class ContainerLauncherImpl extends AbstractService implements // TODO: Do it only once per NodeManager. ContainerId containerID = event.getContainerID(); - Container c = getContainer(containerID); + Container c = getContainer(event); switch(event.getType()) { case CONTAINER_REMOTE_LAUNCH: @@ -374,7 +391,7 @@ public class ContainerLauncherImpl extends AbstractService implements break; case CONTAINER_REMOTE_CLEANUP: - c.kill(event); + c.kill(); break; } removeContainerIfDone(containerID); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java index e5ad3fd8226..94c4f20bac1 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java @@ -72,6 +72,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt; import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent; +import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerLaunchedEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptDiagnosticsUpdateEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType; @@ -450,6 +451,121 @@ public class TestTaskAttempt{ assertFalse(eventHandler.internalError); } + @Test + public void testContainerCleanedWhileRunning() throws Exception { + ApplicationId appId = BuilderUtils.newApplicationId(1, 2); + ApplicationAttemptId appAttemptId = + BuilderUtils.newApplicationAttemptId(appId, 0); + JobId jobId = MRBuilderUtils.newJobId(appId, 1); + TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP); + TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0); + Path jobFile = mock(Path.class); + + MockEventHandler eventHandler = new MockEventHandler(); + TaskAttemptListener taListener = mock(TaskAttemptListener.class); + when(taListener.getAddress()).thenReturn(new InetSocketAddress("localhost", 0)); + + JobConf jobConf = new JobConf(); + jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class); + jobConf.setBoolean("fs.file.impl.disable.cache", true); + jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, ""); + jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, "10"); + + TaskSplitMetaInfo splits = mock(TaskSplitMetaInfo.class); + when(splits.getLocations()).thenReturn(new String[] {"127.0.0.1"}); + + AppContext appCtx = mock(AppContext.class); + ClusterInfo clusterInfo = mock(ClusterInfo.class); + Resource resource = mock(Resource.class); + when(appCtx.getClusterInfo()).thenReturn(clusterInfo); + when(clusterInfo.getMinContainerCapability()).thenReturn(resource); + when(resource.getMemory()).thenReturn(1024); + + TaskAttemptImpl taImpl = + new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1, + splits, jobConf, taListener, + mock(OutputCommitter.class), mock(Token.class), new Credentials(), + new SystemClock(), appCtx); + + NodeId nid = BuilderUtils.newNodeId("127.0.0.1", 0); + ContainerId contId = BuilderUtils.newContainerId(appAttemptId, 3); + Container container = mock(Container.class); + when(container.getId()).thenReturn(contId); + when(container.getNodeId()).thenReturn(nid); + when(container.getNodeHttpAddress()).thenReturn("localhost:0"); + + taImpl.handle(new TaskAttemptEvent(attemptId, + TaskAttemptEventType.TA_SCHEDULE)); + taImpl.handle(new TaskAttemptContainerAssignedEvent(attemptId, + container, mock(Map.class))); + taImpl.handle(new TaskAttemptContainerLaunchedEvent(attemptId, 0)); + assertEquals("Task attempt is not in running state", taImpl.getState(), + TaskAttemptState.RUNNING); + taImpl.handle(new TaskAttemptEvent(attemptId, + TaskAttemptEventType.TA_CONTAINER_CLEANED)); + assertFalse("InternalError occurred trying to handle TA_CONTAINER_CLEANED", + eventHandler.internalError); + } + + @Test + public void testContainerCleanedWhileCommitting() throws Exception { + ApplicationId appId = BuilderUtils.newApplicationId(1, 2); + ApplicationAttemptId appAttemptId = + BuilderUtils.newApplicationAttemptId(appId, 0); + JobId jobId = MRBuilderUtils.newJobId(appId, 1); + TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP); + TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0); + Path jobFile = mock(Path.class); + + MockEventHandler eventHandler = new MockEventHandler(); + TaskAttemptListener taListener = mock(TaskAttemptListener.class); + when(taListener.getAddress()).thenReturn(new InetSocketAddress("localhost", 0)); + + JobConf jobConf = new JobConf(); + jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class); + jobConf.setBoolean("fs.file.impl.disable.cache", true); + jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, ""); + jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, "10"); + + TaskSplitMetaInfo splits = mock(TaskSplitMetaInfo.class); + when(splits.getLocations()).thenReturn(new String[] {"127.0.0.1"}); + + AppContext appCtx = mock(AppContext.class); + ClusterInfo clusterInfo = mock(ClusterInfo.class); + Resource resource = mock(Resource.class); + when(appCtx.getClusterInfo()).thenReturn(clusterInfo); + when(clusterInfo.getMinContainerCapability()).thenReturn(resource); + when(resource.getMemory()).thenReturn(1024); + + TaskAttemptImpl taImpl = + new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1, + splits, jobConf, taListener, + mock(OutputCommitter.class), mock(Token.class), new Credentials(), + new SystemClock(), appCtx); + + NodeId nid = BuilderUtils.newNodeId("127.0.0.1", 0); + ContainerId contId = BuilderUtils.newContainerId(appAttemptId, 3); + Container container = mock(Container.class); + when(container.getId()).thenReturn(contId); + when(container.getNodeId()).thenReturn(nid); + when(container.getNodeHttpAddress()).thenReturn("localhost:0"); + + taImpl.handle(new TaskAttemptEvent(attemptId, + TaskAttemptEventType.TA_SCHEDULE)); + taImpl.handle(new TaskAttemptContainerAssignedEvent(attemptId, + container, mock(Map.class))); + taImpl.handle(new TaskAttemptContainerLaunchedEvent(attemptId, 0)); + taImpl.handle(new TaskAttemptEvent(attemptId, + TaskAttemptEventType.TA_COMMIT_PENDING)); + + assertEquals("Task attempt is not in commit pending state", taImpl.getState(), + TaskAttemptState.COMMIT_PENDING); + taImpl.handle(new TaskAttemptEvent(attemptId, + TaskAttemptEventType.TA_CONTAINER_CLEANED)); + assertFalse("InternalError occurred trying to handle TA_CONTAINER_CLEANED", + eventHandler.internalError); + } + public static class MockEventHandler implements EventHandler { public boolean internalError; diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java index 98e11a87e6e..838daea0872 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java @@ -220,4 +220,58 @@ public class TestContainerLauncherImpl { ut.stop(); } } + + @Test + public void testMyShutdown() throws Exception { + LOG.info("in test Shutdown"); + + YarnRPC mockRpc = mock(YarnRPC.class); + AppContext mockContext = mock(AppContext.class); + @SuppressWarnings("rawtypes") + EventHandler mockEventHandler = mock(EventHandler.class); + when(mockContext.getEventHandler()).thenReturn(mockEventHandler); + + ContainerManager mockCM = mock(ContainerManager.class); + when(mockRpc.getProxy(eq(ContainerManager.class), + any(InetSocketAddress.class), any(Configuration.class))) + .thenReturn(mockCM); + + ContainerLauncherImplUnderTest ut = + new ContainerLauncherImplUnderTest(mockContext, mockRpc); + + Configuration conf = new Configuration(); + ut.init(conf); + ut.start(); + try { + ContainerId contId = makeContainerId(0l, 0, 0, 1); + TaskAttemptId taskAttemptId = makeTaskAttemptId(0l, 0, 0, TaskType.MAP, 0); + String cmAddress = "127.0.0.1:8000"; + StartContainerResponse startResp = + recordFactory.newRecordInstance(StartContainerResponse.class); + startResp.setServiceResponse(ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID, + ShuffleHandler.serializeMetaData(80)); + + LOG.info("inserting launch event"); + ContainerRemoteLaunchEvent mockLaunchEvent = + mock(ContainerRemoteLaunchEvent.class); + when(mockLaunchEvent.getType()) + .thenReturn(EventType.CONTAINER_REMOTE_LAUNCH); + when(mockLaunchEvent.getContainerID()) + .thenReturn(contId); + when(mockLaunchEvent.getTaskAttemptID()).thenReturn(taskAttemptId); + when(mockLaunchEvent.getContainerMgrAddress()).thenReturn(cmAddress); + when(mockCM.startContainer(any(StartContainerRequest.class))).thenReturn(startResp); + ut.handle(mockLaunchEvent); + + ut.waitForPoolToIdle(); + + verify(mockCM).startContainer(any(StartContainerRequest.class)); + + // skip cleanup and make sure stop kills the container + + } finally { + ut.stop(); + verify(mockCM).stopContainer(any(StopContainerRequest.class)); +} + } }