YARN-1510. Make NMClient support change container resources. (Meng Ding via wangda)

This commit is contained in:
Wangda Tan 2015-11-10 11:45:46 -08:00
parent 493e8ae552
commit c99925d6dd
8 changed files with 452 additions and 19 deletions

View File

@ -244,6 +244,9 @@ Release 2.8.0 - UNRELEASED
YARN-2729. Support script based NodeLabelsProvider Interface in Distributed Node Label YARN-2729. Support script based NodeLabelsProvider Interface in Distributed Node Label
Configuration Setup. (Naganarasimha G R via rohithsharmaks) Configuration Setup. (Naganarasimha G R via rohithsharmaks)
YARN-1510. Make NMClient support change container resources.
(Meng Ding via wangda)
IMPROVEMENTS IMPROVEMENTS
YARN-644. Basic null check is not performed on passed in arguments before YARN-644. Basic null check is not performed on passed in arguments before

View File

@ -858,8 +858,7 @@ public class ApplicationMaster {
} }
@VisibleForTesting @VisibleForTesting
static class NMCallbackHandler static class NMCallbackHandler extends NMClientAsync.AbstractCallbackHandler {
implements NMClientAsync.CallbackHandler {
private ConcurrentMap<ContainerId, Container> containers = private ConcurrentMap<ContainerId, Container> containers =
new ConcurrentHashMap<ContainerId, Container>(); new ConcurrentHashMap<ContainerId, Container>();
@ -907,6 +906,10 @@ public class ApplicationMaster {
} }
} }
@Override
public void onContainerResourceIncreased(
ContainerId containerId, Resource resource) {}
@Override @Override
public void onStartContainerError(ContainerId containerId, Throwable t) { public void onStartContainerError(ContainerId containerId, Throwable t) {
LOG.error("Failed to start Container " + containerId); LOG.error("Failed to start Container " + containerId);
@ -926,6 +929,11 @@ public class ApplicationMaster {
LOG.error("Failed to stop Container " + containerId); LOG.error("Failed to stop Container " + containerId);
containers.remove(containerId); containers.remove(containerId);
} }
@Override
public void onIncreaseContainerResourceError(
ContainerId containerId, Throwable t) {}
} }
/** /**

View File

@ -86,6 +86,22 @@ public abstract class NMClient extends AbstractService {
ContainerLaunchContext containerLaunchContext) ContainerLaunchContext containerLaunchContext)
throws YarnException, IOException; throws YarnException, IOException;
/**
* <p>Increase the resource of a container.</p>
*
* <p>The <code>ApplicationMaster</code> or other applications that use the
* client must provide the details of the container, including the Id and
* the target resource encapsulated in the updated container token via
* {@link Container}.
* </p>
*
* @param container the container with updated token
* @throws YarnException
* @throws IOException
*/
public abstract void increaseContainerResource(Container container)
throws YarnException, IOException;
/** /**
* <p>Stop an started container.</p> * <p>Stop an started container.</p>
* *

View File

@ -31,6 +31,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.client.api.NMClient; import org.apache.hadoop.yarn.client.api.NMClient;
import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl; import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl;
@ -51,11 +52,16 @@ import com.google.common.annotations.VisibleForTesting;
* *
* <pre> * <pre>
* {@code * {@code
* class MyCallbackHandler implements NMClientAsync.CallbackHandler { * class MyCallbackHandler extends NMClientAsync.AbstractCallbackHandler {
* public void onContainerStarted(ContainerId containerId, * public void onContainerStarted(ContainerId containerId,
* Map<String, ByteBuffer> allServiceResponse) { * Map<String, ByteBuffer> allServiceResponse) {
* [post process after the container is started, process the response] * [post process after the container is started, process the response]
* } * }
* public void onContainerResourceIncreased(ContainerId containerId,
* Resource resource) {
* [post process after the container resource is increased]
* }
* *
* public void onContainerStatusReceived(ContainerId containerId, * public void onContainerStatusReceived(ContainerId containerId,
* ContainerStatus containerStatus) { * ContainerStatus containerStatus) {
@ -111,21 +117,58 @@ public abstract class NMClientAsync extends AbstractService {
protected NMClient client; protected NMClient client;
protected CallbackHandler callbackHandler; protected CallbackHandler callbackHandler;
public static NMClientAsync createNMClientAsync(
AbstractCallbackHandler callbackHandler) {
return new NMClientAsyncImpl(callbackHandler);
}
protected NMClientAsync(AbstractCallbackHandler callbackHandler) {
this (NMClientAsync.class.getName(), callbackHandler);
}
protected NMClientAsync(
String name, AbstractCallbackHandler callbackHandler) {
this (name, new NMClientImpl(), callbackHandler);
}
protected NMClientAsync(String name, NMClient client,
AbstractCallbackHandler callbackHandler) {
super(name);
this.setClient(client);
this.setCallbackHandler(callbackHandler);
}
/**
* @deprecated Use {@link #createNMClientAsync(AbstractCallbackHandler)}
* instead.
*/
@Deprecated
public static NMClientAsync createNMClientAsync( public static NMClientAsync createNMClientAsync(
CallbackHandler callbackHandler) { CallbackHandler callbackHandler) {
return new NMClientAsyncImpl(callbackHandler); return new NMClientAsyncImpl(callbackHandler);
} }
/**
* @deprecated Use {@link #NMClientAsync(AbstractCallbackHandler)}
* instead.
*/
@Deprecated
protected NMClientAsync(CallbackHandler callbackHandler) { protected NMClientAsync(CallbackHandler callbackHandler) {
this (NMClientAsync.class.getName(), callbackHandler); this (NMClientAsync.class.getName(), callbackHandler);
} }
/**
* @deprecated Use {@link #NMClientAsync(String, AbstractCallbackHandler)}
* instead.
*/
@Deprecated
protected NMClientAsync(String name, CallbackHandler callbackHandler) { protected NMClientAsync(String name, CallbackHandler callbackHandler) {
this (name, new NMClientImpl(), callbackHandler); this (name, new NMClientImpl(), callbackHandler);
} }
@Private @Private
@VisibleForTesting @VisibleForTesting
@Deprecated
protected NMClientAsync(String name, NMClient client, protected NMClientAsync(String name, NMClient client,
CallbackHandler callbackHandler) { CallbackHandler callbackHandler) {
super(name); super(name);
@ -136,6 +179,8 @@ public abstract class NMClientAsync extends AbstractService {
public abstract void startContainerAsync( public abstract void startContainerAsync(
Container container, ContainerLaunchContext containerLaunchContext); Container container, ContainerLaunchContext containerLaunchContext);
public abstract void increaseContainerResourceAsync(Container container);
public abstract void stopContainerAsync( public abstract void stopContainerAsync(
ContainerId containerId, NodeId nodeId); ContainerId containerId, NodeId nodeId);
@ -159,6 +204,110 @@ public abstract class NMClientAsync extends AbstractService {
} }
/** /**
* <p>
* The callback abstract class. The callback functions need to be implemented
* by {@link NMClientAsync} users. The APIs are called when responses from
* <code>NodeManager</code> are available.
* </p>
*
* <p>
* Once a callback happens, the users can chose to act on it in blocking or
* non-blocking manner. If the action on callback is done in a blocking
* manner, some of the threads performing requests on NodeManagers may get
* blocked depending on how many threads in the pool are busy.
* </p>
*
* <p>
* The implementation of the callback functions should not throw the
* unexpected exception. Otherwise, {@link NMClientAsync} will just
* catch, log and then ignore it.
* </p>
*/
public abstract static class AbstractCallbackHandler
implements CallbackHandler {
/**
* The API is called when <code>NodeManager</code> responds to indicate its
* acceptance of the starting container request.
*
* @param containerId the Id of the container
* @param allServiceResponse a Map between the auxiliary service names and
* their outputs
*/
public abstract void onContainerStarted(ContainerId containerId,
Map<String, ByteBuffer> allServiceResponse);
/**
* The API is called when <code>NodeManager</code> responds with the status
* of the container.
*
* @param containerId the Id of the container
* @param containerStatus the status of the container
*/
public abstract void onContainerStatusReceived(ContainerId containerId,
ContainerStatus containerStatus);
/**
* The API is called when <code>NodeManager</code> responds to indicate the
* container is stopped.
*
* @param containerId the Id of the container
*/
public abstract void onContainerStopped(ContainerId containerId);
/**
* The API is called when an exception is raised in the process of
* starting a container.
*
* @param containerId the Id of the container
* @param t the raised exception
*/
public abstract void onStartContainerError(
ContainerId containerId, Throwable t);
/**
* The API is called when <code>NodeManager</code> responds to indicate
* the container resource has been successfully increased.
*
* @param containerId the Id of the container
* @param resource the target resource of the container
*/
public abstract void onContainerResourceIncreased(
ContainerId containerId, Resource resource);
/**
* The API is called when an exception is raised in the process of
* querying the status of a container.
*
* @param containerId the Id of the container
* @param t the raised exception
*/
public abstract void onGetContainerStatusError(
ContainerId containerId, Throwable t);
/**
* The API is called when an exception is raised in the process of
* increasing container resource.
*
* @param containerId the Id of the container
* @param t the raised exception
*/
public abstract void onIncreaseContainerResourceError(
ContainerId containerId, Throwable t);
/**
* The API is called when an exception is raised in the process of
* stopping a container.
*
* @param containerId the Id of the container
* @param t the raised exception
*/
public abstract void onStopContainerError(
ContainerId containerId, Throwable t);
}
/**
* @deprecated Use {@link NMClientAsync.AbstractCallbackHandler} instead.
*
* <p> * <p>
* The callback interface needs to be implemented by {@link NMClientAsync} * The callback interface needs to be implemented by {@link NMClientAsync}
* users. The APIs are called when responses from <code>NodeManager</code> are * users. The APIs are called when responses from <code>NodeManager</code> are
@ -178,6 +327,7 @@ public abstract class NMClientAsync extends AbstractService {
* catch, log and then ignore it. * catch, log and then ignore it.
* </p> * </p>
*/ */
@Deprecated
public static interface CallbackHandler { public static interface CallbackHandler {
/** /**
* The API is called when <code>NodeManager</code> responds to indicate its * The API is called when <code>NodeManager</code> responds to indicate its

View File

@ -82,16 +82,46 @@ public class NMClientAsyncImpl extends NMClientAsync {
protected ConcurrentMap<ContainerId, StatefulContainer> containers = protected ConcurrentMap<ContainerId, StatefulContainer> containers =
new ConcurrentHashMap<ContainerId, StatefulContainer>(); new ConcurrentHashMap<ContainerId, StatefulContainer>();
public NMClientAsyncImpl(AbstractCallbackHandler callbackHandler) {
this(NMClientAsync.class.getName(), callbackHandler);
}
public NMClientAsyncImpl(
String name, AbstractCallbackHandler callbackHandler) {
this(name, new NMClientImpl(), callbackHandler);
}
@Private
@VisibleForTesting
protected NMClientAsyncImpl(String name, NMClient client,
AbstractCallbackHandler callbackHandler) {
super(name, client, callbackHandler);
this.client = client;
this.callbackHandler = callbackHandler;
}
/**
* @deprecated Use {@link
* #NMClientAsyncImpl(NMClientAsync.AbstractCallbackHandler)}
* instead.
*/
@Deprecated
public NMClientAsyncImpl(CallbackHandler callbackHandler) { public NMClientAsyncImpl(CallbackHandler callbackHandler) {
this(NMClientAsync.class.getName(), callbackHandler); this(NMClientAsync.class.getName(), callbackHandler);
} }
/**
* @deprecated Use {@link #NMClientAsyncImpl(String,
* NMClientAsync.AbstractCallbackHandler)} instead.
*/
@Deprecated
public NMClientAsyncImpl(String name, CallbackHandler callbackHandler) { public NMClientAsyncImpl(String name, CallbackHandler callbackHandler) {
this(name, new NMClientImpl(), callbackHandler); this(name, new NMClientImpl(), callbackHandler);
} }
@Private @Private
@VisibleForTesting @VisibleForTesting
@Deprecated
protected NMClientAsyncImpl(String name, NMClient client, protected NMClientAsyncImpl(String name, NMClient client,
CallbackHandler callbackHandler) { CallbackHandler callbackHandler) {
super(name, client, callbackHandler); super(name, client, callbackHandler);
@ -229,6 +259,29 @@ public class NMClientAsyncImpl extends NMClientAsync {
} }
} }
public void increaseContainerResourceAsync(Container container) {
if (!(callbackHandler instanceof AbstractCallbackHandler)) {
LOG.error("Callback handler does not implement container resource "
+ "increase callback methods");
return;
}
AbstractCallbackHandler handler = (AbstractCallbackHandler) callbackHandler;
if (containers.get(container.getId()) == null) {
handler.onIncreaseContainerResourceError(
container.getId(),
RPCUtil.getRemoteException(
"Container " + container.getId() +
" is neither started nor scheduled to start"));
}
try {
events.put(new IncreaseContainerResourceEvent(container));
} catch (InterruptedException e) {
LOG.warn("Exception when scheduling the event of increasing resource of "
+ "Container " + container.getId());
handler.onIncreaseContainerResourceError(container.getId(), e);
}
}
public void stopContainerAsync(ContainerId containerId, NodeId nodeId) { public void stopContainerAsync(ContainerId containerId, NodeId nodeId) {
if (containers.get(containerId) == null) { if (containers.get(containerId) == null) {
callbackHandler.onStopContainerError(containerId, callbackHandler.onStopContainerError(containerId,
@ -276,7 +329,8 @@ public class NMClientAsyncImpl extends NMClientAsync {
protected static enum ContainerEventType { protected static enum ContainerEventType {
START_CONTAINER, START_CONTAINER,
STOP_CONTAINER, STOP_CONTAINER,
QUERY_CONTAINER QUERY_CONTAINER,
INCREASE_CONTAINER_RESOURCE
} }
protected static class ContainerEvent protected static class ContainerEvent
@ -327,6 +381,21 @@ public class NMClientAsyncImpl extends NMClientAsync {
} }
} }
protected static class IncreaseContainerResourceEvent extends ContainerEvent {
private Container container;
public IncreaseContainerResourceEvent(Container container) {
super(container.getId(), container.getNodeId(),
container.getContainerToken(),
ContainerEventType.INCREASE_CONTAINER_RESOURCE);
this.container = container;
}
public Container getContainer() {
return container;
}
}
protected static class StatefulContainer implements protected static class StatefulContainer implements
EventHandler<ContainerEvent> { EventHandler<ContainerEvent> {
@ -344,7 +413,9 @@ public class NMClientAsyncImpl extends NMClientAsync {
ContainerEventType.STOP_CONTAINER, new OutOfOrderTransition()) ContainerEventType.STOP_CONTAINER, new OutOfOrderTransition())
// Transitions from RUNNING state // Transitions from RUNNING state
// RUNNING -> RUNNING should be the invalid transition .addTransition(ContainerState.RUNNING, ContainerState.RUNNING,
ContainerEventType.INCREASE_CONTAINER_RESOURCE,
new IncreaseContainerResourceTransition())
.addTransition(ContainerState.RUNNING, .addTransition(ContainerState.RUNNING,
EnumSet.of(ContainerState.DONE, ContainerState.FAILED), EnumSet.of(ContainerState.DONE, ContainerState.FAILED),
ContainerEventType.STOP_CONTAINER, ContainerEventType.STOP_CONTAINER,
@ -353,12 +424,14 @@ public class NMClientAsyncImpl extends NMClientAsync {
// Transition from DONE state // Transition from DONE state
.addTransition(ContainerState.DONE, ContainerState.DONE, .addTransition(ContainerState.DONE, ContainerState.DONE,
EnumSet.of(ContainerEventType.START_CONTAINER, EnumSet.of(ContainerEventType.START_CONTAINER,
ContainerEventType.STOP_CONTAINER)) ContainerEventType.STOP_CONTAINER,
ContainerEventType.INCREASE_CONTAINER_RESOURCE))
// Transition from FAILED state // Transition from FAILED state
.addTransition(ContainerState.FAILED, ContainerState.FAILED, .addTransition(ContainerState.FAILED, ContainerState.FAILED,
EnumSet.of(ContainerEventType.START_CONTAINER, EnumSet.of(ContainerEventType.START_CONTAINER,
ContainerEventType.STOP_CONTAINER)); ContainerEventType.STOP_CONTAINER,
ContainerEventType.INCREASE_CONTAINER_RESOURCE));
protected static class StartContainerTransition implements protected static class StartContainerTransition implements
MultipleArcTransition<StatefulContainer, ContainerEvent, MultipleArcTransition<StatefulContainer, ContainerEvent,
@ -410,6 +483,52 @@ public class NMClientAsyncImpl extends NMClientAsync {
} }
} }
protected static class IncreaseContainerResourceTransition implements
SingleArcTransition<StatefulContainer, ContainerEvent> {
@Override
public void transition(
StatefulContainer container, ContainerEvent event) {
if (!(container.nmClientAsync.getCallbackHandler()
instanceof AbstractCallbackHandler)) {
LOG.error("Callback handler does not implement container resource "
+ "increase callback methods");
return;
}
AbstractCallbackHandler handler =
(AbstractCallbackHandler) container.nmClientAsync
.getCallbackHandler();
try {
if (!(event instanceof IncreaseContainerResourceEvent)) {
throw new AssertionError("Unexpected event type. Expecting:"
+ "IncreaseContainerResourceEvent. Got:" + event);
}
IncreaseContainerResourceEvent increaseEvent =
(IncreaseContainerResourceEvent) event;
container.nmClientAsync.getClient().increaseContainerResource(
increaseEvent.getContainer());
try {
handler.onContainerResourceIncreased(
increaseEvent.getContainerId(), increaseEvent.getContainer()
.getResource());
} catch (Throwable thr) {
// Don't process user created unchecked exception
LOG.info("Unchecked exception is thrown from "
+ "onContainerResourceIncreased for Container "
+ event.getContainerId(), thr);
}
} catch (Exception e) {
try {
handler.onIncreaseContainerResourceError(event.getContainerId(), e);
} catch (Throwable thr) {
// Don't process user created unchecked exception
LOG.info("Unchecked exception is thrown from "
+ "onIncreaseContainerResourceError for Container "
+ event.getContainerId(), thr);
}
}
}
}
protected static class StopContainerTransition implements protected static class StopContainerTransition implements
MultipleArcTransition<StatefulContainer, ContainerEvent, MultipleArcTransition<StatefulContainer, ContainerEvent,
ContainerState> { ContainerState> {

View File

@ -35,6 +35,8 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.token.SecretManager.InvalidToken; import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest;
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest; import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse; import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse;
@ -147,8 +149,7 @@ public class NMClientImpl extends NMClient {
private ContainerState state; private ContainerState state;
public StartedContainer(ContainerId containerId, NodeId nodeId, public StartedContainer(ContainerId containerId, NodeId nodeId) {
Token containerToken) {
this.containerId = containerId; this.containerId = containerId;
this.nodeId = nodeId; this.nodeId = nodeId;
state = ContainerState.NEW; state = ContainerState.NEW;
@ -231,6 +232,34 @@ public class NMClientImpl extends NMClient {
} }
} }
@Override
public void increaseContainerResource(Container container)
throws YarnException, IOException {
ContainerManagementProtocolProxyData proxy = null;
try {
proxy = cmProxy.getProxy(
container.getNodeId().toString(), container.getId());
List<Token> increaseTokens = new ArrayList<>();
increaseTokens.add(container.getContainerToken());
IncreaseContainersResourceRequest increaseRequest =
IncreaseContainersResourceRequest
.newInstance(increaseTokens);
IncreaseContainersResourceResponse response =
proxy.getContainerManagementProtocol()
.increaseContainersResource(increaseRequest);
if (response.getFailedRequests() != null
&& response.getFailedRequests().containsKey(container.getId())) {
Throwable t = response.getFailedRequests().get(container.getId())
.deSerialize();
parseAndThrowException(t);
}
} finally {
if (proxy != null) {
cmProxy.mayBeCloseProxy(proxy);
}
}
}
@Override @Override
public void stopContainer(ContainerId containerId, NodeId nodeId) public void stopContainer(ContainerId containerId, NodeId nodeId)
throws YarnException, IOException { throws YarnException, IOException {
@ -308,7 +337,7 @@ public class NMClientImpl extends NMClient {
protected synchronized StartedContainer createStartedContainer( protected synchronized StartedContainer createStartedContainer(
Container container) throws YarnException, IOException { Container container) throws YarnException, IOException {
StartedContainer startedContainer = new StartedContainer(container.getId(), StartedContainer startedContainer = new StartedContainer(container.getId(),
container.getNodeId(), container.getContainerToken()); container.getNodeId());
return startedContainer; return startedContainer;
} }

View File

@ -36,6 +36,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerArray; import java.util.concurrent.atomic.AtomicIntegerArray;
import org.apache.hadoop.yarn.api.records.Resource;
import org.junit.Assert; import org.junit.Assert;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
@ -50,7 +51,6 @@ import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.client.api.NMClient; import org.apache.hadoop.yarn.client.api.NMClient;
import org.apache.hadoop.yarn.client.api.async.NMClientAsync; import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factories.RecordFactory;
@ -116,6 +116,10 @@ public class TestNMClientAsync {
recordFactory.newRecordInstance(ContainerLaunchContext.class); recordFactory.newRecordInstance(ContainerLaunchContext.class);
asyncClient.startContainerAsync(container, clc); asyncClient.startContainerAsync(container, clc);
} }
while (!((TestCallbackHandler1) asyncClient.getCallbackHandler())
.isIncreaseResourceFailureCallsExecuted()) {
Thread.sleep(10);
}
while (!((TestCallbackHandler1) asyncClient.getCallbackHandler()) while (!((TestCallbackHandler1) asyncClient.getCallbackHandler())
.isStopFailureCallsExecuted()) { .isStopFailureCallsExecuted()) {
Thread.sleep(10); Thread.sleep(10);
@ -183,7 +187,7 @@ public class TestNMClientAsync {
} }
private class TestCallbackHandler1 private class TestCallbackHandler1
implements NMClientAsync.CallbackHandler { extends NMClientAsync.AbstractCallbackHandler {
private boolean path = true; private boolean path = true;
@ -196,6 +200,10 @@ public class TestNMClientAsync {
private AtomicInteger actualQueryFailure = new AtomicInteger(0); private AtomicInteger actualQueryFailure = new AtomicInteger(0);
private AtomicInteger actualStopSuccess = new AtomicInteger(0); private AtomicInteger actualStopSuccess = new AtomicInteger(0);
private AtomicInteger actualStopFailure = new AtomicInteger(0); private AtomicInteger actualStopFailure = new AtomicInteger(0);
private AtomicInteger actualIncreaseResourceSuccess =
new AtomicInteger(0);
private AtomicInteger actualIncreaseResourceFailure =
new AtomicInteger(0);
private AtomicIntegerArray actualStartSuccessArray; private AtomicIntegerArray actualStartSuccessArray;
private AtomicIntegerArray actualStartFailureArray; private AtomicIntegerArray actualStartFailureArray;
@ -203,6 +211,8 @@ public class TestNMClientAsync {
private AtomicIntegerArray actualQueryFailureArray; private AtomicIntegerArray actualQueryFailureArray;
private AtomicIntegerArray actualStopSuccessArray; private AtomicIntegerArray actualStopSuccessArray;
private AtomicIntegerArray actualStopFailureArray; private AtomicIntegerArray actualStopFailureArray;
private AtomicIntegerArray actualIncreaseResourceSuccessArray;
private AtomicIntegerArray actualIncreaseResourceFailureArray;
private Set<String> errorMsgs = private Set<String> errorMsgs =
Collections.synchronizedSet(new HashSet<String>()); Collections.synchronizedSet(new HashSet<String>());
@ -217,6 +227,10 @@ public class TestNMClientAsync {
actualQueryFailureArray = new AtomicIntegerArray(expectedFailure); actualQueryFailureArray = new AtomicIntegerArray(expectedFailure);
actualStopSuccessArray = new AtomicIntegerArray(expectedSuccess); actualStopSuccessArray = new AtomicIntegerArray(expectedSuccess);
actualStopFailureArray = new AtomicIntegerArray(expectedFailure); actualStopFailureArray = new AtomicIntegerArray(expectedFailure);
actualIncreaseResourceSuccessArray =
new AtomicIntegerArray(expectedSuccess);
actualIncreaseResourceFailureArray =
new AtomicIntegerArray(expectedFailure);
} }
@SuppressWarnings("deprecation") @SuppressWarnings("deprecation")
@ -236,7 +250,11 @@ public class TestNMClientAsync {
asyncClient.getContainerStatusAsync(containerId, nodeId); asyncClient.getContainerStatusAsync(containerId, nodeId);
} else { } else {
// move on to the following failure tests // move on to the following failure tests
asyncClient.stopContainerAsync(containerId, nodeId); // make sure we pass in the container with the same
// containerId
Container container = Container.newInstance(
containerId, nodeId, null, null, null, containerToken);
asyncClient.increaseContainerResourceAsync(container);
} }
// Shouldn't crash the test thread // Shouldn't crash the test thread
@ -255,12 +273,33 @@ public class TestNMClientAsync {
actualQuerySuccess.addAndGet(1); actualQuerySuccess.addAndGet(1);
actualQuerySuccessArray.set(containerId.getId(), 1); actualQuerySuccessArray.set(containerId.getId(), 1);
// move on to the following success tests // move on to the following success tests
asyncClient.stopContainerAsync(containerId, nodeId); // make sure we pass in the container with the same
// containerId
Container container = Container.newInstance(
containerId, nodeId, null, null, null, containerToken);
asyncClient.increaseContainerResourceAsync(container);
// Shouldn't crash the test thread // Shouldn't crash the test thread
throw new RuntimeException("Ignorable Exception"); throw new RuntimeException("Ignorable Exception");
} }
@SuppressWarnings("deprecation")
@Override
public void onContainerResourceIncreased(
ContainerId containerId, Resource resource) {
if (containerId.getId() >= expectedSuccess) {
errorMsgs.add("Container " + containerId +
" should throw the exception onContainerResourceIncreased");
return;
}
actualIncreaseResourceSuccess.addAndGet(1);
actualIncreaseResourceSuccessArray.set(containerId.getId(), 1);
// move on to the following success tests
asyncClient.stopContainerAsync(containerId, nodeId);
// throw a fake user exception, and shouldn't crash the test
throw new RuntimeException("Ignorable Exception");
}
@SuppressWarnings("deprecation") @SuppressWarnings("deprecation")
@Override @Override
public void onContainerStopped(ContainerId containerId) { public void onContainerStopped(ContainerId containerId) {
@ -300,6 +339,26 @@ public class TestNMClientAsync {
throw new RuntimeException("Ignorable Exception"); throw new RuntimeException("Ignorable Exception");
} }
@SuppressWarnings("deprecation")
@Override
public void onIncreaseContainerResourceError(
ContainerId containerId, Throwable t) {
if (containerId.getId() < expectedSuccess + expectedFailure) {
errorMsgs.add("Container " + containerId +
" shouldn't throw the exception onIncreaseContainerResourceError");
return;
}
actualIncreaseResourceFailure.addAndGet(1);
actualIncreaseResourceFailureArray.set(
containerId.getId() - expectedSuccess - expectedFailure, 1);
// increase container resource error should NOT change the
// the container status to FAILED
// move on to the following failure tests
asyncClient.stopContainerAsync(containerId, nodeId);
// Shouldn't crash the test thread
throw new RuntimeException("Ignorable Exception");
}
@SuppressWarnings("deprecation") @SuppressWarnings("deprecation")
@Override @Override
public void onStopContainerError(ContainerId containerId, Throwable t) { public void onStopContainerError(ContainerId containerId, Throwable t) {
@ -345,10 +404,12 @@ public class TestNMClientAsync {
boolean isAllSuccessCallsExecuted = boolean isAllSuccessCallsExecuted =
actualStartSuccess.get() == expectedSuccess && actualStartSuccess.get() == expectedSuccess &&
actualQuerySuccess.get() == expectedSuccess && actualQuerySuccess.get() == expectedSuccess &&
actualIncreaseResourceSuccess.get() == expectedSuccess &&
actualStopSuccess.get() == expectedSuccess; actualStopSuccess.get() == expectedSuccess;
if (isAllSuccessCallsExecuted) { if (isAllSuccessCallsExecuted) {
assertAtomicIntegerArray(actualStartSuccessArray); assertAtomicIntegerArray(actualStartSuccessArray);
assertAtomicIntegerArray(actualQuerySuccessArray); assertAtomicIntegerArray(actualQuerySuccessArray);
assertAtomicIntegerArray(actualIncreaseResourceSuccessArray);
assertAtomicIntegerArray(actualStopSuccessArray); assertAtomicIntegerArray(actualStopSuccessArray);
} }
return isAllSuccessCallsExecuted; return isAllSuccessCallsExecuted;
@ -365,6 +426,15 @@ public class TestNMClientAsync {
return isStartAndQueryFailureCallsExecuted; return isStartAndQueryFailureCallsExecuted;
} }
public boolean isIncreaseResourceFailureCallsExecuted() {
boolean isIncreaseResourceFailureCallsExecuted =
actualIncreaseResourceFailure.get() == expectedFailure;
if (isIncreaseResourceFailureCallsExecuted) {
assertAtomicIntegerArray(actualIncreaseResourceFailureArray);
}
return isIncreaseResourceFailureCallsExecuted;
}
public boolean isStopFailureCallsExecuted() { public boolean isStopFailureCallsExecuted() {
boolean isStopFailureCallsExecuted = boolean isStopFailureCallsExecuted =
actualStopFailure.get() == expectedFailure; actualStopFailure.get() == expectedFailure;
@ -392,6 +462,8 @@ public class TestNMClientAsync {
when(client.getContainerStatus(any(ContainerId.class), when(client.getContainerStatus(any(ContainerId.class),
any(NodeId.class))).thenReturn( any(NodeId.class))).thenReturn(
recordFactory.newRecordInstance(ContainerStatus.class)); recordFactory.newRecordInstance(ContainerStatus.class));
doNothing().when(client).increaseContainerResource(
any(Container.class));
doNothing().when(client).stopContainer(any(ContainerId.class), doNothing().when(client).stopContainer(any(ContainerId.class),
any(NodeId.class)); any(NodeId.class));
break; break;
@ -411,6 +483,8 @@ public class TestNMClientAsync {
when(client.getContainerStatus(any(ContainerId.class), when(client.getContainerStatus(any(ContainerId.class),
any(NodeId.class))).thenReturn( any(NodeId.class))).thenReturn(
recordFactory.newRecordInstance(ContainerStatus.class)); recordFactory.newRecordInstance(ContainerStatus.class));
doThrow(RPCUtil.getRemoteException("Increase Resource Exception"))
.when(client).increaseContainerResource(any(Container.class));
doThrow(RPCUtil.getRemoteException("Stop Exception")).when(client) doThrow(RPCUtil.getRemoteException("Stop Exception")).when(client)
.stopContainer(any(ContainerId.class), any(NodeId.class)); .stopContainer(any(ContainerId.class), any(NodeId.class));
} }
@ -493,7 +567,7 @@ public class TestNMClientAsync {
} }
private class TestCallbackHandler2 private class TestCallbackHandler2
implements NMClientAsync.CallbackHandler { extends NMClientAsync.AbstractCallbackHandler {
private CyclicBarrier barrierC; private CyclicBarrier barrierC;
private AtomicBoolean exceptionOccurred = new AtomicBoolean(false); private AtomicBoolean exceptionOccurred = new AtomicBoolean(false);
@ -511,6 +585,10 @@ public class TestNMClientAsync {
ContainerStatus containerStatus) { ContainerStatus containerStatus) {
} }
@Override
public void onContainerResourceIncreased(
ContainerId containerId, Resource resource) {}
@Override @Override
public void onContainerStopped(ContainerId containerId) { public void onContainerStopped(ContainerId containerId) {
} }
@ -536,10 +614,13 @@ public class TestNMClientAsync {
Throwable t) { Throwable t) {
} }
@Override
public void onIncreaseContainerResourceError(
ContainerId containerId, Throwable t) {}
@Override @Override
public void onStopContainerError(ContainerId containerId, Throwable t) { public void onStopContainerError(ContainerId containerId, Throwable t) {
} }
} }
private Container mockContainer(int i) { private Container mockContainer(int i) {

View File

@ -210,10 +210,10 @@ public class TestNMClient {
testContainerManagement(nmClient, allocateContainers(rmClient, 5)); testContainerManagement(nmClient, allocateContainers(rmClient, 5));
rmClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, rmClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED,
null, null); null, null);
// don't stop the running containers // don't stop the running containers
stopNmClient(false); stopNmClient(false);
assertFalse(nmClient.startedContainers. isEmpty()); assertFalse(nmClient.startedContainers.isEmpty());
//now cleanup //now cleanup
nmClient.cleanupRunningContainers(); nmClient.cleanupRunningContainers();
assertEquals(0, nmClient.startedContainers.size()); assertEquals(0, nmClient.startedContainers.size());
@ -298,6 +298,16 @@ public class TestNMClient {
e.getMessage().contains("is not handled by this NodeManager")); e.getMessage().contains("is not handled by this NodeManager"));
} }
// increaseContainerResource shouldn't be called before startContainer,
// otherwise, NodeManager cannot find the container
try {
nmClient.increaseContainerResource(container);
fail("Exception is expected");
} catch (YarnException e) {
assertTrue("The thrown exception is not expected",
e.getMessage().contains("is not handled by this NodeManager"));
}
// stopContainer shouldn't be called before startContainer, // stopContainer shouldn't be called before startContainer,
// otherwise, an exception will be thrown // otherwise, an exception will be thrown
try { try {
@ -332,6 +342,8 @@ public class TestNMClient {
// NodeManager may still need some time to make the container started // NodeManager may still need some time to make the container started
testGetContainerStatus(container, i, ContainerState.RUNNING, "", testGetContainerStatus(container, i, ContainerState.RUNNING, "",
Arrays.asList(new Integer[] {-1000})); Arrays.asList(new Integer[] {-1000}));
// Test increase container API and make sure requests can reach NM
testIncreaseContainerResource(container);
try { try {
nmClient.stopContainer(container.getId(), container.getNodeId()); nmClient.stopContainer(container.getId(), container.getNodeId());
@ -397,4 +409,19 @@ public class TestNMClient {
} }
} }
private void testIncreaseContainerResource(Container container)
throws YarnException, IOException {
try {
nmClient.increaseContainerResource(container);
} catch (YarnException e) {
// NM container will only be in LOCALIZED state, so expect the increase
// action to fail.
if (!e.getMessage().contains(
"can only be changed when a container is in RUNNING state")) {
throw (AssertionError)
(new AssertionError("Exception is not expected: " + e)
.initCause(e));
}
}
}
} }