diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index f78c4c420cd..4544eb5b58e 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -632,6 +632,9 @@ Release 0.23.6 - UNRELEASED MAPREDUCE-4842. Shuffle race can hang reducer (Mariappan Asokan via jlowe) + MAPREDUCE-4833. Task can get stuck in FAIL_CONTAINER_CLEANUP (Robert + Parker via jlowe) + Release 0.23.5 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java index fa97d692ee3..058f7e5da8b 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java @@ -191,12 +191,9 @@ public class ContainerLauncherImpl extends AbstractService implements @SuppressWarnings("unchecked") public synchronized void kill() { - if(isCompletelyDone()) { - return; - } if(this.state == ContainerState.PREP) { this.state = ContainerState.KILLED_BEFORE_LAUNCH; - } else { + } else if (!isCompletelyDone()) { LOG.info("KILLING " + taskAttemptID); ContainerManager proxy = null; diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java index 838daea0872..a53bbe69072 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java @@ -6,8 +6,12 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import static org.mockito.Mockito.atLeast; +import org.mockito.ArgumentCaptor; import java.net.InetSocketAddress; +import java.util.concurrent.BrokenBarrierException; +import java.util.concurrent.CyclicBarrier; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -18,15 +22,21 @@ import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; import org.apache.hadoop.mapreduce.v2.api.records.TaskId; import org.apache.hadoop.mapreduce.v2.api.records.TaskType; import org.apache.hadoop.mapreduce.v2.app.AppContext; +import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType; import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher.EventType; import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils; import org.apache.hadoop.yarn.api.ContainerManager; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusResponse; import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; import org.apache.hadoop.yarn.api.protocolrecords.StartContainerResponse; import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest; +import org.apache.hadoop.yarn.api.protocolrecords.StopContainerResponse; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.event.Event; import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.exceptions.YarnRemoteException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.ipc.YarnRPC; @@ -272,6 +282,150 @@ public class TestContainerLauncherImpl { } finally { ut.stop(); verify(mockCM).stopContainer(any(StopContainerRequest.class)); -} + } } + + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Test + public void testContainerCleaned() throws Exception { + LOG.info("STARTING testContainerCleaned"); + + CyclicBarrier startLaunchBarrier = new CyclicBarrier(2); + CyclicBarrier completeLaunchBarrier = new CyclicBarrier(2); + + YarnRPC mockRpc = mock(YarnRPC.class); + AppContext mockContext = mock(AppContext.class); + + EventHandler mockEventHandler = mock(EventHandler.class); + when(mockContext.getEventHandler()).thenReturn(mockEventHandler); + + ContainerManager mockCM = new ContainerManagerForTest(startLaunchBarrier, completeLaunchBarrier); + when(mockRpc.getProxy(eq(ContainerManager.class), + any(InetSocketAddress.class), any(Configuration.class))) + .thenReturn(mockCM); + + ContainerLauncherImplUnderTest ut = + new ContainerLauncherImplUnderTest(mockContext, mockRpc); + + Configuration conf = new Configuration(); + ut.init(conf); + ut.start(); + try { + ContainerId contId = makeContainerId(0l, 0, 0, 1); + TaskAttemptId taskAttemptId = makeTaskAttemptId(0l, 0, 0, TaskType.MAP, 0); + String cmAddress = "127.0.0.1:8000"; + StartContainerResponse startResp = + recordFactory.newRecordInstance(StartContainerResponse.class); + startResp.setServiceResponse(ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID, + ShuffleHandler.serializeMetaData(80)); + + + LOG.info("inserting launch event"); + ContainerRemoteLaunchEvent mockLaunchEvent = + mock(ContainerRemoteLaunchEvent.class); + when(mockLaunchEvent.getType()) + .thenReturn(EventType.CONTAINER_REMOTE_LAUNCH); + when(mockLaunchEvent.getContainerID()) + .thenReturn(contId); + when(mockLaunchEvent.getTaskAttemptID()).thenReturn(taskAttemptId); + when(mockLaunchEvent.getContainerMgrAddress()).thenReturn(cmAddress); + ut.handle(mockLaunchEvent); + + startLaunchBarrier.await(); + + + LOG.info("inserting cleanup event"); + ContainerLauncherEvent mockCleanupEvent = + mock(ContainerLauncherEvent.class); + when(mockCleanupEvent.getType()) + .thenReturn(EventType.CONTAINER_REMOTE_CLEANUP); + when(mockCleanupEvent.getContainerID()) + .thenReturn(contId); + when(mockCleanupEvent.getTaskAttemptID()).thenReturn(taskAttemptId); + when(mockCleanupEvent.getContainerMgrAddress()).thenReturn(cmAddress); + ut.handle(mockCleanupEvent); + + completeLaunchBarrier.await(); + + ut.waitForPoolToIdle(); + + ArgumentCaptor arg = ArgumentCaptor.forClass(Event.class); + verify(mockEventHandler, atLeast(2)).handle(arg.capture()); + boolean containerCleaned = false; + + for (int i =0; i < arg.getAllValues().size(); i++) { + LOG.info(arg.getAllValues().get(i).toString()); + Event currentEvent = arg.getAllValues().get(i); + if (currentEvent.getType() == TaskAttemptEventType.TA_CONTAINER_CLEANED) { + containerCleaned = true; + } + } + assert(containerCleaned); + + } finally { + ut.stop(); + } + } + + private static class ContainerManagerForTest implements ContainerManager { + + private CyclicBarrier startLaunchBarrier; + private CyclicBarrier completeLaunchBarrier; + + ContainerManagerForTest (CyclicBarrier startLaunchBarrier, CyclicBarrier completeLaunchBarrier) { + this.startLaunchBarrier = startLaunchBarrier; + this.completeLaunchBarrier = completeLaunchBarrier; + } + @Override + public StartContainerResponse startContainer(StartContainerRequest request) + throws YarnRemoteException { + try { + startLaunchBarrier.await(); + completeLaunchBarrier.await(); + //To ensure the kill is started before the launch + Thread.sleep(100); + } catch (InterruptedException e) { + e.printStackTrace(); + } catch (BrokenBarrierException e) { + e.printStackTrace(); + } + + throw new ContainerException("Force fail CM"); + + } + + @Override + public StopContainerResponse stopContainer(StopContainerRequest request) + throws YarnRemoteException { + + return null; + } + + @Override + public GetContainerStatusResponse getContainerStatus( + GetContainerStatusRequest request) throws YarnRemoteException { + + return null; + } + } + + @SuppressWarnings("serial") + private static class ContainerException extends YarnRemoteException { + + public ContainerException(String message) { + super(message); + } + + @Override + public String getRemoteTrace() { + return null; + } + + @Override + public YarnRemoteException getCause() { + return null; + } + + } + }