svn merge -c 1344283. FIXES: MAPREDUCE-4152. map task left hanging after AM dies trying to connect to RM (Tom Graves via bobby)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1344288 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Joseph Evans 2012-05-30 14:52:39 +00:00
parent 48a439ea7e
commit ed20ba2e49
5 changed files with 226 additions and 23 deletions

View File

@ -428,6 +428,9 @@ Release 0.23.3 - UNRELEASED
MAPREDUCE-3870. Invalid App Metrics
(Bhallamudi Venkata Siva Kamesh via tgraves).
MAPREDUCE-4152. map task left hanging after AM dies trying to connect to RM
(Tom Graves via bobby)
Release 0.23.2 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -253,6 +253,10 @@ public abstract class TaskAttemptImpl implements
.addTransition(TaskAttemptState.RUNNING,
TaskAttemptState.FAIL_CONTAINER_CLEANUP,
TaskAttemptEventType.TA_TIMED_OUT, CLEANUP_CONTAINER_TRANSITION)
// if container killed by AM shutting down
.addTransition(TaskAttemptState.RUNNING,
TaskAttemptState.KILLED,
TaskAttemptEventType.TA_CONTAINER_CLEANED, new KilledTransition())
// Kill handling
.addTransition(TaskAttemptState.RUNNING,
TaskAttemptState.KILL_CONTAINER_CLEANUP, TaskAttemptEventType.TA_KILL,
@ -272,6 +276,10 @@ public abstract class TaskAttemptImpl implements
.addTransition(TaskAttemptState.COMMIT_PENDING,
TaskAttemptState.KILL_CONTAINER_CLEANUP, TaskAttemptEventType.TA_KILL,
CLEANUP_CONTAINER_TRANSITION)
// if container killed by AM shutting down
.addTransition(TaskAttemptState.COMMIT_PENDING,
TaskAttemptState.KILLED,
TaskAttemptEventType.TA_CONTAINER_CLEANED, new KilledTransition())
.addTransition(TaskAttemptState.COMMIT_PENDING,
TaskAttemptState.FAIL_CONTAINER_CLEANUP,
TaskAttemptEventType.TA_FAILMSG, CLEANUP_CONTAINER_TRANSITION)
@ -363,6 +371,7 @@ public abstract class TaskAttemptImpl implements
TaskAttemptEventType.TA_COMMIT_PENDING,
TaskAttemptEventType.TA_DONE,
TaskAttemptEventType.TA_FAILMSG,
TaskAttemptEventType.TA_CONTAINER_CLEANED,
// Container launch events can arrive late
TaskAttemptEventType.TA_CONTAINER_LAUNCHED,
TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED))
@ -384,6 +393,7 @@ public abstract class TaskAttemptImpl implements
TaskAttemptEventType.TA_COMMIT_PENDING,
TaskAttemptEventType.TA_DONE,
TaskAttemptEventType.TA_FAILMSG,
TaskAttemptEventType.TA_CONTAINER_CLEANED,
// Container launch events can arrive late
TaskAttemptEventType.TA_CONTAINER_LAUNCHED,
TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED))
@ -402,6 +412,7 @@ public abstract class TaskAttemptImpl implements
TaskAttemptState.SUCCEEDED,
EnumSet.of(TaskAttemptEventType.TA_KILL,
TaskAttemptEventType.TA_FAILMSG,
TaskAttemptEventType.TA_CONTAINER_CLEANED,
TaskAttemptEventType.TA_CONTAINER_COMPLETED))
// Transitions from FAILED state
@ -417,6 +428,7 @@ public abstract class TaskAttemptImpl implements
// Container launch events can arrive late
TaskAttemptEventType.TA_CONTAINER_LAUNCHED,
TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED,
TaskAttemptEventType.TA_CONTAINER_CLEANED,
TaskAttemptEventType.TA_COMMIT_PENDING,
TaskAttemptEventType.TA_DONE,
TaskAttemptEventType.TA_FAILMSG))
@ -434,6 +446,7 @@ public abstract class TaskAttemptImpl implements
// Container launch events can arrive late
TaskAttemptEventType.TA_CONTAINER_LAUNCHED,
TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED,
TaskAttemptEventType.TA_CONTAINER_CLEANED,
TaskAttemptEventType.TA_COMMIT_PENDING,
TaskAttemptEventType.TA_DONE,
TaskAttemptEventType.TA_FAILMSG))

View File

@ -82,10 +82,12 @@ public class ContainerLauncherImpl extends AbstractService implements
new LinkedBlockingQueue<ContainerLauncherEvent>();
YarnRPC rpc;
private Container getContainer(ContainerId id) {
private Container getContainer(ContainerLauncherEvent event) {
ContainerId id = event.getContainerID();
Container c = containers.get(id);
if(c == null) {
c = new Container();
c = new Container(event.getTaskAttemptID(), event.getContainerID(),
event.getContainerMgrAddress(), event.getContainerToken());
Container old = containers.putIfAbsent(id, c);
if(old != null) {
c = old;
@ -107,9 +109,19 @@ public class ContainerLauncherImpl extends AbstractService implements
private class Container {
private ContainerState state;
// store enough information to be able to cleanup the container
private TaskAttemptId taskAttemptID;
private ContainerId containerID;
final private String containerMgrAddress;
private ContainerToken containerToken;
public Container() {
public Container(TaskAttemptId taId, ContainerId containerID,
String containerMgrAddress, ContainerToken containerToken) {
this.state = ContainerState.PREP;
this.taskAttemptID = taId;
this.containerMgrAddress = containerMgrAddress;
this.containerID = containerID;
this.containerToken = containerToken;
}
public synchronized boolean isCompletelyDone() {
@ -118,7 +130,6 @@ public class ContainerLauncherImpl extends AbstractService implements
@SuppressWarnings("unchecked")
public synchronized void launch(ContainerRemoteLaunchEvent event) {
TaskAttemptId taskAttemptID = event.getTaskAttemptID();
LOG.info("Launching " + taskAttemptID);
if(this.state == ContainerState.KILLED_BEFORE_LAUNCH) {
state = ContainerState.DONE;
@ -127,15 +138,10 @@ public class ContainerLauncherImpl extends AbstractService implements
return;
}
final String containerManagerBindAddr = event.getContainerMgrAddress();
ContainerId containerID = event.getContainerID();
ContainerToken containerToken = event.getContainerToken();
ContainerManager proxy = null;
try {
proxy = getCMProxy(containerID, containerManagerBindAddr,
proxy = getCMProxy(containerID, containerMgrAddress,
containerToken);
// Construct the actual Container
@ -181,35 +187,35 @@ public class ContainerLauncherImpl extends AbstractService implements
}
@SuppressWarnings("unchecked")
public synchronized void kill(ContainerLauncherEvent event) {
public synchronized void kill() {
if(isCompletelyDone()) {
return;
}
if(this.state == ContainerState.PREP) {
this.state = ContainerState.KILLED_BEFORE_LAUNCH;
} else {
final String containerManagerBindAddr = event.getContainerMgrAddress();
ContainerId containerID = event.getContainerID();
ContainerToken containerToken = event.getContainerToken();
TaskAttemptId taskAttemptID = event.getTaskAttemptID();
LOG.info("KILLING " + taskAttemptID);
ContainerManager proxy = null;
try {
proxy = getCMProxy(containerID, containerManagerBindAddr,
containerToken);
proxy = getCMProxy(this.containerID, this.containerMgrAddress,
this.containerToken);
// kill the remote container if already launched
StopContainerRequest stopRequest = Records
.newRecord(StopContainerRequest.class);
stopRequest.setContainerId(event.getContainerID());
stopRequest.setContainerId(this.containerID);
proxy.stopContainer(stopRequest);
} catch (Throwable t) {
// ignore the cleanup failure
String message = "cleanup failed for container "
+ event.getContainerID() + " : "
+ this.containerID + " : "
+ StringUtils.stringifyException(t);
context.getEventHandler().handle(
new TaskAttemptDiagnosticsUpdateEvent(taskAttemptID, message));
new TaskAttemptDiagnosticsUpdateEvent(this.taskAttemptID, message));
LOG.warn(message);
} finally {
if (proxy != null) {
@ -220,10 +226,11 @@ public class ContainerLauncherImpl extends AbstractService implements
}
// after killing, send killed event to task attempt
context.getEventHandler().handle(
new TaskAttemptEvent(event.getTaskAttemptID(),
new TaskAttemptEvent(this.taskAttemptID,
TaskAttemptEventType.TA_CONTAINER_CLEANED));
}
}
// To track numNodes.
Set<String> allNodes = new HashSet<String>();
@ -308,7 +315,17 @@ public class ContainerLauncherImpl extends AbstractService implements
super.start();
}
private void shutdownAllContainers() {
for (Container ct : this.containers.values()) {
if (ct != null) {
ct.kill();
}
}
}
public void stop() {
// shutdown any containers that might be left running
shutdownAllContainers();
eventHandlingThread.interrupt();
launcherPool.shutdownNow();
super.stop();
@ -364,7 +381,7 @@ public class ContainerLauncherImpl extends AbstractService implements
// TODO: Do it only once per NodeManager.
ContainerId containerID = event.getContainerID();
Container c = getContainer(containerID);
Container c = getContainer(event);
switch(event.getType()) {
case CONTAINER_REMOTE_LAUNCH:
@ -374,7 +391,7 @@ public class ContainerLauncherImpl extends AbstractService implements
break;
case CONTAINER_REMOTE_CLEANUP:
c.kill(event);
c.kill();
break;
}
removeContainerIfDone(containerID);

View File

@ -72,6 +72,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerLaunchedEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptDiagnosticsUpdateEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
@ -450,6 +451,121 @@ public class TestTaskAttempt{
assertFalse(eventHandler.internalError);
}
@Test
public void testContainerCleanedWhileRunning() throws Exception {
ApplicationId appId = BuilderUtils.newApplicationId(1, 2);
ApplicationAttemptId appAttemptId =
BuilderUtils.newApplicationAttemptId(appId, 0);
JobId jobId = MRBuilderUtils.newJobId(appId, 1);
TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
Path jobFile = mock(Path.class);
MockEventHandler eventHandler = new MockEventHandler();
TaskAttemptListener taListener = mock(TaskAttemptListener.class);
when(taListener.getAddress()).thenReturn(new InetSocketAddress("localhost", 0));
JobConf jobConf = new JobConf();
jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
jobConf.setBoolean("fs.file.impl.disable.cache", true);
jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, "");
jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, "10");
TaskSplitMetaInfo splits = mock(TaskSplitMetaInfo.class);
when(splits.getLocations()).thenReturn(new String[] {"127.0.0.1"});
AppContext appCtx = mock(AppContext.class);
ClusterInfo clusterInfo = mock(ClusterInfo.class);
Resource resource = mock(Resource.class);
when(appCtx.getClusterInfo()).thenReturn(clusterInfo);
when(clusterInfo.getMinContainerCapability()).thenReturn(resource);
when(resource.getMemory()).thenReturn(1024);
TaskAttemptImpl taImpl =
new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
splits, jobConf, taListener,
mock(OutputCommitter.class), mock(Token.class), new Credentials(),
new SystemClock(), appCtx);
NodeId nid = BuilderUtils.newNodeId("127.0.0.1", 0);
ContainerId contId = BuilderUtils.newContainerId(appAttemptId, 3);
Container container = mock(Container.class);
when(container.getId()).thenReturn(contId);
when(container.getNodeId()).thenReturn(nid);
when(container.getNodeHttpAddress()).thenReturn("localhost:0");
taImpl.handle(new TaskAttemptEvent(attemptId,
TaskAttemptEventType.TA_SCHEDULE));
taImpl.handle(new TaskAttemptContainerAssignedEvent(attemptId,
container, mock(Map.class)));
taImpl.handle(new TaskAttemptContainerLaunchedEvent(attemptId, 0));
assertEquals("Task attempt is not in running state", taImpl.getState(),
TaskAttemptState.RUNNING);
taImpl.handle(new TaskAttemptEvent(attemptId,
TaskAttemptEventType.TA_CONTAINER_CLEANED));
assertFalse("InternalError occurred trying to handle TA_CONTAINER_CLEANED",
eventHandler.internalError);
}
@Test
public void testContainerCleanedWhileCommitting() throws Exception {
ApplicationId appId = BuilderUtils.newApplicationId(1, 2);
ApplicationAttemptId appAttemptId =
BuilderUtils.newApplicationAttemptId(appId, 0);
JobId jobId = MRBuilderUtils.newJobId(appId, 1);
TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
Path jobFile = mock(Path.class);
MockEventHandler eventHandler = new MockEventHandler();
TaskAttemptListener taListener = mock(TaskAttemptListener.class);
when(taListener.getAddress()).thenReturn(new InetSocketAddress("localhost", 0));
JobConf jobConf = new JobConf();
jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
jobConf.setBoolean("fs.file.impl.disable.cache", true);
jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, "");
jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, "10");
TaskSplitMetaInfo splits = mock(TaskSplitMetaInfo.class);
when(splits.getLocations()).thenReturn(new String[] {"127.0.0.1"});
AppContext appCtx = mock(AppContext.class);
ClusterInfo clusterInfo = mock(ClusterInfo.class);
Resource resource = mock(Resource.class);
when(appCtx.getClusterInfo()).thenReturn(clusterInfo);
when(clusterInfo.getMinContainerCapability()).thenReturn(resource);
when(resource.getMemory()).thenReturn(1024);
TaskAttemptImpl taImpl =
new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
splits, jobConf, taListener,
mock(OutputCommitter.class), mock(Token.class), new Credentials(),
new SystemClock(), appCtx);
NodeId nid = BuilderUtils.newNodeId("127.0.0.1", 0);
ContainerId contId = BuilderUtils.newContainerId(appAttemptId, 3);
Container container = mock(Container.class);
when(container.getId()).thenReturn(contId);
when(container.getNodeId()).thenReturn(nid);
when(container.getNodeHttpAddress()).thenReturn("localhost:0");
taImpl.handle(new TaskAttemptEvent(attemptId,
TaskAttemptEventType.TA_SCHEDULE));
taImpl.handle(new TaskAttemptContainerAssignedEvent(attemptId,
container, mock(Map.class)));
taImpl.handle(new TaskAttemptContainerLaunchedEvent(attemptId, 0));
taImpl.handle(new TaskAttemptEvent(attemptId,
TaskAttemptEventType.TA_COMMIT_PENDING));
assertEquals("Task attempt is not in commit pending state", taImpl.getState(),
TaskAttemptState.COMMIT_PENDING);
taImpl.handle(new TaskAttemptEvent(attemptId,
TaskAttemptEventType.TA_CONTAINER_CLEANED));
assertFalse("InternalError occurred trying to handle TA_CONTAINER_CLEANED",
eventHandler.internalError);
}
public static class MockEventHandler implements EventHandler {
public boolean internalError;

View File

@ -220,4 +220,58 @@ public class TestContainerLauncherImpl {
ut.stop();
}
}
@Test
public void testMyShutdown() throws Exception {
LOG.info("in test Shutdown");
YarnRPC mockRpc = mock(YarnRPC.class);
AppContext mockContext = mock(AppContext.class);
@SuppressWarnings("rawtypes")
EventHandler mockEventHandler = mock(EventHandler.class);
when(mockContext.getEventHandler()).thenReturn(mockEventHandler);
ContainerManager mockCM = mock(ContainerManager.class);
when(mockRpc.getProxy(eq(ContainerManager.class),
any(InetSocketAddress.class), any(Configuration.class)))
.thenReturn(mockCM);
ContainerLauncherImplUnderTest ut =
new ContainerLauncherImplUnderTest(mockContext, mockRpc);
Configuration conf = new Configuration();
ut.init(conf);
ut.start();
try {
ContainerId contId = makeContainerId(0l, 0, 0, 1);
TaskAttemptId taskAttemptId = makeTaskAttemptId(0l, 0, 0, TaskType.MAP, 0);
String cmAddress = "127.0.0.1:8000";
StartContainerResponse startResp =
recordFactory.newRecordInstance(StartContainerResponse.class);
startResp.setServiceResponse(ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID,
ShuffleHandler.serializeMetaData(80));
LOG.info("inserting launch event");
ContainerRemoteLaunchEvent mockLaunchEvent =
mock(ContainerRemoteLaunchEvent.class);
when(mockLaunchEvent.getType())
.thenReturn(EventType.CONTAINER_REMOTE_LAUNCH);
when(mockLaunchEvent.getContainerID())
.thenReturn(contId);
when(mockLaunchEvent.getTaskAttemptID()).thenReturn(taskAttemptId);
when(mockLaunchEvent.getContainerMgrAddress()).thenReturn(cmAddress);
when(mockCM.startContainer(any(StartContainerRequest.class))).thenReturn(startResp);
ut.handle(mockLaunchEvent);
ut.waitForPoolToIdle();
verify(mockCM).startContainer(any(StartContainerRequest.class));
// skip cleanup and make sure stop kills the container
} finally {
ut.stop();
verify(mockCM).stopContainer(any(StopContainerRequest.class));
}
}
}