MAPREDUCE-3312. Modified MR AM to not send a stop-container request for a container that isn't launched at all. Contributed by Robert Joseph Evans.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1229451 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2012-01-10 02:15:48 +00:00
parent 48150cddaf
commit 849c68c7b5
5 changed files with 446 additions and 158 deletions

View File

@ -165,6 +165,9 @@ Release 0.23.1 - Unreleased
for the thread loop interval separate from task-timeout configuration for the thread loop interval separate from task-timeout configuration
property. (Siddharth Seth via vinodkv) property. (Siddharth Seth via vinodkv)
MAPREDUCE-3312. Modified MR AM to not send a stop-container request for
a container that isn't launched at all. (Robert Joseph Evans via vinodkv)
OPTIMIZATIONS OPTIMIZATIONS
MAPREDUCE-3567. Extraneous JobConf objects in AM heap. (Vinod Kumar MAPREDUCE-3567. Extraneous JobConf objects in AM heap. (Vinod Kumar

View File

@ -26,6 +26,7 @@ import java.util.Set;
import java.util.Timer; import java.util.Timer;
import java.util.TimerTask; import java.util.TimerTask;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
@ -44,8 +45,6 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerLaunched
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptDiagnosticsUpdateEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptDiagnosticsUpdateEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent;
import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.Token;
@ -75,16 +74,217 @@ public class ContainerLauncherImpl extends AbstractService implements
int nmTimeOut; int nmTimeOut;
private ConcurrentHashMap<ContainerId, Container> containers =
new ConcurrentHashMap<ContainerId, Container>();
private AppContext context; private AppContext context;
protected ThreadPoolExecutor launcherPool; protected ThreadPoolExecutor launcherPool;
protected static final int INITIAL_POOL_SIZE = 10; protected static final int INITIAL_POOL_SIZE = 10;
private int limitOnPoolSize; private int limitOnPoolSize;
private Thread eventHandlingThread; private Thread eventHandlingThread;
private BlockingQueue<ContainerLauncherEvent> eventQueue = protected BlockingQueue<ContainerLauncherEvent> eventQueue =
new LinkedBlockingQueue<ContainerLauncherEvent>(); new LinkedBlockingQueue<ContainerLauncherEvent>();
final Timer commandTimer = new Timer(true); final Timer commandTimer = new Timer(true);
YarnRPC rpc; YarnRPC rpc;
private Container getContainer(ContainerId id) {
Container c = containers.get(id);
if(c == null) {
c = new Container();
Container old = containers.putIfAbsent(id, c);
if(old != null) {
c = old;
}
}
return c;
}
private void removeContainerIfDone(ContainerId id) {
Container c = containers.get(id);
if(c != null && c.isCompletelyDone()) {
containers.remove(id);
}
}
private static enum ContainerState {
PREP, FAILED, RUNNING, DONE, KILLED_BEFORE_LAUNCH
}
private class Container {
private ContainerState state;
public Container() {
this.state = ContainerState.PREP;
}
public synchronized boolean isCompletelyDone() {
return state == ContainerState.DONE || state == ContainerState.FAILED;
}
@SuppressWarnings("unchecked")
public synchronized void launch(ContainerRemoteLaunchEvent event) {
TaskAttemptId taskAttemptID = event.getTaskAttemptID();
LOG.info("Launching " + taskAttemptID);
if(this.state == ContainerState.KILLED_BEFORE_LAUNCH) {
state = ContainerState.DONE;
sendContainerLaunchFailedMsg(taskAttemptID,
"Container was killed before it was launched");
return;
}
CommandTimerTask timerTask = new CommandTimerTask(Thread
.currentThread(), event);
final String containerManagerBindAddr = event.getContainerMgrAddress();
ContainerId containerID = event.getContainerID();
ContainerToken containerToken = event.getContainerToken();
ContainerManager proxy = null;
try {
commandTimer.schedule(timerTask, nmTimeOut);
proxy = getCMProxy(containerID, containerManagerBindAddr,
containerToken);
// Interrupted during getProxy, but that didn't throw exception
if (Thread.interrupted()) {
// The timer canceled the command in the mean while.
String message = "Container launch failed for " + containerID
+ " : Start-container for " + event.getContainerID()
+ " got interrupted. Returning.";
this.state = ContainerState.FAILED;
sendContainerLaunchFailedMsg(taskAttemptID, message);
return;
}
// Construct the actual Container
ContainerLaunchContext containerLaunchContext =
event.getContainer();
// Now launch the actual container
StartContainerRequest startRequest = Records
.newRecord(StartContainerRequest.class);
startRequest.setContainerLaunchContext(containerLaunchContext);
StartContainerResponse response = proxy.startContainer(startRequest);
// container started properly. Stop the timer
timerTask.cancel();
if (Thread.interrupted()) {
// The timer canceled the command in the mean while, but
// startContainer didn't throw exception
String message = "Container launch failed for " + containerID
+ " : Start-container for " + event.getContainerID()
+ " got interrupted. Returning.";
this.state = ContainerState.FAILED;
sendContainerLaunchFailedMsg(taskAttemptID, message);
return;
}
ByteBuffer portInfo = response
.getServiceResponse(ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID);
int port = -1;
if(portInfo != null) {
port = ShuffleHandler.deserializeMetaData(portInfo);
}
LOG.info("Shuffle port returned by ContainerManager for "
+ taskAttemptID + " : " + port);
if(port < 0) {
this.state = ContainerState.FAILED;
throw new IllegalStateException("Invalid shuffle port number "
+ port + " returned for " + taskAttemptID);
}
// after launching, send launched event to task attempt to move
// it from ASSIGNED to RUNNING state
context.getEventHandler().handle(
new TaskAttemptContainerLaunchedEvent(taskAttemptID, port));
this.state = ContainerState.RUNNING;
} catch (Throwable t) {
if (Thread.interrupted()) {
// The timer canceled the command in the mean while.
LOG.info("Start-container for " + event.getContainerID()
+ " got interrupted.");
}
String message = "Container launch failed for " + containerID + " : "
+ StringUtils.stringifyException(t);
this.state = ContainerState.FAILED;
sendContainerLaunchFailedMsg(taskAttemptID, message);
} finally {
timerTask.cancel();
if (proxy != null) {
ContainerLauncherImpl.this.rpc.stopProxy(proxy, getConfig());
}
}
}
@SuppressWarnings("unchecked")
public synchronized void kill(ContainerLauncherEvent event) {
if(this.state == ContainerState.PREP) {
this.state = ContainerState.KILLED_BEFORE_LAUNCH;
} else {
CommandTimerTask timerTask = new CommandTimerTask(Thread
.currentThread(), event);
final String containerManagerBindAddr = event.getContainerMgrAddress();
ContainerId containerID = event.getContainerID();
ContainerToken containerToken = event.getContainerToken();
TaskAttemptId taskAttemptID = event.getTaskAttemptID();
LOG.info("KILLING " + taskAttemptID);
commandTimer.schedule(timerTask, nmTimeOut);
ContainerManager proxy = null;
try {
proxy = getCMProxy(containerID, containerManagerBindAddr,
containerToken);
if (Thread.interrupted()) {
// The timer canceled the command in the mean while. No need to
// return, send cleaned up event anyways.
LOG.info("Stop-container for " + event.getContainerID()
+ " got interrupted.");
} else {
// kill the remote container if already launched
StopContainerRequest stopRequest = Records
.newRecord(StopContainerRequest.class);
stopRequest.setContainerId(event.getContainerID());
proxy.stopContainer(stopRequest);
}
} catch (Throwable t) {
if (Thread.interrupted()) {
// The timer canceled the command in the mean while, clear the
// interrupt flag
LOG.info("Stop-container for " + event.getContainerID()
+ " got interrupted.");
}
// ignore the cleanup failure
String message = "cleanup failed for container "
+ event.getContainerID() + " : "
+ StringUtils.stringifyException(t);
context.getEventHandler().handle(
new TaskAttemptDiagnosticsUpdateEvent(taskAttemptID, message));
LOG.warn(message);
} finally {
timerTask.cancel();
if (Thread.interrupted()) {
LOG.info("Stop-container for " + event.getContainerID()
+ " got interrupted.");
// ignore the cleanup failure
context.getEventHandler().handle(
new TaskAttemptDiagnosticsUpdateEvent(taskAttemptID,
"cleanup failed for container " + event.getContainerID()));
}
if (proxy != null) {
ContainerLauncherImpl.this.rpc.stopProxy(proxy, getConfig());
}
}
this.state = ContainerState.DONE;
}
// after killing, send killed event to task attempt
context.getEventHandler().handle(
new TaskAttemptEvent(event.getTaskAttemptID(),
TaskAttemptEventType.TA_CONTAINER_CLEANED));
}
}
// To track numNodes. // To track numNodes.
Set<String> allNodes = new HashSet<String>(); Set<String> allNodes = new HashSet<String>();
@ -105,10 +305,14 @@ public class ContainerLauncherImpl extends AbstractService implements
LOG.info("Upper limit on the thread pool size is " + this.limitOnPoolSize); LOG.info("Upper limit on the thread pool size is " + this.limitOnPoolSize);
this.nmTimeOut = conf.getInt(ContainerLauncher.MR_AM_NM_COMMAND_TIMEOUT, this.nmTimeOut = conf.getInt(ContainerLauncher.MR_AM_NM_COMMAND_TIMEOUT,
ContainerLauncher.DEFAULT_NM_COMMAND_TIMEOUT); ContainerLauncher.DEFAULT_NM_COMMAND_TIMEOUT);
this.rpc = YarnRPC.create(conf); this.rpc = createYarnRPC(conf);
super.init(conf); super.init(conf);
} }
protected YarnRPC createYarnRPC(Configuration conf) {
return YarnRPC.create(conf);
}
public void start() { public void start() {
ThreadFactory tf = new ThreadFactoryBuilder().setNameFormat( ThreadFactory tf = new ThreadFactoryBuilder().setNameFormat(
@ -119,7 +323,7 @@ public class ContainerLauncherImpl extends AbstractService implements
Integer.MAX_VALUE, 1, TimeUnit.HOURS, Integer.MAX_VALUE, 1, TimeUnit.HOURS,
new LinkedBlockingQueue<Runnable>(), new LinkedBlockingQueue<Runnable>(),
tf); tf);
eventHandlingThread = new Thread(new Runnable() { eventHandlingThread = new Thread() {
@Override @Override
public void run() { public void run() {
ContainerLauncherEvent event = null; ContainerLauncherEvent event = null;
@ -162,7 +366,7 @@ public class ContainerLauncherImpl extends AbstractService implements
// NodeManager into a single connection // NodeManager into a single connection
} }
} }
}); };
eventHandlingThread.setName("ContainerLauncher Event Handler"); eventHandlingThread.setName("ContainerLauncher Event Handler");
eventHandlingThread.start(); eventHandlingThread.start();
super.start(); super.start();
@ -255,175 +459,28 @@ public class ContainerLauncherImpl extends AbstractService implements
this.event = event; this.event = event;
} }
@SuppressWarnings("unchecked")
@Override @Override
public void run() { public void run() {
LOG.info("Processing the event " + event.toString()); LOG.info("Processing the event " + event.toString());
// Load ContainerManager tokens before creating a connection. // Load ContainerManager tokens before creating a connection.
// TODO: Do it only once per NodeManager. // TODO: Do it only once per NodeManager.
final String containerManagerBindAddr = event.getContainerMgrAddress();
ContainerId containerID = event.getContainerID(); ContainerId containerID = event.getContainerID();
ContainerToken containerToken = event.getContainerToken();
TaskAttemptId taskAttemptID = event.getTaskAttemptID();
ContainerManager proxy = null;
CommandTimerTask timerTask = new CommandTimerTask(Thread
.currentThread(), event);
Container c = getContainer(containerID);
switch(event.getType()) { switch(event.getType()) {
case CONTAINER_REMOTE_LAUNCH: case CONTAINER_REMOTE_LAUNCH:
ContainerRemoteLaunchEvent launchEvent ContainerRemoteLaunchEvent launchEvent
= (ContainerRemoteLaunchEvent) event; = (ContainerRemoteLaunchEvent) event;
c.launch(launchEvent);
try {
commandTimer.schedule(timerTask, nmTimeOut);
proxy = getCMProxy(containerID, containerManagerBindAddr,
containerToken);
// Interrupted during getProxy, but that didn't throw exception
if (Thread.interrupted()) {
// The timer cancelled the command in the mean while.
String message = "Container launch failed for " + containerID
+ " : Start-container for " + event.getContainerID()
+ " got interrupted. Returning.";
sendContainerLaunchFailedMsg(taskAttemptID, message);
return;
}
// Construct the actual Container
ContainerLaunchContext containerLaunchContext =
launchEvent.getContainer();
// Now launch the actual container
StartContainerRequest startRequest = Records
.newRecord(StartContainerRequest.class);
startRequest.setContainerLaunchContext(containerLaunchContext);
StartContainerResponse response = proxy.startContainer(startRequest);
// container started properly. Stop the timer
timerTask.cancel();
if (Thread.interrupted()) {
// The timer cancelled the command in the mean while, but
// startContainer didn't throw exception
String message = "Container launch failed for " + containerID
+ " : Start-container for " + event.getContainerID()
+ " got interrupted. Returning.";
sendContainerLaunchFailedMsg(taskAttemptID, message);
return;
}
ByteBuffer portInfo = response
.getServiceResponse(ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID);
int port = -1;
if(portInfo != null) {
port = ShuffleHandler.deserializeMetaData(portInfo);
}
LOG.info("Shuffle port returned by ContainerManager for "
+ taskAttemptID + " : " + port);
if(port < 0) {
throw new IllegalStateException("Invalid shuffle port number "
+ port + " returned for " + taskAttemptID);
}
// after launching, send launched event to task attempt to move
// it from ASSIGNED to RUNNING state
context.getEventHandler().handle(
new TaskAttemptContainerLaunchedEvent(taskAttemptID, port));
} catch (Throwable t) {
if (Thread.interrupted()) {
// The timer cancelled the command in the mean while.
LOG.info("Start-container for " + event.getContainerID()
+ " got interrupted.");
}
String message = "Container launch failed for " + containerID
+ " : " + StringUtils.stringifyException(t);
sendContainerLaunchFailedMsg(taskAttemptID, message);
} finally {
timerTask.cancel();
if (proxy != null) {
ContainerLauncherImpl.this.rpc.stopProxy(proxy, getConfig());
}
}
break; break;
case CONTAINER_REMOTE_CLEANUP: case CONTAINER_REMOTE_CLEANUP:
// We will have to remove the launch (meant "cleanup"? FIXME) event if it is still in eventQueue c.kill(event);
// and not yet processed
if (eventQueue.contains(event)) {
eventQueue.remove(event); // TODO: Any synchro needed?
//deallocate the container
context.getEventHandler().handle(
new ContainerAllocatorEvent(taskAttemptID,
ContainerAllocator.EventType.CONTAINER_DEALLOCATE));
} else {
try {
commandTimer.schedule(timerTask, nmTimeOut);
proxy = getCMProxy(containerID, containerManagerBindAddr,
containerToken);
if (Thread.interrupted()) {
// The timer cancelled the command in the mean while. No need to
// return, send cleanedup event anyways.
LOG.info("Stop-container for " + event.getContainerID()
+ " got interrupted.");
} else {
// TODO:check whether container is launched
// kill the remote container if already launched
StopContainerRequest stopRequest = Records
.newRecord(StopContainerRequest.class);
stopRequest.setContainerId(event.getContainerID());
proxy.stopContainer(stopRequest);
}
} catch (Throwable t) {
if (Thread.interrupted()) {
// The timer cancelled the command in the mean while, clear the
// interrupt flag
LOG.info("Stop-container for " + event.getContainerID()
+ " got interrupted.");
}
// ignore the cleanup failure
String message = "cleanup failed for container "
+ event.getContainerID() + " : "
+ StringUtils.stringifyException(t);
context.getEventHandler()
.handle(
new TaskAttemptDiagnosticsUpdateEvent(taskAttemptID,
message));
LOG.warn(message);
} finally {
timerTask.cancel();
if (Thread.interrupted()) {
LOG.info("Stop-container for " + event.getContainerID()
+ " got interrupted.");
// ignore the cleanup failure
context.getEventHandler()
.handle(new TaskAttemptDiagnosticsUpdateEvent(taskAttemptID,
"cleanup failed for container " + event.getContainerID()));
}
if (proxy != null) {
ContainerLauncherImpl.this.rpc.stopProxy(proxy, getConfig());
}
}
// after killing, send killed event to taskattempt
context.getEventHandler().handle(
new TaskAttemptEvent(event.getTaskAttemptID(),
TaskAttemptEventType.TA_CONTAINER_CLEANED));
}
break; break;
} }
removeContainerIfDone(containerID);
} }
} }

View File

@ -47,6 +47,11 @@ public class ContainerRemoteLaunchEvent extends ContainerLauncherEvent {
return this.task; return this.task;
} }
@Override
public int hashCode() {
return super.hashCode();
}
@Override @Override
public boolean equals(Object obj) { public boolean equals(Object obj) {
return super.equals(obj); return super.equals(obj);

View File

@ -0,0 +1,223 @@
package org.apache.hadoop.mapreduce.v2.app.launcher;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.net.InetSocketAddress;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.ShuffleHandler;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher.EventType;
import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
import org.apache.hadoop.yarn.api.ContainerManager;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.util.BuilderUtils;
import org.junit.Test;
public class TestContainerLauncherImpl {
static final Log LOG = LogFactory.getLog(TestContainerLauncherImpl.class);
private static final RecordFactory recordFactory =
RecordFactoryProvider.getRecordFactory(null);
private static class ContainerLauncherImplUnderTest extends
ContainerLauncherImpl {
private YarnRPC rpc;
public ContainerLauncherImplUnderTest(AppContext context, YarnRPC rpc) {
super(context);
this.rpc = rpc;
}
@Override
protected YarnRPC createYarnRPC(Configuration conf) {
return rpc;
}
public void waitForPoolToIdle() throws InterruptedException {
//I wish that we did not need the sleep, but it is here so that we are sure
// That the other thread had time to insert the event into the queue and
// start processing it. For some reason we were getting interrupted
// exceptions within eventQueue without this sleep.
Thread.sleep(100l);
LOG.debug("POOL SIZE 1: "+this.eventQueue.size()+
" POOL SIZE 2: "+this.launcherPool.getQueue().size()+
" ACTIVE COUNT: "+ this.launcherPool.getActiveCount());
while(!this.eventQueue.isEmpty() ||
!this.launcherPool.getQueue().isEmpty() ||
this.launcherPool.getActiveCount() > 0) {
Thread.sleep(100l);
LOG.debug("POOL SIZE 1: "+this.eventQueue.size()+
" POOL SIZE 2: "+this.launcherPool.getQueue().size()+
" ACTIVE COUNT: "+ this.launcherPool.getActiveCount());
}
LOG.debug("POOL SIZE 1: "+this.eventQueue.size()+
" POOL SIZE 2: "+this.launcherPool.getQueue().size()+
" ACTIVE COUNT: "+ this.launcherPool.getActiveCount());
}
}
public static ContainerId makeContainerId(long ts, int appId, int attemptId,
int id) {
return BuilderUtils.newContainerId(
BuilderUtils.newApplicationAttemptId(
BuilderUtils.newApplicationId(ts, appId), attemptId), id);
}
public static TaskAttemptId makeTaskAttemptId(long ts, int appId, int taskId,
TaskType taskType, int id) {
ApplicationId aID = BuilderUtils.newApplicationId(ts, appId);
JobId jID = MRBuilderUtils.newJobId(aID, id);
TaskId tID = MRBuilderUtils.newTaskId(jID, taskId, taskType);
return MRBuilderUtils.newTaskAttemptId(tID, id);
}
@Test
public void testHandle() throws Exception {
LOG.info("STARTING testHandle");
YarnRPC mockRpc = mock(YarnRPC.class);
AppContext mockContext = mock(AppContext.class);
@SuppressWarnings("rawtypes")
EventHandler mockEventHandler = mock(EventHandler.class);
when(mockContext.getEventHandler()).thenReturn(mockEventHandler);
ContainerManager mockCM = mock(ContainerManager.class);
when(mockRpc.getProxy(eq(ContainerManager.class),
any(InetSocketAddress.class), any(Configuration.class)))
.thenReturn(mockCM);
ContainerLauncherImplUnderTest ut =
new ContainerLauncherImplUnderTest(mockContext, mockRpc);
Configuration conf = new Configuration();
ut.init(conf);
ut.start();
try {
ContainerId contId = makeContainerId(0l, 0, 0, 1);
TaskAttemptId taskAttemptId = makeTaskAttemptId(0l, 0, 0, TaskType.MAP, 0);
String cmAddress = "127.0.0.1:8000";
StartContainerResponse startResp =
recordFactory.newRecordInstance(StartContainerResponse.class);
startResp.setServiceResponse(ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID,
ShuffleHandler.serializeMetaData(80));
LOG.info("inserting launch event");
ContainerRemoteLaunchEvent mockLaunchEvent =
mock(ContainerRemoteLaunchEvent.class);
when(mockLaunchEvent.getType())
.thenReturn(EventType.CONTAINER_REMOTE_LAUNCH);
when(mockLaunchEvent.getContainerID())
.thenReturn(contId);
when(mockLaunchEvent.getTaskAttemptID()).thenReturn(taskAttemptId);
when(mockLaunchEvent.getContainerMgrAddress()).thenReturn(cmAddress);
when(mockCM.startContainer(any(StartContainerRequest.class))).thenReturn(startResp);
ut.handle(mockLaunchEvent);
ut.waitForPoolToIdle();
verify(mockCM).startContainer(any(StartContainerRequest.class));
LOG.info("inserting cleanup event");
ContainerLauncherEvent mockCleanupEvent =
mock(ContainerLauncherEvent.class);
when(mockCleanupEvent.getType())
.thenReturn(EventType.CONTAINER_REMOTE_CLEANUP);
when(mockCleanupEvent.getContainerID())
.thenReturn(contId);
when(mockCleanupEvent.getTaskAttemptID()).thenReturn(taskAttemptId);
when(mockCleanupEvent.getContainerMgrAddress()).thenReturn(cmAddress);
ut.handle(mockCleanupEvent);
ut.waitForPoolToIdle();
verify(mockCM).stopContainer(any(StopContainerRequest.class));
} finally {
ut.stop();
}
}
@Test
public void testOutOfOrder() throws Exception {
LOG.info("STARTING testOutOfOrder");
YarnRPC mockRpc = mock(YarnRPC.class);
AppContext mockContext = mock(AppContext.class);
@SuppressWarnings("rawtypes")
EventHandler mockEventHandler = mock(EventHandler.class);
when(mockContext.getEventHandler()).thenReturn(mockEventHandler);
ContainerManager mockCM = mock(ContainerManager.class);
when(mockRpc.getProxy(eq(ContainerManager.class),
any(InetSocketAddress.class), any(Configuration.class)))
.thenReturn(mockCM);
ContainerLauncherImplUnderTest ut =
new ContainerLauncherImplUnderTest(mockContext, mockRpc);
Configuration conf = new Configuration();
ut.init(conf);
ut.start();
try {
ContainerId contId = makeContainerId(0l, 0, 0, 1);
TaskAttemptId taskAttemptId = makeTaskAttemptId(0l, 0, 0, TaskType.MAP, 0);
String cmAddress = "127.0.0.1:8000";
StartContainerResponse startResp =
recordFactory.newRecordInstance(StartContainerResponse.class);
startResp.setServiceResponse(ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID,
ShuffleHandler.serializeMetaData(80));
LOG.info("inserting cleanup event");
ContainerLauncherEvent mockCleanupEvent =
mock(ContainerLauncherEvent.class);
when(mockCleanupEvent.getType())
.thenReturn(EventType.CONTAINER_REMOTE_CLEANUP);
when(mockCleanupEvent.getContainerID())
.thenReturn(contId);
when(mockCleanupEvent.getTaskAttemptID()).thenReturn(taskAttemptId);
when(mockCleanupEvent.getContainerMgrAddress()).thenReturn(cmAddress);
ut.handle(mockCleanupEvent);
ut.waitForPoolToIdle();
verify(mockCM, never()).stopContainer(any(StopContainerRequest.class));
LOG.info("inserting launch event");
ContainerRemoteLaunchEvent mockLaunchEvent =
mock(ContainerRemoteLaunchEvent.class);
when(mockLaunchEvent.getType())
.thenReturn(EventType.CONTAINER_REMOTE_LAUNCH);
when(mockLaunchEvent.getContainerID())
.thenReturn(contId);
when(mockLaunchEvent.getTaskAttemptID()).thenReturn(taskAttemptId);
when(mockLaunchEvent.getContainerMgrAddress()).thenReturn(cmAddress);
when(mockCM.startContainer(any(StartContainerRequest.class))).thenReturn(startResp);
ut.handle(mockLaunchEvent);
ut.waitForPoolToIdle();
verify(mockCM, never()).startContainer(any(StartContainerRequest.class));
} finally {
ut.stop();
}
}
}

View File

@ -164,7 +164,7 @@ public class ShuffleHandler extends AbstractService
* @param port the port to be sent to the ApplciationMaster * @param port the port to be sent to the ApplciationMaster
* @return the serialized form of the port. * @return the serialized form of the port.
*/ */
static ByteBuffer serializeMetaData(int port) throws IOException { public static ByteBuffer serializeMetaData(int port) throws IOException {
//TODO these bytes should be versioned //TODO these bytes should be versioned
DataOutputBuffer port_dob = new DataOutputBuffer(); DataOutputBuffer port_dob = new DataOutputBuffer();
port_dob.writeInt(port); port_dob.writeInt(port);