MAPREDUCE-4152. map task left hanging after AM dies trying to connect to RM (Tom Graves via bobby)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1344283 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
47a29c6329
commit
0a80f82a30
|
@ -544,6 +544,9 @@ Release 0.23.3 - UNRELEASED
|
||||||
MAPREDUCE-3870. Invalid App Metrics
|
MAPREDUCE-3870. Invalid App Metrics
|
||||||
(Bhallamudi Venkata Siva Kamesh via tgraves).
|
(Bhallamudi Venkata Siva Kamesh via tgraves).
|
||||||
|
|
||||||
|
MAPREDUCE-4152. map task left hanging after AM dies trying to connect to RM
|
||||||
|
(Tom Graves via bobby)
|
||||||
|
|
||||||
Release 0.23.2 - UNRELEASED
|
Release 0.23.2 - UNRELEASED
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -253,6 +253,10 @@ public abstract class TaskAttemptImpl implements
|
||||||
.addTransition(TaskAttemptState.RUNNING,
|
.addTransition(TaskAttemptState.RUNNING,
|
||||||
TaskAttemptState.FAIL_CONTAINER_CLEANUP,
|
TaskAttemptState.FAIL_CONTAINER_CLEANUP,
|
||||||
TaskAttemptEventType.TA_TIMED_OUT, CLEANUP_CONTAINER_TRANSITION)
|
TaskAttemptEventType.TA_TIMED_OUT, CLEANUP_CONTAINER_TRANSITION)
|
||||||
|
// if container killed by AM shutting down
|
||||||
|
.addTransition(TaskAttemptState.RUNNING,
|
||||||
|
TaskAttemptState.KILLED,
|
||||||
|
TaskAttemptEventType.TA_CONTAINER_CLEANED, new KilledTransition())
|
||||||
// Kill handling
|
// Kill handling
|
||||||
.addTransition(TaskAttemptState.RUNNING,
|
.addTransition(TaskAttemptState.RUNNING,
|
||||||
TaskAttemptState.KILL_CONTAINER_CLEANUP, TaskAttemptEventType.TA_KILL,
|
TaskAttemptState.KILL_CONTAINER_CLEANUP, TaskAttemptEventType.TA_KILL,
|
||||||
|
@ -272,6 +276,10 @@ public abstract class TaskAttemptImpl implements
|
||||||
.addTransition(TaskAttemptState.COMMIT_PENDING,
|
.addTransition(TaskAttemptState.COMMIT_PENDING,
|
||||||
TaskAttemptState.KILL_CONTAINER_CLEANUP, TaskAttemptEventType.TA_KILL,
|
TaskAttemptState.KILL_CONTAINER_CLEANUP, TaskAttemptEventType.TA_KILL,
|
||||||
CLEANUP_CONTAINER_TRANSITION)
|
CLEANUP_CONTAINER_TRANSITION)
|
||||||
|
// if container killed by AM shutting down
|
||||||
|
.addTransition(TaskAttemptState.COMMIT_PENDING,
|
||||||
|
TaskAttemptState.KILLED,
|
||||||
|
TaskAttemptEventType.TA_CONTAINER_CLEANED, new KilledTransition())
|
||||||
.addTransition(TaskAttemptState.COMMIT_PENDING,
|
.addTransition(TaskAttemptState.COMMIT_PENDING,
|
||||||
TaskAttemptState.FAIL_CONTAINER_CLEANUP,
|
TaskAttemptState.FAIL_CONTAINER_CLEANUP,
|
||||||
TaskAttemptEventType.TA_FAILMSG, CLEANUP_CONTAINER_TRANSITION)
|
TaskAttemptEventType.TA_FAILMSG, CLEANUP_CONTAINER_TRANSITION)
|
||||||
|
@ -363,6 +371,7 @@ public abstract class TaskAttemptImpl implements
|
||||||
TaskAttemptEventType.TA_COMMIT_PENDING,
|
TaskAttemptEventType.TA_COMMIT_PENDING,
|
||||||
TaskAttemptEventType.TA_DONE,
|
TaskAttemptEventType.TA_DONE,
|
||||||
TaskAttemptEventType.TA_FAILMSG,
|
TaskAttemptEventType.TA_FAILMSG,
|
||||||
|
TaskAttemptEventType.TA_CONTAINER_CLEANED,
|
||||||
// Container launch events can arrive late
|
// Container launch events can arrive late
|
||||||
TaskAttemptEventType.TA_CONTAINER_LAUNCHED,
|
TaskAttemptEventType.TA_CONTAINER_LAUNCHED,
|
||||||
TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED))
|
TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED))
|
||||||
|
@ -384,6 +393,7 @@ public abstract class TaskAttemptImpl implements
|
||||||
TaskAttemptEventType.TA_COMMIT_PENDING,
|
TaskAttemptEventType.TA_COMMIT_PENDING,
|
||||||
TaskAttemptEventType.TA_DONE,
|
TaskAttemptEventType.TA_DONE,
|
||||||
TaskAttemptEventType.TA_FAILMSG,
|
TaskAttemptEventType.TA_FAILMSG,
|
||||||
|
TaskAttemptEventType.TA_CONTAINER_CLEANED,
|
||||||
// Container launch events can arrive late
|
// Container launch events can arrive late
|
||||||
TaskAttemptEventType.TA_CONTAINER_LAUNCHED,
|
TaskAttemptEventType.TA_CONTAINER_LAUNCHED,
|
||||||
TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED))
|
TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED))
|
||||||
|
@ -402,6 +412,7 @@ public abstract class TaskAttemptImpl implements
|
||||||
TaskAttemptState.SUCCEEDED,
|
TaskAttemptState.SUCCEEDED,
|
||||||
EnumSet.of(TaskAttemptEventType.TA_KILL,
|
EnumSet.of(TaskAttemptEventType.TA_KILL,
|
||||||
TaskAttemptEventType.TA_FAILMSG,
|
TaskAttemptEventType.TA_FAILMSG,
|
||||||
|
TaskAttemptEventType.TA_CONTAINER_CLEANED,
|
||||||
TaskAttemptEventType.TA_CONTAINER_COMPLETED))
|
TaskAttemptEventType.TA_CONTAINER_COMPLETED))
|
||||||
|
|
||||||
// Transitions from FAILED state
|
// Transitions from FAILED state
|
||||||
|
@ -417,6 +428,7 @@ public abstract class TaskAttemptImpl implements
|
||||||
// Container launch events can arrive late
|
// Container launch events can arrive late
|
||||||
TaskAttemptEventType.TA_CONTAINER_LAUNCHED,
|
TaskAttemptEventType.TA_CONTAINER_LAUNCHED,
|
||||||
TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED,
|
TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED,
|
||||||
|
TaskAttemptEventType.TA_CONTAINER_CLEANED,
|
||||||
TaskAttemptEventType.TA_COMMIT_PENDING,
|
TaskAttemptEventType.TA_COMMIT_PENDING,
|
||||||
TaskAttemptEventType.TA_DONE,
|
TaskAttemptEventType.TA_DONE,
|
||||||
TaskAttemptEventType.TA_FAILMSG))
|
TaskAttemptEventType.TA_FAILMSG))
|
||||||
|
@ -434,6 +446,7 @@ public abstract class TaskAttemptImpl implements
|
||||||
// Container launch events can arrive late
|
// Container launch events can arrive late
|
||||||
TaskAttemptEventType.TA_CONTAINER_LAUNCHED,
|
TaskAttemptEventType.TA_CONTAINER_LAUNCHED,
|
||||||
TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED,
|
TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED,
|
||||||
|
TaskAttemptEventType.TA_CONTAINER_CLEANED,
|
||||||
TaskAttemptEventType.TA_COMMIT_PENDING,
|
TaskAttemptEventType.TA_COMMIT_PENDING,
|
||||||
TaskAttemptEventType.TA_DONE,
|
TaskAttemptEventType.TA_DONE,
|
||||||
TaskAttemptEventType.TA_FAILMSG))
|
TaskAttemptEventType.TA_FAILMSG))
|
||||||
|
|
|
@ -82,10 +82,12 @@ public class ContainerLauncherImpl extends AbstractService implements
|
||||||
new LinkedBlockingQueue<ContainerLauncherEvent>();
|
new LinkedBlockingQueue<ContainerLauncherEvent>();
|
||||||
YarnRPC rpc;
|
YarnRPC rpc;
|
||||||
|
|
||||||
private Container getContainer(ContainerId id) {
|
private Container getContainer(ContainerLauncherEvent event) {
|
||||||
|
ContainerId id = event.getContainerID();
|
||||||
Container c = containers.get(id);
|
Container c = containers.get(id);
|
||||||
if(c == null) {
|
if(c == null) {
|
||||||
c = new Container();
|
c = new Container(event.getTaskAttemptID(), event.getContainerID(),
|
||||||
|
event.getContainerMgrAddress(), event.getContainerToken());
|
||||||
Container old = containers.putIfAbsent(id, c);
|
Container old = containers.putIfAbsent(id, c);
|
||||||
if(old != null) {
|
if(old != null) {
|
||||||
c = old;
|
c = old;
|
||||||
|
@ -107,9 +109,19 @@ public class ContainerLauncherImpl extends AbstractService implements
|
||||||
|
|
||||||
private class Container {
|
private class Container {
|
||||||
private ContainerState state;
|
private ContainerState state;
|
||||||
|
// store enough information to be able to cleanup the container
|
||||||
|
private TaskAttemptId taskAttemptID;
|
||||||
|
private ContainerId containerID;
|
||||||
|
final private String containerMgrAddress;
|
||||||
|
private ContainerToken containerToken;
|
||||||
|
|
||||||
public Container() {
|
public Container(TaskAttemptId taId, ContainerId containerID,
|
||||||
|
String containerMgrAddress, ContainerToken containerToken) {
|
||||||
this.state = ContainerState.PREP;
|
this.state = ContainerState.PREP;
|
||||||
|
this.taskAttemptID = taId;
|
||||||
|
this.containerMgrAddress = containerMgrAddress;
|
||||||
|
this.containerID = containerID;
|
||||||
|
this.containerToken = containerToken;
|
||||||
}
|
}
|
||||||
|
|
||||||
public synchronized boolean isCompletelyDone() {
|
public synchronized boolean isCompletelyDone() {
|
||||||
|
@ -118,7 +130,6 @@ public class ContainerLauncherImpl extends AbstractService implements
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
public synchronized void launch(ContainerRemoteLaunchEvent event) {
|
public synchronized void launch(ContainerRemoteLaunchEvent event) {
|
||||||
TaskAttemptId taskAttemptID = event.getTaskAttemptID();
|
|
||||||
LOG.info("Launching " + taskAttemptID);
|
LOG.info("Launching " + taskAttemptID);
|
||||||
if(this.state == ContainerState.KILLED_BEFORE_LAUNCH) {
|
if(this.state == ContainerState.KILLED_BEFORE_LAUNCH) {
|
||||||
state = ContainerState.DONE;
|
state = ContainerState.DONE;
|
||||||
|
@ -127,15 +138,10 @@ public class ContainerLauncherImpl extends AbstractService implements
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
final String containerManagerBindAddr = event.getContainerMgrAddress();
|
|
||||||
ContainerId containerID = event.getContainerID();
|
|
||||||
ContainerToken containerToken = event.getContainerToken();
|
|
||||||
|
|
||||||
ContainerManager proxy = null;
|
ContainerManager proxy = null;
|
||||||
try {
|
try {
|
||||||
|
|
||||||
proxy = getCMProxy(containerID, containerManagerBindAddr,
|
proxy = getCMProxy(containerID, containerMgrAddress,
|
||||||
containerToken);
|
containerToken);
|
||||||
|
|
||||||
// Construct the actual Container
|
// Construct the actual Container
|
||||||
|
@ -181,35 +187,35 @@ public class ContainerLauncherImpl extends AbstractService implements
|
||||||
}
|
}
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
public synchronized void kill(ContainerLauncherEvent event) {
|
public synchronized void kill() {
|
||||||
|
|
||||||
|
if(isCompletelyDone()) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
if(this.state == ContainerState.PREP) {
|
if(this.state == ContainerState.PREP) {
|
||||||
this.state = ContainerState.KILLED_BEFORE_LAUNCH;
|
this.state = ContainerState.KILLED_BEFORE_LAUNCH;
|
||||||
} else {
|
} else {
|
||||||
final String containerManagerBindAddr = event.getContainerMgrAddress();
|
|
||||||
ContainerId containerID = event.getContainerID();
|
|
||||||
ContainerToken containerToken = event.getContainerToken();
|
|
||||||
TaskAttemptId taskAttemptID = event.getTaskAttemptID();
|
|
||||||
LOG.info("KILLING " + taskAttemptID);
|
LOG.info("KILLING " + taskAttemptID);
|
||||||
|
|
||||||
ContainerManager proxy = null;
|
ContainerManager proxy = null;
|
||||||
try {
|
try {
|
||||||
proxy = getCMProxy(containerID, containerManagerBindAddr,
|
proxy = getCMProxy(this.containerID, this.containerMgrAddress,
|
||||||
containerToken);
|
this.containerToken);
|
||||||
|
|
||||||
// kill the remote container if already launched
|
// kill the remote container if already launched
|
||||||
StopContainerRequest stopRequest = Records
|
StopContainerRequest stopRequest = Records
|
||||||
.newRecord(StopContainerRequest.class);
|
.newRecord(StopContainerRequest.class);
|
||||||
stopRequest.setContainerId(event.getContainerID());
|
stopRequest.setContainerId(this.containerID);
|
||||||
proxy.stopContainer(stopRequest);
|
proxy.stopContainer(stopRequest);
|
||||||
|
|
||||||
} catch (Throwable t) {
|
} catch (Throwable t) {
|
||||||
|
|
||||||
// ignore the cleanup failure
|
// ignore the cleanup failure
|
||||||
String message = "cleanup failed for container "
|
String message = "cleanup failed for container "
|
||||||
+ event.getContainerID() + " : "
|
+ this.containerID + " : "
|
||||||
+ StringUtils.stringifyException(t);
|
+ StringUtils.stringifyException(t);
|
||||||
context.getEventHandler().handle(
|
context.getEventHandler().handle(
|
||||||
new TaskAttemptDiagnosticsUpdateEvent(taskAttemptID, message));
|
new TaskAttemptDiagnosticsUpdateEvent(this.taskAttemptID, message));
|
||||||
LOG.warn(message);
|
LOG.warn(message);
|
||||||
} finally {
|
} finally {
|
||||||
if (proxy != null) {
|
if (proxy != null) {
|
||||||
|
@ -220,10 +226,11 @@ public class ContainerLauncherImpl extends AbstractService implements
|
||||||
}
|
}
|
||||||
// after killing, send killed event to task attempt
|
// after killing, send killed event to task attempt
|
||||||
context.getEventHandler().handle(
|
context.getEventHandler().handle(
|
||||||
new TaskAttemptEvent(event.getTaskAttemptID(),
|
new TaskAttemptEvent(this.taskAttemptID,
|
||||||
TaskAttemptEventType.TA_CONTAINER_CLEANED));
|
TaskAttemptEventType.TA_CONTAINER_CLEANED));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// To track numNodes.
|
// To track numNodes.
|
||||||
Set<String> allNodes = new HashSet<String>();
|
Set<String> allNodes = new HashSet<String>();
|
||||||
|
|
||||||
|
@ -308,7 +315,17 @@ public class ContainerLauncherImpl extends AbstractService implements
|
||||||
super.start();
|
super.start();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void shutdownAllContainers() {
|
||||||
|
for (Container ct : this.containers.values()) {
|
||||||
|
if (ct != null) {
|
||||||
|
ct.kill();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public void stop() {
|
public void stop() {
|
||||||
|
// shutdown any containers that might be left running
|
||||||
|
shutdownAllContainers();
|
||||||
eventHandlingThread.interrupt();
|
eventHandlingThread.interrupt();
|
||||||
launcherPool.shutdownNow();
|
launcherPool.shutdownNow();
|
||||||
super.stop();
|
super.stop();
|
||||||
|
@ -364,7 +381,7 @@ public class ContainerLauncherImpl extends AbstractService implements
|
||||||
// TODO: Do it only once per NodeManager.
|
// TODO: Do it only once per NodeManager.
|
||||||
ContainerId containerID = event.getContainerID();
|
ContainerId containerID = event.getContainerID();
|
||||||
|
|
||||||
Container c = getContainer(containerID);
|
Container c = getContainer(event);
|
||||||
switch(event.getType()) {
|
switch(event.getType()) {
|
||||||
|
|
||||||
case CONTAINER_REMOTE_LAUNCH:
|
case CONTAINER_REMOTE_LAUNCH:
|
||||||
|
@ -374,7 +391,7 @@ public class ContainerLauncherImpl extends AbstractService implements
|
||||||
break;
|
break;
|
||||||
|
|
||||||
case CONTAINER_REMOTE_CLEANUP:
|
case CONTAINER_REMOTE_CLEANUP:
|
||||||
c.kill(event);
|
c.kill();
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
removeContainerIfDone(containerID);
|
removeContainerIfDone(containerID);
|
||||||
|
|
|
@ -72,6 +72,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
|
||||||
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
|
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
|
||||||
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
|
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
|
||||||
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent;
|
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent;
|
||||||
|
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerLaunchedEvent;
|
||||||
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptDiagnosticsUpdateEvent;
|
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptDiagnosticsUpdateEvent;
|
||||||
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
|
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
|
||||||
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
|
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
|
||||||
|
@ -450,6 +451,121 @@ public class TestTaskAttempt{
|
||||||
assertFalse(eventHandler.internalError);
|
assertFalse(eventHandler.internalError);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testContainerCleanedWhileRunning() throws Exception {
|
||||||
|
ApplicationId appId = BuilderUtils.newApplicationId(1, 2);
|
||||||
|
ApplicationAttemptId appAttemptId =
|
||||||
|
BuilderUtils.newApplicationAttemptId(appId, 0);
|
||||||
|
JobId jobId = MRBuilderUtils.newJobId(appId, 1);
|
||||||
|
TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
|
||||||
|
TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
|
||||||
|
Path jobFile = mock(Path.class);
|
||||||
|
|
||||||
|
MockEventHandler eventHandler = new MockEventHandler();
|
||||||
|
TaskAttemptListener taListener = mock(TaskAttemptListener.class);
|
||||||
|
when(taListener.getAddress()).thenReturn(new InetSocketAddress("localhost", 0));
|
||||||
|
|
||||||
|
JobConf jobConf = new JobConf();
|
||||||
|
jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
|
||||||
|
jobConf.setBoolean("fs.file.impl.disable.cache", true);
|
||||||
|
jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, "");
|
||||||
|
jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, "10");
|
||||||
|
|
||||||
|
TaskSplitMetaInfo splits = mock(TaskSplitMetaInfo.class);
|
||||||
|
when(splits.getLocations()).thenReturn(new String[] {"127.0.0.1"});
|
||||||
|
|
||||||
|
AppContext appCtx = mock(AppContext.class);
|
||||||
|
ClusterInfo clusterInfo = mock(ClusterInfo.class);
|
||||||
|
Resource resource = mock(Resource.class);
|
||||||
|
when(appCtx.getClusterInfo()).thenReturn(clusterInfo);
|
||||||
|
when(clusterInfo.getMinContainerCapability()).thenReturn(resource);
|
||||||
|
when(resource.getMemory()).thenReturn(1024);
|
||||||
|
|
||||||
|
TaskAttemptImpl taImpl =
|
||||||
|
new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
|
||||||
|
splits, jobConf, taListener,
|
||||||
|
mock(OutputCommitter.class), mock(Token.class), new Credentials(),
|
||||||
|
new SystemClock(), appCtx);
|
||||||
|
|
||||||
|
NodeId nid = BuilderUtils.newNodeId("127.0.0.1", 0);
|
||||||
|
ContainerId contId = BuilderUtils.newContainerId(appAttemptId, 3);
|
||||||
|
Container container = mock(Container.class);
|
||||||
|
when(container.getId()).thenReturn(contId);
|
||||||
|
when(container.getNodeId()).thenReturn(nid);
|
||||||
|
when(container.getNodeHttpAddress()).thenReturn("localhost:0");
|
||||||
|
|
||||||
|
taImpl.handle(new TaskAttemptEvent(attemptId,
|
||||||
|
TaskAttemptEventType.TA_SCHEDULE));
|
||||||
|
taImpl.handle(new TaskAttemptContainerAssignedEvent(attemptId,
|
||||||
|
container, mock(Map.class)));
|
||||||
|
taImpl.handle(new TaskAttemptContainerLaunchedEvent(attemptId, 0));
|
||||||
|
assertEquals("Task attempt is not in running state", taImpl.getState(),
|
||||||
|
TaskAttemptState.RUNNING);
|
||||||
|
taImpl.handle(new TaskAttemptEvent(attemptId,
|
||||||
|
TaskAttemptEventType.TA_CONTAINER_CLEANED));
|
||||||
|
assertFalse("InternalError occurred trying to handle TA_CONTAINER_CLEANED",
|
||||||
|
eventHandler.internalError);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testContainerCleanedWhileCommitting() throws Exception {
|
||||||
|
ApplicationId appId = BuilderUtils.newApplicationId(1, 2);
|
||||||
|
ApplicationAttemptId appAttemptId =
|
||||||
|
BuilderUtils.newApplicationAttemptId(appId, 0);
|
||||||
|
JobId jobId = MRBuilderUtils.newJobId(appId, 1);
|
||||||
|
TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
|
||||||
|
TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
|
||||||
|
Path jobFile = mock(Path.class);
|
||||||
|
|
||||||
|
MockEventHandler eventHandler = new MockEventHandler();
|
||||||
|
TaskAttemptListener taListener = mock(TaskAttemptListener.class);
|
||||||
|
when(taListener.getAddress()).thenReturn(new InetSocketAddress("localhost", 0));
|
||||||
|
|
||||||
|
JobConf jobConf = new JobConf();
|
||||||
|
jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
|
||||||
|
jobConf.setBoolean("fs.file.impl.disable.cache", true);
|
||||||
|
jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, "");
|
||||||
|
jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, "10");
|
||||||
|
|
||||||
|
TaskSplitMetaInfo splits = mock(TaskSplitMetaInfo.class);
|
||||||
|
when(splits.getLocations()).thenReturn(new String[] {"127.0.0.1"});
|
||||||
|
|
||||||
|
AppContext appCtx = mock(AppContext.class);
|
||||||
|
ClusterInfo clusterInfo = mock(ClusterInfo.class);
|
||||||
|
Resource resource = mock(Resource.class);
|
||||||
|
when(appCtx.getClusterInfo()).thenReturn(clusterInfo);
|
||||||
|
when(clusterInfo.getMinContainerCapability()).thenReturn(resource);
|
||||||
|
when(resource.getMemory()).thenReturn(1024);
|
||||||
|
|
||||||
|
TaskAttemptImpl taImpl =
|
||||||
|
new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
|
||||||
|
splits, jobConf, taListener,
|
||||||
|
mock(OutputCommitter.class), mock(Token.class), new Credentials(),
|
||||||
|
new SystemClock(), appCtx);
|
||||||
|
|
||||||
|
NodeId nid = BuilderUtils.newNodeId("127.0.0.1", 0);
|
||||||
|
ContainerId contId = BuilderUtils.newContainerId(appAttemptId, 3);
|
||||||
|
Container container = mock(Container.class);
|
||||||
|
when(container.getId()).thenReturn(contId);
|
||||||
|
when(container.getNodeId()).thenReturn(nid);
|
||||||
|
when(container.getNodeHttpAddress()).thenReturn("localhost:0");
|
||||||
|
|
||||||
|
taImpl.handle(new TaskAttemptEvent(attemptId,
|
||||||
|
TaskAttemptEventType.TA_SCHEDULE));
|
||||||
|
taImpl.handle(new TaskAttemptContainerAssignedEvent(attemptId,
|
||||||
|
container, mock(Map.class)));
|
||||||
|
taImpl.handle(new TaskAttemptContainerLaunchedEvent(attemptId, 0));
|
||||||
|
taImpl.handle(new TaskAttemptEvent(attemptId,
|
||||||
|
TaskAttemptEventType.TA_COMMIT_PENDING));
|
||||||
|
|
||||||
|
assertEquals("Task attempt is not in commit pending state", taImpl.getState(),
|
||||||
|
TaskAttemptState.COMMIT_PENDING);
|
||||||
|
taImpl.handle(new TaskAttemptEvent(attemptId,
|
||||||
|
TaskAttemptEventType.TA_CONTAINER_CLEANED));
|
||||||
|
assertFalse("InternalError occurred trying to handle TA_CONTAINER_CLEANED",
|
||||||
|
eventHandler.internalError);
|
||||||
|
}
|
||||||
|
|
||||||
public static class MockEventHandler implements EventHandler {
|
public static class MockEventHandler implements EventHandler {
|
||||||
public boolean internalError;
|
public boolean internalError;
|
||||||
|
|
||||||
|
|
|
@ -220,4 +220,58 @@ public class TestContainerLauncherImpl {
|
||||||
ut.stop();
|
ut.stop();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testMyShutdown() throws Exception {
|
||||||
|
LOG.info("in test Shutdown");
|
||||||
|
|
||||||
|
YarnRPC mockRpc = mock(YarnRPC.class);
|
||||||
|
AppContext mockContext = mock(AppContext.class);
|
||||||
|
@SuppressWarnings("rawtypes")
|
||||||
|
EventHandler mockEventHandler = mock(EventHandler.class);
|
||||||
|
when(mockContext.getEventHandler()).thenReturn(mockEventHandler);
|
||||||
|
|
||||||
|
ContainerManager mockCM = mock(ContainerManager.class);
|
||||||
|
when(mockRpc.getProxy(eq(ContainerManager.class),
|
||||||
|
any(InetSocketAddress.class), any(Configuration.class)))
|
||||||
|
.thenReturn(mockCM);
|
||||||
|
|
||||||
|
ContainerLauncherImplUnderTest ut =
|
||||||
|
new ContainerLauncherImplUnderTest(mockContext, mockRpc);
|
||||||
|
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
ut.init(conf);
|
||||||
|
ut.start();
|
||||||
|
try {
|
||||||
|
ContainerId contId = makeContainerId(0l, 0, 0, 1);
|
||||||
|
TaskAttemptId taskAttemptId = makeTaskAttemptId(0l, 0, 0, TaskType.MAP, 0);
|
||||||
|
String cmAddress = "127.0.0.1:8000";
|
||||||
|
StartContainerResponse startResp =
|
||||||
|
recordFactory.newRecordInstance(StartContainerResponse.class);
|
||||||
|
startResp.setServiceResponse(ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID,
|
||||||
|
ShuffleHandler.serializeMetaData(80));
|
||||||
|
|
||||||
|
LOG.info("inserting launch event");
|
||||||
|
ContainerRemoteLaunchEvent mockLaunchEvent =
|
||||||
|
mock(ContainerRemoteLaunchEvent.class);
|
||||||
|
when(mockLaunchEvent.getType())
|
||||||
|
.thenReturn(EventType.CONTAINER_REMOTE_LAUNCH);
|
||||||
|
when(mockLaunchEvent.getContainerID())
|
||||||
|
.thenReturn(contId);
|
||||||
|
when(mockLaunchEvent.getTaskAttemptID()).thenReturn(taskAttemptId);
|
||||||
|
when(mockLaunchEvent.getContainerMgrAddress()).thenReturn(cmAddress);
|
||||||
|
when(mockCM.startContainer(any(StartContainerRequest.class))).thenReturn(startResp);
|
||||||
|
ut.handle(mockLaunchEvent);
|
||||||
|
|
||||||
|
ut.waitForPoolToIdle();
|
||||||
|
|
||||||
|
verify(mockCM).startContainer(any(StartContainerRequest.class));
|
||||||
|
|
||||||
|
// skip cleanup and make sure stop kills the container
|
||||||
|
|
||||||
|
} finally {
|
||||||
|
ut.stop();
|
||||||
|
verify(mockCM).stopContainer(any(StopContainerRequest.class));
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue