MAPREDUCE-3312. Modified MR AM to not send a stop-container request for a container that isn't launched at all. Contributed by Robert Joseph Evans.
svn merge --ignore-ancestry -c 1229451 ../../trunk/ git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1229452 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
048ac6a4a3
commit
4175e70020
|
@ -99,6 +99,9 @@ Release 0.23.1 - Unreleased
|
|||
for the thread loop interval separate from task-timeout configuration
|
||||
property. (Siddharth Seth via vinodkv)
|
||||
|
||||
MAPREDUCE-3312. Modified MR AM to not send a stop-container request for
|
||||
a container that isn't launched at all. (Robert Joseph Evans via vinodkv)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
MAPREDUCE-3567. Extraneous JobConf objects in AM heap. (Vinod Kumar
|
||||
|
|
|
@ -26,6 +26,7 @@ import java.util.Set;
|
|||
import java.util.Timer;
|
||||
import java.util.TimerTask;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
|
@ -44,8 +45,6 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerLaunched
|
|||
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptDiagnosticsUpdateEvent;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
|
||||
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
|
||||
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent;
|
||||
import org.apache.hadoop.net.NetUtils;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
|
@ -75,16 +74,217 @@ public class ContainerLauncherImpl extends AbstractService implements
|
|||
|
||||
int nmTimeOut;
|
||||
|
||||
private ConcurrentHashMap<ContainerId, Container> containers =
|
||||
new ConcurrentHashMap<ContainerId, Container>();
|
||||
private AppContext context;
|
||||
protected ThreadPoolExecutor launcherPool;
|
||||
protected static final int INITIAL_POOL_SIZE = 10;
|
||||
private int limitOnPoolSize;
|
||||
private Thread eventHandlingThread;
|
||||
private BlockingQueue<ContainerLauncherEvent> eventQueue =
|
||||
protected BlockingQueue<ContainerLauncherEvent> eventQueue =
|
||||
new LinkedBlockingQueue<ContainerLauncherEvent>();
|
||||
final Timer commandTimer = new Timer(true);
|
||||
YarnRPC rpc;
|
||||
|
||||
private Container getContainer(ContainerId id) {
|
||||
Container c = containers.get(id);
|
||||
if(c == null) {
|
||||
c = new Container();
|
||||
Container old = containers.putIfAbsent(id, c);
|
||||
if(old != null) {
|
||||
c = old;
|
||||
}
|
||||
}
|
||||
return c;
|
||||
}
|
||||
|
||||
private void removeContainerIfDone(ContainerId id) {
|
||||
Container c = containers.get(id);
|
||||
if(c != null && c.isCompletelyDone()) {
|
||||
containers.remove(id);
|
||||
}
|
||||
}
|
||||
|
||||
private static enum ContainerState {
|
||||
PREP, FAILED, RUNNING, DONE, KILLED_BEFORE_LAUNCH
|
||||
}
|
||||
|
||||
private class Container {
|
||||
private ContainerState state;
|
||||
|
||||
public Container() {
|
||||
this.state = ContainerState.PREP;
|
||||
}
|
||||
|
||||
public synchronized boolean isCompletelyDone() {
|
||||
return state == ContainerState.DONE || state == ContainerState.FAILED;
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public synchronized void launch(ContainerRemoteLaunchEvent event) {
|
||||
TaskAttemptId taskAttemptID = event.getTaskAttemptID();
|
||||
LOG.info("Launching " + taskAttemptID);
|
||||
if(this.state == ContainerState.KILLED_BEFORE_LAUNCH) {
|
||||
state = ContainerState.DONE;
|
||||
sendContainerLaunchFailedMsg(taskAttemptID,
|
||||
"Container was killed before it was launched");
|
||||
return;
|
||||
}
|
||||
CommandTimerTask timerTask = new CommandTimerTask(Thread
|
||||
.currentThread(), event);
|
||||
|
||||
final String containerManagerBindAddr = event.getContainerMgrAddress();
|
||||
ContainerId containerID = event.getContainerID();
|
||||
ContainerToken containerToken = event.getContainerToken();
|
||||
|
||||
ContainerManager proxy = null;
|
||||
try {
|
||||
commandTimer.schedule(timerTask, nmTimeOut);
|
||||
|
||||
proxy = getCMProxy(containerID, containerManagerBindAddr,
|
||||
containerToken);
|
||||
|
||||
// Interrupted during getProxy, but that didn't throw exception
|
||||
if (Thread.interrupted()) {
|
||||
// The timer canceled the command in the mean while.
|
||||
String message = "Container launch failed for " + containerID
|
||||
+ " : Start-container for " + event.getContainerID()
|
||||
+ " got interrupted. Returning.";
|
||||
this.state = ContainerState.FAILED;
|
||||
sendContainerLaunchFailedMsg(taskAttemptID, message);
|
||||
return;
|
||||
}
|
||||
// Construct the actual Container
|
||||
ContainerLaunchContext containerLaunchContext =
|
||||
event.getContainer();
|
||||
|
||||
// Now launch the actual container
|
||||
StartContainerRequest startRequest = Records
|
||||
.newRecord(StartContainerRequest.class);
|
||||
startRequest.setContainerLaunchContext(containerLaunchContext);
|
||||
StartContainerResponse response = proxy.startContainer(startRequest);
|
||||
|
||||
// container started properly. Stop the timer
|
||||
timerTask.cancel();
|
||||
if (Thread.interrupted()) {
|
||||
// The timer canceled the command in the mean while, but
|
||||
// startContainer didn't throw exception
|
||||
String message = "Container launch failed for " + containerID
|
||||
+ " : Start-container for " + event.getContainerID()
|
||||
+ " got interrupted. Returning.";
|
||||
this.state = ContainerState.FAILED;
|
||||
sendContainerLaunchFailedMsg(taskAttemptID, message);
|
||||
return;
|
||||
}
|
||||
|
||||
ByteBuffer portInfo = response
|
||||
.getServiceResponse(ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID);
|
||||
int port = -1;
|
||||
if(portInfo != null) {
|
||||
port = ShuffleHandler.deserializeMetaData(portInfo);
|
||||
}
|
||||
LOG.info("Shuffle port returned by ContainerManager for "
|
||||
+ taskAttemptID + " : " + port);
|
||||
|
||||
if(port < 0) {
|
||||
this.state = ContainerState.FAILED;
|
||||
throw new IllegalStateException("Invalid shuffle port number "
|
||||
+ port + " returned for " + taskAttemptID);
|
||||
}
|
||||
|
||||
// after launching, send launched event to task attempt to move
|
||||
// it from ASSIGNED to RUNNING state
|
||||
context.getEventHandler().handle(
|
||||
new TaskAttemptContainerLaunchedEvent(taskAttemptID, port));
|
||||
this.state = ContainerState.RUNNING;
|
||||
} catch (Throwable t) {
|
||||
if (Thread.interrupted()) {
|
||||
// The timer canceled the command in the mean while.
|
||||
LOG.info("Start-container for " + event.getContainerID()
|
||||
+ " got interrupted.");
|
||||
}
|
||||
String message = "Container launch failed for " + containerID + " : "
|
||||
+ StringUtils.stringifyException(t);
|
||||
this.state = ContainerState.FAILED;
|
||||
sendContainerLaunchFailedMsg(taskAttemptID, message);
|
||||
} finally {
|
||||
timerTask.cancel();
|
||||
if (proxy != null) {
|
||||
ContainerLauncherImpl.this.rpc.stopProxy(proxy, getConfig());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public synchronized void kill(ContainerLauncherEvent event) {
|
||||
if(this.state == ContainerState.PREP) {
|
||||
this.state = ContainerState.KILLED_BEFORE_LAUNCH;
|
||||
} else {
|
||||
CommandTimerTask timerTask = new CommandTimerTask(Thread
|
||||
.currentThread(), event);
|
||||
|
||||
final String containerManagerBindAddr = event.getContainerMgrAddress();
|
||||
ContainerId containerID = event.getContainerID();
|
||||
ContainerToken containerToken = event.getContainerToken();
|
||||
TaskAttemptId taskAttemptID = event.getTaskAttemptID();
|
||||
LOG.info("KILLING " + taskAttemptID);
|
||||
commandTimer.schedule(timerTask, nmTimeOut);
|
||||
|
||||
ContainerManager proxy = null;
|
||||
try {
|
||||
proxy = getCMProxy(containerID, containerManagerBindAddr,
|
||||
containerToken);
|
||||
|
||||
if (Thread.interrupted()) {
|
||||
// The timer canceled the command in the mean while. No need to
|
||||
// return, send cleaned up event anyways.
|
||||
LOG.info("Stop-container for " + event.getContainerID()
|
||||
+ " got interrupted.");
|
||||
} else {
|
||||
// kill the remote container if already launched
|
||||
StopContainerRequest stopRequest = Records
|
||||
.newRecord(StopContainerRequest.class);
|
||||
stopRequest.setContainerId(event.getContainerID());
|
||||
proxy.stopContainer(stopRequest);
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
|
||||
if (Thread.interrupted()) {
|
||||
// The timer canceled the command in the mean while, clear the
|
||||
// interrupt flag
|
||||
LOG.info("Stop-container for " + event.getContainerID()
|
||||
+ " got interrupted.");
|
||||
}
|
||||
|
||||
// ignore the cleanup failure
|
||||
String message = "cleanup failed for container "
|
||||
+ event.getContainerID() + " : "
|
||||
+ StringUtils.stringifyException(t);
|
||||
context.getEventHandler().handle(
|
||||
new TaskAttemptDiagnosticsUpdateEvent(taskAttemptID, message));
|
||||
LOG.warn(message);
|
||||
} finally {
|
||||
timerTask.cancel();
|
||||
if (Thread.interrupted()) {
|
||||
LOG.info("Stop-container for " + event.getContainerID()
|
||||
+ " got interrupted.");
|
||||
// ignore the cleanup failure
|
||||
context.getEventHandler().handle(
|
||||
new TaskAttemptDiagnosticsUpdateEvent(taskAttemptID,
|
||||
"cleanup failed for container " + event.getContainerID()));
|
||||
}
|
||||
if (proxy != null) {
|
||||
ContainerLauncherImpl.this.rpc.stopProxy(proxy, getConfig());
|
||||
}
|
||||
}
|
||||
this.state = ContainerState.DONE;
|
||||
}
|
||||
// after killing, send killed event to task attempt
|
||||
context.getEventHandler().handle(
|
||||
new TaskAttemptEvent(event.getTaskAttemptID(),
|
||||
TaskAttemptEventType.TA_CONTAINER_CLEANED));
|
||||
}
|
||||
}
|
||||
// To track numNodes.
|
||||
Set<String> allNodes = new HashSet<String>();
|
||||
|
||||
|
@ -105,10 +305,14 @@ public class ContainerLauncherImpl extends AbstractService implements
|
|||
LOG.info("Upper limit on the thread pool size is " + this.limitOnPoolSize);
|
||||
this.nmTimeOut = conf.getInt(ContainerLauncher.MR_AM_NM_COMMAND_TIMEOUT,
|
||||
ContainerLauncher.DEFAULT_NM_COMMAND_TIMEOUT);
|
||||
this.rpc = YarnRPC.create(conf);
|
||||
this.rpc = createYarnRPC(conf);
|
||||
super.init(conf);
|
||||
}
|
||||
|
||||
protected YarnRPC createYarnRPC(Configuration conf) {
|
||||
return YarnRPC.create(conf);
|
||||
}
|
||||
|
||||
public void start() {
|
||||
|
||||
ThreadFactory tf = new ThreadFactoryBuilder().setNameFormat(
|
||||
|
@ -119,7 +323,7 @@ public class ContainerLauncherImpl extends AbstractService implements
|
|||
Integer.MAX_VALUE, 1, TimeUnit.HOURS,
|
||||
new LinkedBlockingQueue<Runnable>(),
|
||||
tf);
|
||||
eventHandlingThread = new Thread(new Runnable() {
|
||||
eventHandlingThread = new Thread() {
|
||||
@Override
|
||||
public void run() {
|
||||
ContainerLauncherEvent event = null;
|
||||
|
@ -162,7 +366,7 @@ public class ContainerLauncherImpl extends AbstractService implements
|
|||
// NodeManager into a single connection
|
||||
}
|
||||
}
|
||||
});
|
||||
};
|
||||
eventHandlingThread.setName("ContainerLauncher Event Handler");
|
||||
eventHandlingThread.start();
|
||||
super.start();
|
||||
|
@ -255,175 +459,28 @@ public class ContainerLauncherImpl extends AbstractService implements
|
|||
this.event = event;
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Override
|
||||
public void run() {
|
||||
LOG.info("Processing the event " + event.toString());
|
||||
|
||||
// Load ContainerManager tokens before creating a connection.
|
||||
// TODO: Do it only once per NodeManager.
|
||||
final String containerManagerBindAddr = event.getContainerMgrAddress();
|
||||
ContainerId containerID = event.getContainerID();
|
||||
ContainerToken containerToken = event.getContainerToken();
|
||||
TaskAttemptId taskAttemptID = event.getTaskAttemptID();
|
||||
|
||||
ContainerManager proxy = null;
|
||||
|
||||
CommandTimerTask timerTask = new CommandTimerTask(Thread
|
||||
.currentThread(), event);
|
||||
|
||||
Container c = getContainer(containerID);
|
||||
switch(event.getType()) {
|
||||
|
||||
case CONTAINER_REMOTE_LAUNCH:
|
||||
ContainerRemoteLaunchEvent launchEvent
|
||||
= (ContainerRemoteLaunchEvent) event;
|
||||
|
||||
try {
|
||||
commandTimer.schedule(timerTask, nmTimeOut);
|
||||
|
||||
proxy = getCMProxy(containerID, containerManagerBindAddr,
|
||||
containerToken);
|
||||
|
||||
// Interrupted during getProxy, but that didn't throw exception
|
||||
if (Thread.interrupted()) {
|
||||
// The timer cancelled the command in the mean while.
|
||||
String message = "Container launch failed for " + containerID
|
||||
+ " : Start-container for " + event.getContainerID()
|
||||
+ " got interrupted. Returning.";
|
||||
sendContainerLaunchFailedMsg(taskAttemptID, message);
|
||||
return;
|
||||
}
|
||||
|
||||
// Construct the actual Container
|
||||
ContainerLaunchContext containerLaunchContext =
|
||||
launchEvent.getContainer();
|
||||
|
||||
// Now launch the actual container
|
||||
StartContainerRequest startRequest = Records
|
||||
.newRecord(StartContainerRequest.class);
|
||||
startRequest.setContainerLaunchContext(containerLaunchContext);
|
||||
StartContainerResponse response = proxy.startContainer(startRequest);
|
||||
|
||||
// container started properly. Stop the timer
|
||||
timerTask.cancel();
|
||||
if (Thread.interrupted()) {
|
||||
// The timer cancelled the command in the mean while, but
|
||||
// startContainer didn't throw exception
|
||||
String message = "Container launch failed for " + containerID
|
||||
+ " : Start-container for " + event.getContainerID()
|
||||
+ " got interrupted. Returning.";
|
||||
sendContainerLaunchFailedMsg(taskAttemptID, message);
|
||||
return;
|
||||
}
|
||||
|
||||
ByteBuffer portInfo = response
|
||||
.getServiceResponse(ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID);
|
||||
int port = -1;
|
||||
if(portInfo != null) {
|
||||
port = ShuffleHandler.deserializeMetaData(portInfo);
|
||||
}
|
||||
LOG.info("Shuffle port returned by ContainerManager for "
|
||||
+ taskAttemptID + " : " + port);
|
||||
|
||||
if(port < 0) {
|
||||
throw new IllegalStateException("Invalid shuffle port number "
|
||||
+ port + " returned for " + taskAttemptID);
|
||||
}
|
||||
|
||||
// after launching, send launched event to task attempt to move
|
||||
// it from ASSIGNED to RUNNING state
|
||||
context.getEventHandler().handle(
|
||||
new TaskAttemptContainerLaunchedEvent(taskAttemptID, port));
|
||||
} catch (Throwable t) {
|
||||
if (Thread.interrupted()) {
|
||||
// The timer cancelled the command in the mean while.
|
||||
LOG.info("Start-container for " + event.getContainerID()
|
||||
+ " got interrupted.");
|
||||
}
|
||||
String message = "Container launch failed for " + containerID
|
||||
+ " : " + StringUtils.stringifyException(t);
|
||||
sendContainerLaunchFailedMsg(taskAttemptID, message);
|
||||
} finally {
|
||||
timerTask.cancel();
|
||||
if (proxy != null) {
|
||||
ContainerLauncherImpl.this.rpc.stopProxy(proxy, getConfig());
|
||||
}
|
||||
}
|
||||
|
||||
c.launch(launchEvent);
|
||||
break;
|
||||
|
||||
case CONTAINER_REMOTE_CLEANUP:
|
||||
// We will have to remove the launch (meant "cleanup"? FIXME) event if it is still in eventQueue
|
||||
// and not yet processed
|
||||
if (eventQueue.contains(event)) {
|
||||
eventQueue.remove(event); // TODO: Any synchro needed?
|
||||
//deallocate the container
|
||||
context.getEventHandler().handle(
|
||||
new ContainerAllocatorEvent(taskAttemptID,
|
||||
ContainerAllocator.EventType.CONTAINER_DEALLOCATE));
|
||||
} else {
|
||||
|
||||
try {
|
||||
commandTimer.schedule(timerTask, nmTimeOut);
|
||||
|
||||
proxy = getCMProxy(containerID, containerManagerBindAddr,
|
||||
containerToken);
|
||||
|
||||
if (Thread.interrupted()) {
|
||||
// The timer cancelled the command in the mean while. No need to
|
||||
// return, send cleanedup event anyways.
|
||||
LOG.info("Stop-container for " + event.getContainerID()
|
||||
+ " got interrupted.");
|
||||
} else {
|
||||
|
||||
// TODO:check whether container is launched
|
||||
|
||||
// kill the remote container if already launched
|
||||
StopContainerRequest stopRequest = Records
|
||||
.newRecord(StopContainerRequest.class);
|
||||
stopRequest.setContainerId(event.getContainerID());
|
||||
proxy.stopContainer(stopRequest);
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
|
||||
if (Thread.interrupted()) {
|
||||
// The timer cancelled the command in the mean while, clear the
|
||||
// interrupt flag
|
||||
LOG.info("Stop-container for " + event.getContainerID()
|
||||
+ " got interrupted.");
|
||||
}
|
||||
|
||||
// ignore the cleanup failure
|
||||
String message = "cleanup failed for container "
|
||||
+ event.getContainerID() + " : "
|
||||
+ StringUtils.stringifyException(t);
|
||||
context.getEventHandler()
|
||||
.handle(
|
||||
new TaskAttemptDiagnosticsUpdateEvent(taskAttemptID,
|
||||
message));
|
||||
LOG.warn(message);
|
||||
} finally {
|
||||
timerTask.cancel();
|
||||
if (Thread.interrupted()) {
|
||||
LOG.info("Stop-container for " + event.getContainerID()
|
||||
+ " got interrupted.");
|
||||
// ignore the cleanup failure
|
||||
context.getEventHandler()
|
||||
.handle(new TaskAttemptDiagnosticsUpdateEvent(taskAttemptID,
|
||||
"cleanup failed for container " + event.getContainerID()));
|
||||
}
|
||||
if (proxy != null) {
|
||||
ContainerLauncherImpl.this.rpc.stopProxy(proxy, getConfig());
|
||||
}
|
||||
}
|
||||
|
||||
// after killing, send killed event to taskattempt
|
||||
context.getEventHandler().handle(
|
||||
new TaskAttemptEvent(event.getTaskAttemptID(),
|
||||
TaskAttemptEventType.TA_CONTAINER_CLEANED));
|
||||
}
|
||||
c.kill(event);
|
||||
break;
|
||||
}
|
||||
removeContainerIfDone(containerID);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -47,6 +47,11 @@ public class ContainerRemoteLaunchEvent extends ContainerLauncherEvent {
|
|||
return this.task;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return super.hashCode();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object obj) {
|
||||
return super.equals(obj);
|
||||
|
|
|
@ -0,0 +1,223 @@
|
|||
package org.apache.hadoop.mapreduce.v2.app.launcher;
|
||||
|
||||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Matchers.eq;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.never;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import java.net.InetSocketAddress;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.mapred.ShuffleHandler;
|
||||
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
|
||||
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
|
||||
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
|
||||
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
|
||||
import org.apache.hadoop.mapreduce.v2.app.AppContext;
|
||||
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher.EventType;
|
||||
import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
|
||||
import org.apache.hadoop.yarn.api.ContainerManager;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.event.EventHandler;
|
||||
import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||
import org.apache.hadoop.yarn.ipc.YarnRPC;
|
||||
import org.apache.hadoop.yarn.util.BuilderUtils;
|
||||
import org.junit.Test;
|
||||
|
||||
public class TestContainerLauncherImpl {
|
||||
static final Log LOG = LogFactory.getLog(TestContainerLauncherImpl.class);
|
||||
private static final RecordFactory recordFactory =
|
||||
RecordFactoryProvider.getRecordFactory(null);
|
||||
|
||||
|
||||
private static class ContainerLauncherImplUnderTest extends
|
||||
ContainerLauncherImpl {
|
||||
|
||||
private YarnRPC rpc;
|
||||
|
||||
public ContainerLauncherImplUnderTest(AppContext context, YarnRPC rpc) {
|
||||
super(context);
|
||||
this.rpc = rpc;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected YarnRPC createYarnRPC(Configuration conf) {
|
||||
return rpc;
|
||||
}
|
||||
|
||||
public void waitForPoolToIdle() throws InterruptedException {
|
||||
//I wish that we did not need the sleep, but it is here so that we are sure
|
||||
// That the other thread had time to insert the event into the queue and
|
||||
// start processing it. For some reason we were getting interrupted
|
||||
// exceptions within eventQueue without this sleep.
|
||||
Thread.sleep(100l);
|
||||
LOG.debug("POOL SIZE 1: "+this.eventQueue.size()+
|
||||
" POOL SIZE 2: "+this.launcherPool.getQueue().size()+
|
||||
" ACTIVE COUNT: "+ this.launcherPool.getActiveCount());
|
||||
while(!this.eventQueue.isEmpty() ||
|
||||
!this.launcherPool.getQueue().isEmpty() ||
|
||||
this.launcherPool.getActiveCount() > 0) {
|
||||
Thread.sleep(100l);
|
||||
LOG.debug("POOL SIZE 1: "+this.eventQueue.size()+
|
||||
" POOL SIZE 2: "+this.launcherPool.getQueue().size()+
|
||||
" ACTIVE COUNT: "+ this.launcherPool.getActiveCount());
|
||||
}
|
||||
LOG.debug("POOL SIZE 1: "+this.eventQueue.size()+
|
||||
" POOL SIZE 2: "+this.launcherPool.getQueue().size()+
|
||||
" ACTIVE COUNT: "+ this.launcherPool.getActiveCount());
|
||||
}
|
||||
}
|
||||
|
||||
public static ContainerId makeContainerId(long ts, int appId, int attemptId,
|
||||
int id) {
|
||||
return BuilderUtils.newContainerId(
|
||||
BuilderUtils.newApplicationAttemptId(
|
||||
BuilderUtils.newApplicationId(ts, appId), attemptId), id);
|
||||
}
|
||||
|
||||
public static TaskAttemptId makeTaskAttemptId(long ts, int appId, int taskId,
|
||||
TaskType taskType, int id) {
|
||||
ApplicationId aID = BuilderUtils.newApplicationId(ts, appId);
|
||||
JobId jID = MRBuilderUtils.newJobId(aID, id);
|
||||
TaskId tID = MRBuilderUtils.newTaskId(jID, taskId, taskType);
|
||||
return MRBuilderUtils.newTaskAttemptId(tID, id);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testHandle() throws Exception {
|
||||
LOG.info("STARTING testHandle");
|
||||
YarnRPC mockRpc = mock(YarnRPC.class);
|
||||
AppContext mockContext = mock(AppContext.class);
|
||||
@SuppressWarnings("rawtypes")
|
||||
EventHandler mockEventHandler = mock(EventHandler.class);
|
||||
when(mockContext.getEventHandler()).thenReturn(mockEventHandler);
|
||||
|
||||
ContainerManager mockCM = mock(ContainerManager.class);
|
||||
when(mockRpc.getProxy(eq(ContainerManager.class),
|
||||
any(InetSocketAddress.class), any(Configuration.class)))
|
||||
.thenReturn(mockCM);
|
||||
|
||||
ContainerLauncherImplUnderTest ut =
|
||||
new ContainerLauncherImplUnderTest(mockContext, mockRpc);
|
||||
|
||||
Configuration conf = new Configuration();
|
||||
ut.init(conf);
|
||||
ut.start();
|
||||
try {
|
||||
ContainerId contId = makeContainerId(0l, 0, 0, 1);
|
||||
TaskAttemptId taskAttemptId = makeTaskAttemptId(0l, 0, 0, TaskType.MAP, 0);
|
||||
String cmAddress = "127.0.0.1:8000";
|
||||
StartContainerResponse startResp =
|
||||
recordFactory.newRecordInstance(StartContainerResponse.class);
|
||||
startResp.setServiceResponse(ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID,
|
||||
ShuffleHandler.serializeMetaData(80));
|
||||
|
||||
|
||||
LOG.info("inserting launch event");
|
||||
ContainerRemoteLaunchEvent mockLaunchEvent =
|
||||
mock(ContainerRemoteLaunchEvent.class);
|
||||
when(mockLaunchEvent.getType())
|
||||
.thenReturn(EventType.CONTAINER_REMOTE_LAUNCH);
|
||||
when(mockLaunchEvent.getContainerID())
|
||||
.thenReturn(contId);
|
||||
when(mockLaunchEvent.getTaskAttemptID()).thenReturn(taskAttemptId);
|
||||
when(mockLaunchEvent.getContainerMgrAddress()).thenReturn(cmAddress);
|
||||
when(mockCM.startContainer(any(StartContainerRequest.class))).thenReturn(startResp);
|
||||
ut.handle(mockLaunchEvent);
|
||||
|
||||
ut.waitForPoolToIdle();
|
||||
|
||||
verify(mockCM).startContainer(any(StartContainerRequest.class));
|
||||
|
||||
LOG.info("inserting cleanup event");
|
||||
ContainerLauncherEvent mockCleanupEvent =
|
||||
mock(ContainerLauncherEvent.class);
|
||||
when(mockCleanupEvent.getType())
|
||||
.thenReturn(EventType.CONTAINER_REMOTE_CLEANUP);
|
||||
when(mockCleanupEvent.getContainerID())
|
||||
.thenReturn(contId);
|
||||
when(mockCleanupEvent.getTaskAttemptID()).thenReturn(taskAttemptId);
|
||||
when(mockCleanupEvent.getContainerMgrAddress()).thenReturn(cmAddress);
|
||||
ut.handle(mockCleanupEvent);
|
||||
|
||||
ut.waitForPoolToIdle();
|
||||
|
||||
verify(mockCM).stopContainer(any(StopContainerRequest.class));
|
||||
} finally {
|
||||
ut.stop();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testOutOfOrder() throws Exception {
|
||||
LOG.info("STARTING testOutOfOrder");
|
||||
YarnRPC mockRpc = mock(YarnRPC.class);
|
||||
AppContext mockContext = mock(AppContext.class);
|
||||
@SuppressWarnings("rawtypes")
|
||||
EventHandler mockEventHandler = mock(EventHandler.class);
|
||||
when(mockContext.getEventHandler()).thenReturn(mockEventHandler);
|
||||
|
||||
ContainerManager mockCM = mock(ContainerManager.class);
|
||||
when(mockRpc.getProxy(eq(ContainerManager.class),
|
||||
any(InetSocketAddress.class), any(Configuration.class)))
|
||||
.thenReturn(mockCM);
|
||||
|
||||
ContainerLauncherImplUnderTest ut =
|
||||
new ContainerLauncherImplUnderTest(mockContext, mockRpc);
|
||||
|
||||
Configuration conf = new Configuration();
|
||||
ut.init(conf);
|
||||
ut.start();
|
||||
try {
|
||||
ContainerId contId = makeContainerId(0l, 0, 0, 1);
|
||||
TaskAttemptId taskAttemptId = makeTaskAttemptId(0l, 0, 0, TaskType.MAP, 0);
|
||||
String cmAddress = "127.0.0.1:8000";
|
||||
StartContainerResponse startResp =
|
||||
recordFactory.newRecordInstance(StartContainerResponse.class);
|
||||
startResp.setServiceResponse(ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID,
|
||||
ShuffleHandler.serializeMetaData(80));
|
||||
|
||||
LOG.info("inserting cleanup event");
|
||||
ContainerLauncherEvent mockCleanupEvent =
|
||||
mock(ContainerLauncherEvent.class);
|
||||
when(mockCleanupEvent.getType())
|
||||
.thenReturn(EventType.CONTAINER_REMOTE_CLEANUP);
|
||||
when(mockCleanupEvent.getContainerID())
|
||||
.thenReturn(contId);
|
||||
when(mockCleanupEvent.getTaskAttemptID()).thenReturn(taskAttemptId);
|
||||
when(mockCleanupEvent.getContainerMgrAddress()).thenReturn(cmAddress);
|
||||
ut.handle(mockCleanupEvent);
|
||||
|
||||
ut.waitForPoolToIdle();
|
||||
|
||||
verify(mockCM, never()).stopContainer(any(StopContainerRequest.class));
|
||||
|
||||
LOG.info("inserting launch event");
|
||||
ContainerRemoteLaunchEvent mockLaunchEvent =
|
||||
mock(ContainerRemoteLaunchEvent.class);
|
||||
when(mockLaunchEvent.getType())
|
||||
.thenReturn(EventType.CONTAINER_REMOTE_LAUNCH);
|
||||
when(mockLaunchEvent.getContainerID())
|
||||
.thenReturn(contId);
|
||||
when(mockLaunchEvent.getTaskAttemptID()).thenReturn(taskAttemptId);
|
||||
when(mockLaunchEvent.getContainerMgrAddress()).thenReturn(cmAddress);
|
||||
when(mockCM.startContainer(any(StartContainerRequest.class))).thenReturn(startResp);
|
||||
ut.handle(mockLaunchEvent);
|
||||
|
||||
ut.waitForPoolToIdle();
|
||||
|
||||
verify(mockCM, never()).startContainer(any(StartContainerRequest.class));
|
||||
} finally {
|
||||
ut.stop();
|
||||
}
|
||||
}
|
||||
}
|
|
@ -164,7 +164,7 @@ public class ShuffleHandler extends AbstractService
|
|||
* @param port the port to be sent to the ApplciationMaster
|
||||
* @return the serialized form of the port.
|
||||
*/
|
||||
static ByteBuffer serializeMetaData(int port) throws IOException {
|
||||
public static ByteBuffer serializeMetaData(int port) throws IOException {
|
||||
//TODO these bytes should be versioned
|
||||
DataOutputBuffer port_dob = new DataOutputBuffer();
|
||||
port_dob.writeInt(port);
|
||||
|
|
Loading…
Reference in New Issue