svn merge -c 1425167 FIXES: MAPREDUCE-4833. Task can get stuck in FAIL_CONTAINER_CLEANUP. Contributed by Robert Parker

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1425168 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jason Darrell Lowe 2012-12-21 22:34:41 +00:00
parent c28831a0bd
commit 043fd00a30
3 changed files with 159 additions and 5 deletions

View File

@ -475,6 +475,9 @@ Release 0.23.6 - UNRELEASED
MAPREDUCE-4842. Shuffle race can hang reducer (Mariappan Asokan via jlowe)
MAPREDUCE-4833. Task can get stuck in FAIL_CONTAINER_CLEANUP (Robert
Parker via jlowe)
Release 0.23.5 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -191,12 +191,9 @@ public class ContainerLauncherImpl extends AbstractService implements
@SuppressWarnings("unchecked")
public synchronized void kill() {
if(isCompletelyDone()) {
return;
}
if(this.state == ContainerState.PREP) {
this.state = ContainerState.KILLED_BEFORE_LAUNCH;
} else {
} else if (!isCompletelyDone()) {
LOG.info("KILLING " + taskAttemptID);
ContainerManager proxy = null;

View File

@ -6,8 +6,12 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.atLeast;
import org.mockito.ArgumentCaptor;
import java.net.InetSocketAddress;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -18,15 +22,21 @@ import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher.EventType;
import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
import org.apache.hadoop.yarn.api.ContainerManager;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StopContainerResponse;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.YarnRPC;
@ -272,6 +282,150 @@ public class TestContainerLauncherImpl {
} finally {
ut.stop();
verify(mockCM).stopContainer(any(StopContainerRequest.class));
}
}
}
@SuppressWarnings({ "rawtypes", "unchecked" })
@Test
public void testContainerCleaned() throws Exception {
LOG.info("STARTING testContainerCleaned");
CyclicBarrier startLaunchBarrier = new CyclicBarrier(2);
CyclicBarrier completeLaunchBarrier = new CyclicBarrier(2);
YarnRPC mockRpc = mock(YarnRPC.class);
AppContext mockContext = mock(AppContext.class);
EventHandler mockEventHandler = mock(EventHandler.class);
when(mockContext.getEventHandler()).thenReturn(mockEventHandler);
ContainerManager mockCM = new ContainerManagerForTest(startLaunchBarrier, completeLaunchBarrier);
when(mockRpc.getProxy(eq(ContainerManager.class),
any(InetSocketAddress.class), any(Configuration.class)))
.thenReturn(mockCM);
ContainerLauncherImplUnderTest ut =
new ContainerLauncherImplUnderTest(mockContext, mockRpc);
Configuration conf = new Configuration();
ut.init(conf);
ut.start();
try {
ContainerId contId = makeContainerId(0l, 0, 0, 1);
TaskAttemptId taskAttemptId = makeTaskAttemptId(0l, 0, 0, TaskType.MAP, 0);
String cmAddress = "127.0.0.1:8000";
StartContainerResponse startResp =
recordFactory.newRecordInstance(StartContainerResponse.class);
startResp.setServiceResponse(ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID,
ShuffleHandler.serializeMetaData(80));
LOG.info("inserting launch event");
ContainerRemoteLaunchEvent mockLaunchEvent =
mock(ContainerRemoteLaunchEvent.class);
when(mockLaunchEvent.getType())
.thenReturn(EventType.CONTAINER_REMOTE_LAUNCH);
when(mockLaunchEvent.getContainerID())
.thenReturn(contId);
when(mockLaunchEvent.getTaskAttemptID()).thenReturn(taskAttemptId);
when(mockLaunchEvent.getContainerMgrAddress()).thenReturn(cmAddress);
ut.handle(mockLaunchEvent);
startLaunchBarrier.await();
LOG.info("inserting cleanup event");
ContainerLauncherEvent mockCleanupEvent =
mock(ContainerLauncherEvent.class);
when(mockCleanupEvent.getType())
.thenReturn(EventType.CONTAINER_REMOTE_CLEANUP);
when(mockCleanupEvent.getContainerID())
.thenReturn(contId);
when(mockCleanupEvent.getTaskAttemptID()).thenReturn(taskAttemptId);
when(mockCleanupEvent.getContainerMgrAddress()).thenReturn(cmAddress);
ut.handle(mockCleanupEvent);
completeLaunchBarrier.await();
ut.waitForPoolToIdle();
ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
verify(mockEventHandler, atLeast(2)).handle(arg.capture());
boolean containerCleaned = false;
for (int i =0; i < arg.getAllValues().size(); i++) {
LOG.info(arg.getAllValues().get(i).toString());
Event currentEvent = arg.getAllValues().get(i);
if (currentEvent.getType() == TaskAttemptEventType.TA_CONTAINER_CLEANED) {
containerCleaned = true;
}
}
assert(containerCleaned);
} finally {
ut.stop();
}
}
private static class ContainerManagerForTest implements ContainerManager {
private CyclicBarrier startLaunchBarrier;
private CyclicBarrier completeLaunchBarrier;
ContainerManagerForTest (CyclicBarrier startLaunchBarrier, CyclicBarrier completeLaunchBarrier) {
this.startLaunchBarrier = startLaunchBarrier;
this.completeLaunchBarrier = completeLaunchBarrier;
}
@Override
public StartContainerResponse startContainer(StartContainerRequest request)
throws YarnRemoteException {
try {
startLaunchBarrier.await();
completeLaunchBarrier.await();
//To ensure the kill is started before the launch
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
throw new ContainerException("Force fail CM");
}
@Override
public StopContainerResponse stopContainer(StopContainerRequest request)
throws YarnRemoteException {
return null;
}
@Override
public GetContainerStatusResponse getContainerStatus(
GetContainerStatusRequest request) throws YarnRemoteException {
return null;
}
}
@SuppressWarnings("serial")
private static class ContainerException extends YarnRemoteException {
public ContainerException(String message) {
super(message);
}
@Override
public String getRemoteTrace() {
return null;
}
@Override
public YarnRemoteException getCause() {
return null;
}
}
}