diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java index d09b3cb1e56..d184d9be64b 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java @@ -379,27 +379,38 @@ class EventProcessor implements Runnable { @Override public void run() { - LOG.info("Processing the event " + event.toString()); + LOG.info("Processing the event {}", event); // Load ContainerManager tokens before creating a connection. // TODO: Do it only once per NodeManager. ContainerId containerID = event.getContainerID(); - Container c = getContainer(event); switch(event.getType()) { case CONTAINER_REMOTE_LAUNCH: ContainerRemoteLaunchEvent launchEvent = (ContainerRemoteLaunchEvent) event; - c.launch(launchEvent); + getContainer(event).launch(launchEvent); break; case CONTAINER_REMOTE_CLEANUP: - c.kill(event.getDumpContainerThreads()); + // If the container failed to launch earlier (due to dead node for example), + // it has been marked as FAILED and removed from containers during + // CONTAINER_REMOTE_LAUNCH event handling. + // Skip kill() such container during CONTAINER_REMOTE_CLEANUP as + // it is not necessary and could cost 15 minutes delay if the node is dead. + if (!containers.containsKey(containerID)) { + LOG.info("Skip cleanup of already-removed container {}", containerID); + // send killed event to task attempt regardless like in kill(). + context.getEventHandler().handle(new TaskAttemptEvent(event.getTaskAttemptID(), + TaskAttemptEventType.TA_CONTAINER_CLEANED)); + return; + } + getContainer(event).kill(event.getDumpContainerThreads()); break; case CONTAINER_COMPLETED: - c.done(); + getContainer(event).done(); break; } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java index 2057cc80ff0..88ba8943ceb 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java @@ -209,14 +209,11 @@ public void testHandle() throws Exception { ut.waitForPoolToIdle(); verify(mockCM).startContainers(any(StartContainersRequest.class)); - + LOG.info("inserting cleanup event"); - ContainerLauncherEvent mockCleanupEvent = - mock(ContainerLauncherEvent.class); - when(mockCleanupEvent.getType()) - .thenReturn(EventType.CONTAINER_REMOTE_CLEANUP); - when(mockCleanupEvent.getContainerID()) - .thenReturn(contId); + ContainerLauncherEvent mockCleanupEvent = mock(ContainerLauncherEvent.class); + when(mockCleanupEvent.getType()).thenReturn(EventType.CONTAINER_REMOTE_CLEANUP); + when(mockCleanupEvent.getContainerID()).thenReturn(contId); when(mockCleanupEvent.getTaskAttemptID()).thenReturn(taskAttemptId); when(mockCleanupEvent.getContainerMgrAddress()).thenReturn(cmAddress); ut.handle(mockCleanupEvent); @@ -283,8 +280,21 @@ public void testOutOfOrder() throws Exception { ut.handle(mockLaunchEvent); ut.waitForPoolToIdle(); - - verify(mockCM, never()).startContainers(any(StartContainersRequest.class)); + + verify(mockCM).startContainers(any(StartContainersRequest.class)); + + LOG.info("inserting cleanup event"); + ContainerLauncherEvent mockCleanupEvent2 = mock(ContainerLauncherEvent.class); + when(mockCleanupEvent2.getType()).thenReturn(EventType.CONTAINER_REMOTE_CLEANUP); + when(mockCleanupEvent2.getContainerID()).thenReturn(contId); + when(mockCleanupEvent2.getTaskAttemptID()).thenReturn(taskAttemptId); + when(mockCleanupEvent2.getContainerMgrAddress()).thenReturn(cmAddress); + ut.handle(mockCleanupEvent2); + + ut.waitForPoolToIdle(); + + // Verifies stopContainers is called on existing container + verify(mockCM).stopContainers(any(StopContainersRequest.class)); } finally { ut.stop(); }