YARN-6306. NMClient API change for container upgrade. Contributed by Arun Suresh

This commit is contained in:
Jian He 2017-05-16 10:48:46 -07:00
parent c17cb03a23
commit 82751ff0cb
7 changed files with 966 additions and 78 deletions

View File

@ -58,6 +58,10 @@ public abstract class NMClient extends AbstractService {
return client; return client;
} }
/**
 * Kinds of container-upgrade operation an implementation can dispatch on.
 */
protected enum UpgradeOp {
  REINIT,
  RESTART,
  COMMIT,
  ROLLBACK
}
private NMTokenCache nmTokenCache = NMTokenCache.getSingleton(); private NMTokenCache nmTokenCache = NMTokenCache.getSingleton();
@Private @Private
@ -79,8 +83,8 @@ public abstract class NMClient extends AbstractService {
* <code>NodeManager</code> to launch the * <code>NodeManager</code> to launch the
* container * container
* @return a map between the auxiliary service names and their outputs * @return a map between the auxiliary service names and their outputs
* @throws YarnException * @throws YarnException YarnException.
* @throws IOException * @throws IOException IOException.
*/ */
public abstract Map<String, ByteBuffer> startContainer(Container container, public abstract Map<String, ByteBuffer> startContainer(Container container,
ContainerLaunchContext containerLaunchContext) ContainerLaunchContext containerLaunchContext)
@ -95,9 +99,10 @@ public abstract class NMClient extends AbstractService {
* {@link Container}. * {@link Container}.
* </p> * </p>
* *
* @param container the container with updated token * @param container the container with updated token.
* @throws YarnException *
* @throws IOException * @throws YarnException YarnException.
* @throws IOException IOException.
*/ */
public abstract void increaseContainerResource(Container container) public abstract void increaseContainerResource(Container container)
throws YarnException, IOException; throws YarnException, IOException;
@ -108,8 +113,8 @@ public abstract class NMClient extends AbstractService {
* @param containerId the Id of the started container * @param containerId the Id of the started container
* @param nodeId the Id of the <code>NodeManager</code> * @param nodeId the Id of the <code>NodeManager</code>
* *
* @throws YarnException * @throws YarnException YarnException.
* @throws IOException * @throws IOException IOException.
*/ */
public abstract void stopContainer(ContainerId containerId, NodeId nodeId) public abstract void stopContainer(ContainerId containerId, NodeId nodeId)
throws YarnException, IOException; throws YarnException, IOException;
@ -120,13 +125,61 @@ public abstract class NMClient extends AbstractService {
* @param containerId the Id of the started container * @param containerId the Id of the started container
* @param nodeId the Id of the <code>NodeManager</code> * @param nodeId the Id of the <code>NodeManager</code>
* *
* @return the status of a container * @return the status of a container.
* @throws YarnException *
* @throws IOException * @throws YarnException YarnException.
* @throws IOException IOException.
*/ */
public abstract ContainerStatus getContainerStatus(ContainerId containerId, public abstract ContainerStatus getContainerStatus(ContainerId containerId,
NodeId nodeId) throws YarnException, IOException; NodeId nodeId) throws YarnException, IOException;
/**
* <p>Re-Initialize the Container.</p>
*
* @param containerId the Id of the container to Re-Initialize.
* @param containerLaunchContex the updated ContainerLaunchContext.
* @param autoCommit whether the re-initialization should be committed
*                   automatically.
*
* @throws YarnException YarnException.
* @throws IOException IOException.
*/
public abstract void reInitializeContainer(ContainerId containerId,
ContainerLaunchContext containerLaunchContex, boolean autoCommit)
throws YarnException, IOException;
/**
* <p>Restart the specified container.</p>
*
* @param containerId the Id of the container to restart.
*
* @throws YarnException if the restart request fails.
* @throws IOException if the restart request fails with an I/O error.
*/
public abstract void restartContainer(ContainerId containerId)
throws YarnException, IOException;
/**
* <p>Rollback last reInitialization of the specified container.</p>
*
* @param containerId the Id of the container whose last re-initialization
*                    is to be rolled back.
*
* @throws YarnException YarnException.
* @throws IOException IOException.
*/
public abstract void rollbackLastReInitialization(ContainerId containerId)
throws YarnException, IOException;
/**
* <p>Commit last reInitialization of the specified container.</p>
*
* @param containerId the Id of the container whose last re-initialization
*                    is to be committed.
*
* @throws YarnException YarnException.
* @throws IOException IOException.
*/
public abstract void commitLastReInitialization(ContainerId containerId)
throws YarnException, IOException;
/** /**
* <p>Set whether the containers that are started by this client, and are * <p>Set whether the containers that are started by this client, and are
* still running should be stopped when the client stops. By default, the * still running should be stopped when the client stops. By default, the
@ -165,4 +218,15 @@ public abstract class NMClient extends AbstractService {
return nmTokenCache; return nmTokenCache;
} }
/**
* Get the NodeId of the node on which container is running. It returns
* null if the container is not found or if it is not running.
*
* <p>This base implementation always returns null; subclasses that track
* started containers are expected to override it.</p>
*
* @param containerId Container Id of the container.
* @return NodeId of the node on which the container is running, or null.
*/
public NodeId getNodeIdOfStartedContainer(ContainerId containerId) {
return null;
}
} }

View File

@ -20,7 +20,6 @@ package org.apache.hadoop.yarn.client.api.async;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceAudience.Public;
@ -32,7 +31,6 @@ import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.client.api.NMClient; import org.apache.hadoop.yarn.client.api.NMClient;
import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl; import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl;
import org.apache.hadoop.yarn.client.api.impl.NMClientImpl; import org.apache.hadoop.yarn.client.api.impl.NMClientImpl;
@ -181,6 +179,38 @@ public abstract class NMClientAsync extends AbstractService {
public abstract void increaseContainerResourceAsync(Container container); public abstract void increaseContainerResourceAsync(Container container);
/**
* <p>Re-Initialize the Container.</p>
*
* @param containerId the Id of the container to Re-Initialize.
* @param containerLaunchContex the updated ContainerLaunchContext.
* @param autoCommit whether the re-initialization should be committed
*                   automatically.
*/
public abstract void reInitializeContainerAsync(ContainerId containerId,
ContainerLaunchContext containerLaunchContex, boolean autoCommit);
/**
* <p>Restart the specified container.</p>
*
* <p>The call declares no checked exceptions; the outcome is reported
* through the callback handler.</p>
*
* @param containerId the Id of the container to restart.
*/
public abstract void restartContainerAsync(ContainerId containerId);
/**
* <p>Rollback last reInitialization of the specified container.</p>
*
* @param containerId the Id of the container whose last re-initialization
*                    is to be rolled back.
*/
public abstract void rollbackLastReInitializationAsync(
ContainerId containerId);
/**
* <p>Commit last reInitialization of the specified container.</p>
*
* @param containerId the Id of the container whose last re-initialization
*                    is to be committed.
*/
public abstract void commitLastReInitializationAsync(ContainerId containerId);
public abstract void stopContainerAsync( public abstract void stopContainerAsync(
ContainerId containerId, NodeId nodeId); ContainerId containerId, NodeId nodeId);
@ -303,6 +333,70 @@ public abstract class NMClientAsync extends AbstractService {
*/ */
public abstract void onStopContainerError( public abstract void onStopContainerError(
ContainerId containerId, Throwable t); ContainerId containerId, Throwable t);
/**
* Callback for container re-initialization request.
* The default implementation does nothing.
*
* @param containerId the Id of the container to be Re-Initialized.
*/
public void onContainerReInitialize(ContainerId containerId) {}
/**
* Callback for container restart.
* The default implementation does nothing.
*
* @param containerId the Id of the container to restart.
*/
public void onContainerRestart(ContainerId containerId) {}
/**
* Callback for rollback of last re-initialization.
* The default implementation does nothing.
*
* @param containerId the Id of the container whose last re-initialization
*                    was rolled back.
*/
public void onRollbackLastReInitialization(ContainerId containerId) {}
/**
* Callback for commit of last re-initialization.
* The default implementation does nothing.
*
* @param containerId the Id of the container whose last re-initialization
*                    was committed.
*/
public void onCommitLastReInitialization(ContainerId containerId) {}
/**
* Error Callback for container re-initialization request.
* The default implementation does nothing.
*
* @param containerId the Id of the container to be Re-Initialized.
* @param t the Throwable describing why the request failed.
*/
public void onContainerReInitializeError(ContainerId containerId,
Throwable t) {}
/**
* Error Callback for container restart.
* The default implementation does nothing.
*
* @param containerId the Id of the container to restart.
* @param t the Throwable describing why the restart failed.
*/
public void onContainerRestartError(ContainerId containerId, Throwable t) {}
/**
* Error Callback for rollback of last re-initialization.
* The default implementation does nothing.
*
* @param containerId the Id of the container whose last re-initialization
*                    was to be rolled back.
* @param t the Throwable describing why the rollback failed.
*/
public void onRollbackLastReInitializationError(ContainerId containerId,
Throwable t) {}
/**
* Error Callback for commit of last re-initialization.
* The default implementation does nothing.
*
* @param containerId the Id of the container whose last re-initialization
*                    was to be committed.
* @param t the Throwable describing why the commit failed.
*/
public void onCommitLastReInitializationError(ContainerId containerId,
Throwable t) {}
} }
/** /**

View File

@ -282,6 +282,103 @@ public class NMClientAsyncImpl extends NMClientAsync {
} }
} }
/**
 * Queues a re-initialization request for the given container.
 *
 * <p>Nothing is queued when the callback handler is not an
 * {@link AbstractCallbackHandler} (an error is logged) or when the container
 * is not tracked by this client (the error callback is invoked). If the
 * calling thread is interrupted while queueing, the error callback receives
 * the {@link InterruptedException}.</p>
 *
 * @param containerId the Id of the container to re-initialize.
 * @param containerLaunchContex the updated ContainerLaunchContext.
 * @param autoCommit whether the re-initialization should be committed
 *                   automatically.
 */
@Override
public void reInitializeContainerAsync(ContainerId containerId,
    ContainerLaunchContext containerLaunchContex, boolean autoCommit){
  if (!(callbackHandler instanceof AbstractCallbackHandler)) {
    LOG.error("Callback handler does not implement container re-initialize "
        + "callback methods");
    return;
  }
  AbstractCallbackHandler handler = (AbstractCallbackHandler) callbackHandler;
  if (containers.get(containerId) == null) {
    handler.onContainerReInitializeError(
        containerId, RPCUtil.getRemoteException(
            "Container " + containerId + " is not started"));
    // The failure has already been reported; do not also queue an event
    // for a container this client does not track.
    return;
  }
  try {
    events.put(new ReInitializeContainerEvevnt(containerId,
        client.getNodeIdOfStartedContainer(containerId),
        containerLaunchContex, autoCommit));
  } catch (InterruptedException e) {
    // Keep the cause in the log so the interruption can be diagnosed.
    LOG.warn("Exception when scheduling the event of re-initializing of "
        + "Container " + containerId, e);
    handler.onContainerReInitializeError(containerId, e);
  }
}
/**
 * Queues a restart request for the given container.
 *
 * <p>Nothing is queued when the callback handler is not an
 * {@link AbstractCallbackHandler} (an error is logged) or when the container
 * is not tracked by this client (the error callback is invoked). If the
 * calling thread is interrupted while queueing, the error callback receives
 * the {@link InterruptedException}.</p>
 *
 * @param containerId the Id of the container to restart.
 */
@Override
public void restartContainerAsync(ContainerId containerId){
  if (!(callbackHandler instanceof AbstractCallbackHandler)) {
    LOG.error("Callback handler does not implement container restart "
        + "callback methods");
    return;
  }
  AbstractCallbackHandler handler = (AbstractCallbackHandler) callbackHandler;
  if (containers.get(containerId) == null) {
    handler.onContainerRestartError(
        containerId, RPCUtil.getRemoteException(
            "Container " + containerId + " is not started"));
    // The failure has already been reported; do not also queue an event
    // for a container this client does not track.
    return;
  }
  try {
    events.put(new ContainerEvent(containerId,
        client.getNodeIdOfStartedContainer(containerId),
        null, ContainerEventType.RESTART_CONTAINER));
  } catch (InterruptedException e) {
    // Keep the cause in the log so the interruption can be diagnosed.
    LOG.warn("Exception when scheduling the event of restart of "
        + "Container " + containerId, e);
    handler.onContainerRestartError(containerId, e);
  }
}
/**
 * Queues a rollback of the last re-initialization of the given container.
 *
 * <p>Nothing is queued when the callback handler is not an
 * {@link AbstractCallbackHandler} (an error is logged) or when the container
 * is not tracked by this client (the error callback is invoked). If the
 * calling thread is interrupted while queueing, the error callback receives
 * the {@link InterruptedException}.</p>
 *
 * @param containerId the Id of the container whose last re-initialization
 *                    is to be rolled back.
 */
@Override
public void rollbackLastReInitializationAsync(ContainerId containerId){
  if (!(callbackHandler instanceof AbstractCallbackHandler)) {
    LOG.error("Callback handler does not implement container rollback "
        + "callback methods");
    return;
  }
  AbstractCallbackHandler handler = (AbstractCallbackHandler) callbackHandler;
  if (containers.get(containerId) == null) {
    handler.onRollbackLastReInitializationError(
        containerId, RPCUtil.getRemoteException(
            "Container " + containerId + " is not started"));
    // The failure has already been reported; do not also queue an event
    // for a container this client does not track.
    return;
  }
  try {
    events.put(new ContainerEvent(containerId,
        client.getNodeIdOfStartedContainer(containerId),
        null, ContainerEventType.ROLLBACK_LAST_REINIT));
  } catch (InterruptedException e) {
    // Keep the cause in the log so the interruption can be diagnosed.
    LOG.warn("Exception when scheduling the event Rollback re-initialization"
        + " of Container " + containerId, e);
    handler.onRollbackLastReInitializationError(containerId, e);
  }
}
/**
 * Queues a commit of the last re-initialization of the given container.
 *
 * <p>Nothing is queued when the callback handler is not an
 * {@link AbstractCallbackHandler} (an error is logged) or when the container
 * is not tracked by this client (the error callback is invoked). If the
 * calling thread is interrupted while queueing, the error callback receives
 * the {@link InterruptedException}.</p>
 *
 * @param containerId the Id of the container whose last re-initialization
 *                    is to be committed.
 */
@Override
public void commitLastReInitializationAsync(ContainerId containerId){
  if (!(callbackHandler instanceof AbstractCallbackHandler)) {
    LOG.error("Callback handler does not implement container commit last " +
        "re-initialization callback methods");
    return;
  }
  AbstractCallbackHandler handler = (AbstractCallbackHandler) callbackHandler;
  if (containers.get(containerId) == null) {
    handler.onCommitLastReInitializationError(
        containerId, RPCUtil.getRemoteException(
            "Container " + containerId + " is not started"));
    // The failure has already been reported; do not also queue an event
    // for a container this client does not track.
    return;
  }
  try {
    events.put(new ContainerEvent(containerId,
        client.getNodeIdOfStartedContainer(containerId),
        null, ContainerEventType.COMMIT_LAST_REINT));
  } catch (InterruptedException e) {
    // Keep the cause in the log so the interruption can be diagnosed.
    LOG.warn("Exception when scheduling the event Commit re-initialization"
        + " of Container " + containerId, e);
    handler.onCommitLastReInitializationError(containerId, e);
  }
}
public void stopContainerAsync(ContainerId containerId, NodeId nodeId) { public void stopContainerAsync(ContainerId containerId, NodeId nodeId) {
if (containers.get(containerId) == null) { if (containers.get(containerId) == null) {
callbackHandler.onStopContainerError(containerId, callbackHandler.onStopContainerError(containerId,
@ -330,7 +427,11 @@ public class NMClientAsyncImpl extends NMClientAsync {
START_CONTAINER, START_CONTAINER,
STOP_CONTAINER, STOP_CONTAINER,
QUERY_CONTAINER, QUERY_CONTAINER,
INCREASE_CONTAINER_RESOURCE INCREASE_CONTAINER_RESOURCE,
REINITIALIZE_CONTAINER,
RESTART_CONTAINER,
ROLLBACK_LAST_REINIT,
COMMIT_LAST_REINT
} }
protected static class ContainerEvent protected static class ContainerEvent
@ -381,6 +482,27 @@ public class NMClientAsyncImpl extends NMClientAsync {
} }
} }
/**
 * Event of type {@code REINITIALIZE_CONTAINER} that additionally carries the
 * launch context and auto-commit flag of the re-initialization request.
 * Both values are fixed at construction time.
 */
protected static class ReInitializeContainerEvevnt extends ContainerEvent {
  private final ContainerLaunchContext containerLaunchContext;
  private final boolean autoCommit;

  /**
   * @param containerId the Id of the container to re-initialize.
   * @param nodeId the NodeId passed through to the base event.
   * @param containerLaunchContext the updated ContainerLaunchContext.
   * @param autoCommit whether the re-initialization should be committed
   *                   automatically.
   */
  public ReInitializeContainerEvevnt(ContainerId containerId, NodeId nodeId,
      ContainerLaunchContext containerLaunchContext, boolean autoCommit) {
    super(containerId, nodeId, null,
        ContainerEventType.REINITIALIZE_CONTAINER);
    this.containerLaunchContext = containerLaunchContext;
    this.autoCommit = autoCommit;
  }

  /** @return the launch context supplied with the request. */
  public ContainerLaunchContext getContainerLaunchContext() {
    return containerLaunchContext;
  }

  /** @return the auto-commit flag supplied with the request. */
  public boolean isAutoCommit() {
    return autoCommit;
  }
}
protected static class IncreaseContainerResourceEvent extends ContainerEvent { protected static class IncreaseContainerResourceEvent extends ContainerEvent {
private Container container; private Container container;
@ -416,6 +538,25 @@ public class NMClientAsyncImpl extends NMClientAsync {
.addTransition(ContainerState.RUNNING, ContainerState.RUNNING, .addTransition(ContainerState.RUNNING, ContainerState.RUNNING,
ContainerEventType.INCREASE_CONTAINER_RESOURCE, ContainerEventType.INCREASE_CONTAINER_RESOURCE,
new IncreaseContainerResourceTransition()) new IncreaseContainerResourceTransition())
// Transitions for Container Upgrade
.addTransition(ContainerState.RUNNING,
EnumSet.of(ContainerState.RUNNING, ContainerState.FAILED),
ContainerEventType.REINITIALIZE_CONTAINER,
new ReInitializeContainerTransition())
.addTransition(ContainerState.RUNNING,
EnumSet.of(ContainerState.RUNNING, ContainerState.FAILED),
ContainerEventType.RESTART_CONTAINER,
new ReInitializeContainerTransition())
.addTransition(ContainerState.RUNNING,
EnumSet.of(ContainerState.RUNNING, ContainerState.FAILED),
ContainerEventType.ROLLBACK_LAST_REINIT,
new ReInitializeContainerTransition())
.addTransition(ContainerState.RUNNING,
EnumSet.of(ContainerState.RUNNING, ContainerState.FAILED),
ContainerEventType.COMMIT_LAST_REINT,
new ReInitializeContainerTransition())
.addTransition(ContainerState.RUNNING, .addTransition(ContainerState.RUNNING,
EnumSet.of(ContainerState.DONE, ContainerState.FAILED), EnumSet.of(ContainerState.DONE, ContainerState.FAILED),
ContainerEventType.STOP_CONTAINER, ContainerEventType.STOP_CONTAINER,
@ -431,6 +572,10 @@ public class NMClientAsyncImpl extends NMClientAsync {
.addTransition(ContainerState.FAILED, ContainerState.FAILED, .addTransition(ContainerState.FAILED, ContainerState.FAILED,
EnumSet.of(ContainerEventType.START_CONTAINER, EnumSet.of(ContainerEventType.START_CONTAINER,
ContainerEventType.STOP_CONTAINER, ContainerEventType.STOP_CONTAINER,
ContainerEventType.REINITIALIZE_CONTAINER,
ContainerEventType.RESTART_CONTAINER,
ContainerEventType.COMMIT_LAST_REINT,
ContainerEventType.ROLLBACK_LAST_REINIT,
ContainerEventType.INCREASE_CONTAINER_RESOURCE)); ContainerEventType.INCREASE_CONTAINER_RESOURCE));
protected static class StartContainerTransition implements protected static class StartContainerTransition implements
@ -529,6 +674,119 @@ public class NMClientAsyncImpl extends NMClientAsync {
} }
} }
/**
 * Transition shared by the four container-upgrade events (re-initialize,
 * restart, rollback and commit of the last re-initialization).
 *
 * <p>It issues the matching call on the wrapped client and then notifies the
 * callback handler. The result is {@code RUNNING} when the client call did
 * not throw and {@code FAILED} otherwise. Exceptions thrown by the handler
 * itself are only logged and do not affect the resulting state.</p>
 */
protected static class ReInitializeContainerTransition implements
    MultipleArcTransition<StatefulContainer, ContainerEvent,
    ContainerState> {

  @Override
  public ContainerState transition(StatefulContainer container,
      ContainerEvent containerEvent) {
    final ContainerId containerId = containerEvent.getContainerId();
    final AbstractCallbackHandler handler = (AbstractCallbackHandler)
        container.nmClientAsync.getCallbackHandler();
    try {
      switch(containerEvent.getType()) {
      case REINITIALIZE_CONTAINER:
        if (!(containerEvent instanceof ReInitializeContainerEvevnt)) {
          LOG.error("Unexpected Event.. [" +containerEvent.getType() + "]");
          return ContainerState.FAILED;
        }
        ReInitializeContainerEvevnt reInitEvent =
            (ReInitializeContainerEvevnt) containerEvent;
        container.nmClientAsync.getClient().reInitializeContainer(
            containerId, reInitEvent.getContainerLaunchContext(),
            reInitEvent.isAutoCommit());
        break;
      case RESTART_CONTAINER:
        container.nmClientAsync.getClient().restartContainer(containerId);
        break;
      case ROLLBACK_LAST_REINIT:
        container.nmClientAsync.getClient()
            .rollbackLastReInitialization(containerId);
        break;
      case COMMIT_LAST_REINT:
        container.nmClientAsync.getClient()
            .commitLastReInitialization(containerId);
        break;
      default:
        LOG.warn("Event of type [" + containerEvent.getType() + "] not" +
            " expected here..");
        break;
      }
      // The client call went through: tell the handler, log (but otherwise
      // ignore) anything the handler throws, and stay in RUNNING.
      logHandlerError(containerEvent,
          notifySuccess(handler, containerEvent));
      return ContainerState.RUNNING;
    } catch (Throwable t) {
      logHandlerError(containerEvent,
          notifyFailure(handler, containerEvent, t));
    }
    return ContainerState.FAILED;
  }

  /**
   * Invokes the success callback matching the event type.
   *
   * @return whatever the handler threw, or null if it returned normally.
   */
  private static Throwable notifySuccess(AbstractCallbackHandler handler,
      ContainerEvent containerEvent) {
    final ContainerId containerId = containerEvent.getContainerId();
    try {
      switch (containerEvent.getType()) {
      case REINITIALIZE_CONTAINER:
        handler.onContainerReInitialize(containerId);
        break;
      case RESTART_CONTAINER:
        handler.onContainerRestart(containerId);
        break;
      case ROLLBACK_LAST_REINIT:
        handler.onRollbackLastReInitialization(containerId);
        break;
      case COMMIT_LAST_REINT:
        handler.onCommitLastReInitialization(containerId);
        break;
      default:
        // Unexpected types were already reported by the caller.
        break;
      }
    } catch (Throwable tr) {
      return tr;
    }
    return null;
  }

  /**
   * Invokes the error callback matching the event type with the given cause.
   *
   * @return whatever the handler threw, or null if it returned normally.
   */
  private static Throwable notifyFailure(AbstractCallbackHandler handler,
      ContainerEvent containerEvent, Throwable cause) {
    final ContainerId containerId = containerEvent.getContainerId();
    try {
      switch (containerEvent.getType()) {
      case REINITIALIZE_CONTAINER:
        handler.onContainerReInitializeError(containerId, cause);
        break;
      case RESTART_CONTAINER:
        handler.onContainerRestartError(containerId, cause);
        break;
      case ROLLBACK_LAST_REINIT:
        handler.onRollbackLastReInitializationError(containerId, cause);
        break;
      case COMMIT_LAST_REINT:
        handler.onCommitLastReInitializationError(containerId, cause);
        break;
      default:
        LOG.warn("Event of type [" + containerEvent.getType() + "] not" +
            " expected here..");
        break;
      }
    } catch (Throwable tr) {
      return tr;
    }
    return null;
  }

  /** Logs an exception raised by a handler callback; no-op for null. */
  private static void logHandlerError(ContainerEvent containerEvent,
      Throwable handlerError) {
    if (handlerError != null) {
      LOG.info("Unchecked exception is thrown in handler for event ["
          + containerEvent.getType() + "] for Container "
          + containerEvent.getContainerId(), handlerError);
    }
  }
}
protected static class StopContainerTransition implements protected static class StopContainerTransition implements
MultipleArcTransition<StatefulContainer, ContainerEvent, MultipleArcTransition<StatefulContainer, ContainerEvent,
ContainerState> { ContainerState> {

View File

@ -33,10 +33,13 @@ import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.token.SecretManager.InvalidToken; import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest; import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest;
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse; import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReInitializeContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest; import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse; import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse;
@ -306,6 +309,84 @@ public class NMClientImpl extends NMClient {
} }
} }
/**
 * Sends a re-initialization request for a container previously started
 * through this client to the NodeManager it was started on.
 *
 * @param containerId the Id of the container to re-initialize.
 * @param containerLaunchContex the updated ContainerLaunchContext.
 * @param autoCommit whether the re-initialization should be committed
 *                   automatically.
 * @throws YarnException if the container is not among the started
 *                       containers, or if the request itself raises it.
 * @throws IOException if the request raises it.
 */
@Override
public void reInitializeContainer(ContainerId containerId,
    ContainerLaunchContext containerLaunchContex, boolean autoCommit)
    throws YarnException, IOException {
  final StartedContainer startedContainer =
      startedContainers.get(containerId);
  if (startedContainer == null) {
    throw new YarnException("Unknown container [" + containerId + "]");
  }
  // Operations on the same container are serialized on its record.
  synchronized (startedContainer) {
    ContainerManagementProtocolProxyData proxyData = cmProxy.getProxy(
        startedContainer.getNodeId().toString(), containerId);
    try {
      proxyData.getContainerManagementProtocol().reInitializeContainer(
          ReInitializeContainerRequest.newInstance(
              containerId, containerLaunchContex, autoCommit));
    } finally {
      if (proxyData != null) {
        cmProxy.mayBeCloseProxy(proxyData);
      }
    }
  }
}
/**
* Restarts the given container by delegating to
* restartCommitOrRollbackContainer with UpgradeOp.RESTART.
*/
@Override
public void restartContainer(ContainerId containerId)
throws YarnException, IOException {
restartCommitOrRollbackContainer(containerId, UpgradeOp.RESTART);
}
/**
* Rolls back the last re-initialization of the given container by delegating
* to restartCommitOrRollbackContainer with UpgradeOp.ROLLBACK.
*/
@Override
public void rollbackLastReInitialization(ContainerId containerId)
throws YarnException, IOException {
restartCommitOrRollbackContainer(containerId, UpgradeOp.ROLLBACK);
}
/**
* Commits the last re-initialization of the given container by delegating
* to restartCommitOrRollbackContainer with UpgradeOp.COMMIT.
*/
@Override
public void commitLastReInitialization(ContainerId containerId)
throws YarnException, IOException {
restartCommitOrRollbackContainer(containerId, UpgradeOp.COMMIT);
}
/**
 * Issues a restart, commit or rollback call for a container previously
 * started through this client, on the NodeManager it was started on.
 *
 * @param containerId the Id of the container to operate on.
 * @param upgradeOp which operation to perform; values other than RESTART,
 *                  COMMIT and ROLLBACK result in no call.
 * @throws YarnException if the container is not among the started
 *                       containers, or if the call itself raises it.
 * @throws IOException if the call raises it.
 */
private void restartCommitOrRollbackContainer(ContainerId containerId,
    UpgradeOp upgradeOp) throws YarnException, IOException {
  final StartedContainer startedContainer =
      startedContainers.get(containerId);
  if (startedContainer == null) {
    throw new YarnException("Unknown container [" + containerId + "]");
  }
  // Operations on the same container are serialized on its record.
  synchronized (startedContainer) {
    ContainerManagementProtocolProxyData proxyData = cmProxy.getProxy(
        startedContainer.getNodeId().toString(), containerId);
    ContainerManagementProtocol protocol =
        proxyData.getContainerManagementProtocol();
    try {
      switch (upgradeOp) {
      case RESTART:
        protocol.restartContainer(containerId);
        break;
      case COMMIT:
        protocol.commitLastReInitialization(containerId);
        break;
      case ROLLBACK:
        protocol.rollbackLastReInitialization(containerId);
        break;
      default:
        // Should not happen..
        break;
      }
    } finally {
      if (proxyData != null) {
        cmProxy.mayBeCloseProxy(proxyData);
      }
    }
  }
}
private void stopContainerInternal(ContainerId containerId, NodeId nodeId) private void stopContainerInternal(ContainerId containerId, NodeId nodeId)
throws IOException, YarnException { throws IOException, YarnException {
ContainerManagementProtocolProxyData proxy = null; ContainerManagementProtocolProxyData proxy = null;
@ -343,4 +424,14 @@ public class NMClientImpl extends NMClient {
throw (IOException) t; throw (IOException) t;
} }
} }
/**
 * Looks up the node of a container started through this client.
 *
 * @param containerId Container Id of the container.
 * @return the NodeId recorded for the started container, or null when the
 *         container is not among the started containers.
 */
@Override
public NodeId getNodeIdOfStartedContainer(ContainerId containerId) {
  final StartedContainer startedContainer =
      startedContainers.get(containerId);
  return startedContainer == null ? null : startedContainer.getNodeId();
}
} }

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.client.api.async.impl; package org.apache.hadoop.yarn.client.api.async.impl;
import static org.mockito.Matchers.any; import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
@ -27,6 +28,8 @@ import static org.mockito.Mockito.when;
import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.Collections; import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
@ -37,6 +40,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerArray; import java.util.concurrent.atomic.AtomicIntegerArray;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.util.Records;
import org.junit.Assert; import org.junit.Assert;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
@ -69,6 +73,22 @@ public class TestNMClientAsync {
private NodeId nodeId; private NodeId nodeId;
private Token containerToken; private Token containerToken;
/** Client operations whose success and failure callbacks the test tracks. */
enum OpsToTest {
  START,
  QUERY,
  STOP,
  INCR,
  REINIT,
  RESTART,
  ROLLBACK,
  COMMIT
}
/**
 * Per-operation bookkeeping: total success/failure counters plus one slot
 * per expected success and per expected failure.
 */
static final class TestData {
  AtomicInteger success = new AtomicInteger(0);
  AtomicInteger failure = new AtomicInteger(0);
  final AtomicIntegerArray successArray;
  final AtomicIntegerArray failureArray;

  /**
   * @param expectedSuccess length of the success slot array.
   * @param expectedFailure length of the failure slot array.
   */
  private TestData(int expectedSuccess, int expectedFailure) {
    successArray = new AtomicIntegerArray(expectedSuccess);
    failureArray = new AtomicIntegerArray(expectedFailure);
  }
}
@After @After
public void teardown() { public void teardown() {
ServiceOperations.stop(asyncClient); ServiceOperations.stop(asyncClient);
@ -194,25 +214,7 @@ public class TestNMClientAsync {
private int expectedSuccess; private int expectedSuccess;
private int expectedFailure; private int expectedFailure;
private AtomicInteger actualStartSuccess = new AtomicInteger(0); private final Map<OpsToTest, TestData> testMap = new HashMap<>();
private AtomicInteger actualStartFailure = new AtomicInteger(0);
private AtomicInteger actualQuerySuccess = new AtomicInteger(0);
private AtomicInteger actualQueryFailure = new AtomicInteger(0);
private AtomicInteger actualStopSuccess = new AtomicInteger(0);
private AtomicInteger actualStopFailure = new AtomicInteger(0);
private AtomicInteger actualIncreaseResourceSuccess =
new AtomicInteger(0);
private AtomicInteger actualIncreaseResourceFailure =
new AtomicInteger(0);
private AtomicIntegerArray actualStartSuccessArray;
private AtomicIntegerArray actualStartFailureArray;
private AtomicIntegerArray actualQuerySuccessArray;
private AtomicIntegerArray actualQueryFailureArray;
private AtomicIntegerArray actualStopSuccessArray;
private AtomicIntegerArray actualStopFailureArray;
private AtomicIntegerArray actualIncreaseResourceSuccessArray;
private AtomicIntegerArray actualIncreaseResourceFailureArray;
private Set<String> errorMsgs = private Set<String> errorMsgs =
Collections.synchronizedSet(new HashSet<String>()); Collections.synchronizedSet(new HashSet<String>());
@ -221,16 +223,9 @@ public class TestNMClientAsync {
this.expectedSuccess = expectedSuccess; this.expectedSuccess = expectedSuccess;
this.expectedFailure = expectedFailure; this.expectedFailure = expectedFailure;
actualStartSuccessArray = new AtomicIntegerArray(expectedSuccess); for (OpsToTest op : OpsToTest.values()) {
actualStartFailureArray = new AtomicIntegerArray(expectedFailure); testMap.put(op, new TestData(expectedSuccess, expectedFailure));
actualQuerySuccessArray = new AtomicIntegerArray(expectedSuccess); }
actualQueryFailureArray = new AtomicIntegerArray(expectedFailure);
actualStopSuccessArray = new AtomicIntegerArray(expectedSuccess);
actualStopFailureArray = new AtomicIntegerArray(expectedFailure);
actualIncreaseResourceSuccessArray =
new AtomicIntegerArray(expectedSuccess);
actualIncreaseResourceFailureArray =
new AtomicIntegerArray(expectedFailure);
} }
@SuppressWarnings("deprecation") @SuppressWarnings("deprecation")
@ -243,8 +238,9 @@ public class TestNMClientAsync {
" should throw the exception onContainerStarted"); " should throw the exception onContainerStarted");
return; return;
} }
actualStartSuccess.addAndGet(1); TestData td = testMap.get(OpsToTest.START);
actualStartSuccessArray.set(containerId.getId(), 1); td.success.addAndGet(1);
td.successArray.set(containerId.getId(), 1);
// move on to the following success tests // move on to the following success tests
asyncClient.getContainerStatusAsync(containerId, nodeId); asyncClient.getContainerStatusAsync(containerId, nodeId);
@ -254,7 +250,28 @@ public class TestNMClientAsync {
// containerId // containerId
Container container = Container.newInstance( Container container = Container.newInstance(
containerId, nodeId, null, null, null, containerToken); containerId, nodeId, null, null, null, containerToken);
int t = containerId.getId() % 5;
switch (t) {
case 0:
asyncClient.increaseContainerResourceAsync(container); asyncClient.increaseContainerResourceAsync(container);
break;
case 1:
asyncClient.reInitializeContainerAsync(containerId,
recordFactory.newRecordInstance(ContainerLaunchContext.class),
true);
break;
case 2:
asyncClient.restartContainerAsync(containerId);
break;
case 3:
asyncClient.rollbackLastReInitializationAsync(containerId);
break;
case 4:
asyncClient.commitLastReInitializationAsync(containerId);
break;
default:
break;
}
} }
// Shouldn't crash the test thread // Shouldn't crash the test thread
@ -270,8 +287,9 @@ public class TestNMClientAsync {
" should throw the exception onContainerStatusReceived"); " should throw the exception onContainerStatusReceived");
return; return;
} }
actualQuerySuccess.addAndGet(1); TestData td = testMap.get(OpsToTest.QUERY);
actualQuerySuccessArray.set(containerId.getId(), 1); td.success.addAndGet(1);
td.successArray.set(containerId.getId(), 1);
// move on to the following success tests // move on to the following success tests
// make sure we pass in the container with the same // make sure we pass in the container with the same
// containerId // containerId
@ -292,8 +310,78 @@ public class TestNMClientAsync {
" should throw the exception onContainerResourceIncreased"); " should throw the exception onContainerResourceIncreased");
return; return;
} }
actualIncreaseResourceSuccess.addAndGet(1); TestData td = testMap.get(OpsToTest.INCR);
actualIncreaseResourceSuccessArray.set(containerId.getId(), 1); td.success.addAndGet(1);
td.successArray.set(containerId.getId(), 1);
// move on to the following success tests
asyncClient.reInitializeContainerAsync(containerId,
Records.newRecord(ContainerLaunchContext.class), true);
// throw a fake user exception, and shouldn't crash the test
throw new RuntimeException("Ignorable Exception");
}
/**
 * Success callback for a container re-initialization request.
 *
 * Containers whose id is not below {@code expectedSuccess} are not supposed
 * to reach this callback; such a call is recorded as an error message.
 * Otherwise the REINIT success is recorded, the restart operation is
 * issued for the same container, and a fake user exception is thrown.
 *
 * @param containerId id of the container that was re-initialized.
 */
@SuppressWarnings("deprecation")
@Override
public void onContainerReInitialize(ContainerId containerId) {
  final int id = containerId.getId();
  if (id >= expectedSuccess) {
    errorMsgs.add("Container " + containerId
        + " should throw the exception onContainerReInitialize");
    return;
  }
  final TestData reInitData = testMap.get(OpsToTest.REINIT);
  reInitData.success.incrementAndGet();
  reInitData.successArray.set(id, 1);
  // continue the chain of success tests with a restart
  asyncClient.restartContainerAsync(containerId);
  // simulate a misbehaving user callback; this must not crash the test
  throw new RuntimeException("Ignorable Exception");
}
/**
 * Success callback for a container restart request.
 *
 * Containers whose id is not below {@code expectedSuccess} are not supposed
 * to reach this callback; such a call is recorded as an error message that
 * names this callback. Otherwise the RESTART success is recorded, the
 * rollback operation is issued for the same container, and a fake user
 * exception is thrown.
 *
 * @param containerId id of the container that was restarted.
 */
@SuppressWarnings("deprecation")
@Override
public void onContainerRestart(ContainerId containerId) {
  if (containerId.getId() >= expectedSuccess) {
    // Report the callback that was actually (and unexpectedly) invoked.
    errorMsgs.add("Container " + containerId +
        " should throw the exception onContainerRestart");
    return;
  }
  TestData td = testMap.get(OpsToTest.RESTART);
  td.success.addAndGet(1);
  td.successArray.set(containerId.getId(), 1);
  // move on to the following success tests
  asyncClient.rollbackLastReInitializationAsync(containerId);
  // throw a fake user exception, and shouldn't crash the test
  throw new RuntimeException("Ignorable Exception");
}
/**
 * Success callback for a rollback of the last re-initialization.
 *
 * Containers whose id is not below {@code expectedSuccess} are not supposed
 * to reach this callback; such a call is recorded as an error message that
 * names this callback. Otherwise the ROLLBACK success is recorded, the
 * commit operation is issued for the same container, and a fake user
 * exception is thrown.
 *
 * @param containerId id of the container whose re-initialization was
 *                    rolled back.
 */
@SuppressWarnings("deprecation")
@Override
public void onRollbackLastReInitialization(ContainerId containerId) {
  if (containerId.getId() >= expectedSuccess) {
    // Report the callback that was actually (and unexpectedly) invoked.
    errorMsgs.add("Container " + containerId +
        " should throw the exception onRollbackLastReInitialization");
    return;
  }
  TestData td = testMap.get(OpsToTest.ROLLBACK);
  td.success.addAndGet(1);
  td.successArray.set(containerId.getId(), 1);
  // move on to the following success tests
  asyncClient.commitLastReInitializationAsync(containerId);
  // throw a fake user exception, and shouldn't crash the test
  throw new RuntimeException("Ignorable Exception");
}
@SuppressWarnings("deprecation")
@Override
public void onCommitLastReInitialization(ContainerId containerId) {
if (containerId.getId() >= expectedSuccess) {
errorMsgs.add("Container " + containerId +
" should throw the exception onContainerReInitialize");
return;
}
TestData td = testMap.get(OpsToTest.COMMIT);
td.success.addAndGet(1);
td.successArray.set(containerId.getId(), 1);
// move on to the following success tests // move on to the following success tests
asyncClient.stopContainerAsync(containerId, nodeId); asyncClient.stopContainerAsync(containerId, nodeId);
// throw a fake user exception, and shouldn't crash the test // throw a fake user exception, and shouldn't crash the test
@ -308,8 +396,9 @@ public class TestNMClientAsync {
" should throw the exception onContainerStopped"); " should throw the exception onContainerStopped");
return; return;
} }
actualStopSuccess.addAndGet(1); TestData td = testMap.get(OpsToTest.STOP);
actualStopSuccessArray.set(containerId.getId(), 1); td.success.addAndGet(1);
td.successArray.set(containerId.getId(), 1);
// Shouldn't crash the test thread // Shouldn't crash the test thread
throw new RuntimeException("Ignorable Exception"); throw new RuntimeException("Ignorable Exception");
@ -330,8 +419,9 @@ public class TestNMClientAsync {
" shouldn't throw the exception onStartContainerError"); " shouldn't throw the exception onStartContainerError");
return; return;
} }
actualStartFailure.addAndGet(1); TestData td = testMap.get(OpsToTest.START);
actualStartFailureArray.set(containerId.getId() - expectedSuccess, 1); td.failure.addAndGet(1);
td.failureArray.set(containerId.getId() - expectedSuccess, 1);
// move on to the following failure tests // move on to the following failure tests
asyncClient.getContainerStatusAsync(containerId, nodeId); asyncClient.getContainerStatusAsync(containerId, nodeId);
@ -348,8 +438,9 @@ public class TestNMClientAsync {
" shouldn't throw the exception onIncreaseContainerResourceError"); " shouldn't throw the exception onIncreaseContainerResourceError");
return; return;
} }
actualIncreaseResourceFailure.addAndGet(1); TestData td = testMap.get(OpsToTest.INCR);
actualIncreaseResourceFailureArray.set( td.failure.addAndGet(1);
td.failureArray.set(
containerId.getId() - expectedSuccess - expectedFailure, 1); containerId.getId() - expectedSuccess - expectedFailure, 1);
// increase container resource error should NOT change the // increase container resource error should NOT change the
// the container status to FAILED // the container status to FAILED
@ -359,6 +450,102 @@ public class TestNMClientAsync {
throw new RuntimeException("Ignorable Exception"); throw new RuntimeException("Ignorable Exception");
} }
/**
 * Failure callback for a container re-initialization request.
 *
 * Containers whose id is below {@code expectedSuccess + expectedFailure}
 * are not supposed to reach this callback; such a call is recorded as an
 * error message. Otherwise a failure is recorded for REINIT and also for
 * STOP, and a fake user exception is thrown.
 *
 * @param containerId id of the container whose re-initialization failed.
 * @param t the failure reported for the request (unused here).
 */
@SuppressWarnings("deprecation")
@Override
public void onContainerReInitializeError(ContainerId containerId,
    Throwable t) {
  final int id = containerId.getId();
  if (id < expectedSuccess + expectedFailure) {
    errorMsgs.add("Container " + containerId
        + " shouldn't throw the exception onContainerReInitializeError");
    return;
  }
  final int failureIdx = id - expectedSuccess - expectedFailure;
  // The stop counters are bumped here as well, since the container
  // will fail (reInit changes the container status to FAILED).
  for (OpsToTest op : new OpsToTest[] {OpsToTest.REINIT, OpsToTest.STOP}) {
    final TestData opData = testMap.get(op);
    opData.failure.incrementAndGet();
    opData.failureArray.set(failureIdx, 1);
  }
  // Shouldn't crash the test thread
  throw new RuntimeException("Ignorable Exception");
}
/**
 * Failure callback for a container restart request.
 *
 * Containers whose id is below {@code expectedSuccess + expectedFailure}
 * are not supposed to reach this callback; such a call is recorded as an
 * error message. Otherwise a failure is recorded for RESTART and also for
 * STOP, and a fake user exception is thrown.
 *
 * @param containerId id of the container whose restart failed.
 * @param t the failure reported for the request (unused here).
 */
@SuppressWarnings("deprecation")
@Override
public void onContainerRestartError(ContainerId containerId, Throwable t) {
  final int id = containerId.getId();
  if (id < expectedSuccess + expectedFailure) {
    errorMsgs.add("Container " + containerId
        + " shouldn't throw the exception onContainerRestartError");
    return;
  }
  final int failureIdx = id - expectedSuccess - expectedFailure;
  // The stop counters are bumped here as well, since the container
  // will fail (restart changes the container status to FAILED).
  for (OpsToTest op : new OpsToTest[] {OpsToTest.RESTART, OpsToTest.STOP}) {
    final TestData opData = testMap.get(op);
    opData.failure.incrementAndGet();
    opData.failureArray.set(failureIdx, 1);
  }
  // Shouldn't crash the test thread
  throw new RuntimeException("Ignorable Exception");
}
/**
 * Failure callback for a rollback of the last re-initialization.
 *
 * Containers whose id is below {@code expectedSuccess + expectedFailure}
 * are not supposed to reach this callback; such a call is recorded as an
 * error message. Otherwise a failure is recorded for ROLLBACK and also for
 * STOP, and a fake user exception is thrown.
 *
 * @param containerId id of the container whose rollback failed.
 * @param t the failure reported for the request (unused here).
 */
@SuppressWarnings("deprecation")
@Override
public void onRollbackLastReInitializationError(ContainerId containerId,
    Throwable t) {
  final int id = containerId.getId();
  if (id < expectedSuccess + expectedFailure) {
    errorMsgs.add("Container " + containerId
        + " shouldn't throw the exception"
        + " onRollbackLastReInitializationError");
    return;
  }
  final int failureIdx = id - expectedSuccess - expectedFailure;
  // The stop counters are bumped here as well, since the container
  // will fail (rollback changes the container status to FAILED).
  for (OpsToTest op : new OpsToTest[] {OpsToTest.ROLLBACK, OpsToTest.STOP}) {
    final TestData opData = testMap.get(op);
    opData.failure.incrementAndGet();
    opData.failureArray.set(failureIdx, 1);
  }
  // Shouldn't crash the test thread
  throw new RuntimeException("Ignorable Exception");
}
/**
 * Failure callback for a commit of the last re-initialization.
 *
 * Containers whose id is below {@code expectedSuccess + expectedFailure}
 * are not supposed to reach this callback; such a call is recorded as an
 * error message. Otherwise a failure is recorded for COMMIT and also for
 * STOP, and a fake user exception is thrown.
 *
 * @param containerId id of the container whose commit failed.
 * @param t the failure reported for the request (unused here).
 */
@SuppressWarnings("deprecation")
@Override
public void onCommitLastReInitializationError(ContainerId containerId,
    Throwable t) {
  final int id = containerId.getId();
  if (id < expectedSuccess + expectedFailure) {
    errorMsgs.add("Container " + containerId
        + " shouldn't throw the exception onCommitLastReInitializationError");
    return;
  }
  final int failureIdx = id - expectedSuccess - expectedFailure;
  // The stop counters are bumped here as well, since the container
  // will fail (commit changes the container status to FAILED).
  for (OpsToTest op : new OpsToTest[] {OpsToTest.COMMIT, OpsToTest.STOP}) {
    final TestData opData = testMap.get(op);
    opData.failure.incrementAndGet();
    opData.failureArray.set(failureIdx, 1);
  }
  // Shouldn't crash the test thread
  throw new RuntimeException("Ignorable Exception");
}
@SuppressWarnings("deprecation") @SuppressWarnings("deprecation")
@Override @Override
public void onStopContainerError(ContainerId containerId, Throwable t) { public void onStopContainerError(ContainerId containerId, Throwable t) {
@ -371,9 +558,9 @@ public class TestNMClientAsync {
" shouldn't throw the exception onStopContainerError"); " shouldn't throw the exception onStopContainerError");
return; return;
} }
TestData td = testMap.get(OpsToTest.STOP);
actualStopFailure.addAndGet(1); td.failure.addAndGet(1);
actualStopFailureArray.set( td.failureArray.set(
containerId.getId() - expectedSuccess - expectedFailure, 1); containerId.getId() - expectedSuccess - expectedFailure, 1);
// Shouldn't crash the test thread // Shouldn't crash the test thread
@ -393,8 +580,9 @@ public class TestNMClientAsync {
" shouldn't throw the exception onGetContainerStatusError"); " shouldn't throw the exception onGetContainerStatusError");
return; return;
} }
actualQueryFailure.addAndGet(1); TestData td = testMap.get(OpsToTest.QUERY);
actualQueryFailureArray.set(containerId.getId() - expectedSuccess, 1); td.failure.addAndGet(1);
td.failureArray.set(containerId.getId() - expectedSuccess, 1);
// Shouldn't crash the test thread // Shouldn't crash the test thread
throw new RuntimeException("Ignorable Exception"); throw new RuntimeException("Ignorable Exception");
@ -402,44 +590,67 @@ public class TestNMClientAsync {
public boolean isAllSuccessCallsExecuted() { public boolean isAllSuccessCallsExecuted() {
boolean isAllSuccessCallsExecuted = boolean isAllSuccessCallsExecuted =
actualStartSuccess.get() == expectedSuccess && testMap.get(OpsToTest.START).success.get() == expectedSuccess &&
actualQuerySuccess.get() == expectedSuccess && testMap.get(OpsToTest.QUERY).success.get() == expectedSuccess &&
actualIncreaseResourceSuccess.get() == expectedSuccess && testMap.get(OpsToTest.INCR).success.get() == expectedSuccess &&
actualStopSuccess.get() == expectedSuccess; testMap.get(OpsToTest.REINIT).success.get() == expectedSuccess &&
testMap.get(OpsToTest.RESTART).success.get() == expectedSuccess &&
testMap.get(OpsToTest.ROLLBACK).success.get() ==
expectedSuccess &&
testMap.get(OpsToTest.COMMIT).success.get() == expectedSuccess &&
testMap.get(OpsToTest.STOP).success.get() == expectedSuccess;
if (isAllSuccessCallsExecuted) { if (isAllSuccessCallsExecuted) {
assertAtomicIntegerArray(actualStartSuccessArray); assertAtomicIntegerArray(testMap.get(OpsToTest.START).successArray);
assertAtomicIntegerArray(actualQuerySuccessArray); assertAtomicIntegerArray(testMap.get(OpsToTest.QUERY).successArray);
assertAtomicIntegerArray(actualIncreaseResourceSuccessArray); assertAtomicIntegerArray(testMap.get(OpsToTest.INCR).successArray);
assertAtomicIntegerArray(actualStopSuccessArray); assertAtomicIntegerArray(testMap.get(OpsToTest.REINIT).successArray);
assertAtomicIntegerArray(testMap.get(OpsToTest.RESTART).successArray);
assertAtomicIntegerArray(testMap.get(OpsToTest.ROLLBACK).successArray);
assertAtomicIntegerArray(testMap.get(OpsToTest.COMMIT).successArray);
assertAtomicIntegerArray(testMap.get(OpsToTest.STOP).successArray);
} }
return isAllSuccessCallsExecuted; return isAllSuccessCallsExecuted;
} }
public boolean isStartAndQueryFailureCallsExecuted() { public boolean isStartAndQueryFailureCallsExecuted() {
boolean isStartAndQueryFailureCallsExecuted = boolean isStartAndQueryFailureCallsExecuted =
actualStartFailure.get() == expectedFailure && testMap.get(OpsToTest.START).failure.get() == expectedFailure &&
actualQueryFailure.get() == expectedFailure; testMap.get(OpsToTest.QUERY).failure.get() == expectedFailure;
if (isStartAndQueryFailureCallsExecuted) { if (isStartAndQueryFailureCallsExecuted) {
assertAtomicIntegerArray(actualStartFailureArray); assertAtomicIntegerArray(testMap.get(OpsToTest.START).failureArray);
assertAtomicIntegerArray(actualQueryFailureArray); assertAtomicIntegerArray(testMap.get(OpsToTest.QUERY).failureArray);
} }
return isStartAndQueryFailureCallsExecuted; return isStartAndQueryFailureCallsExecuted;
} }
public boolean isIncreaseResourceFailureCallsExecuted() { public boolean isIncreaseResourceFailureCallsExecuted() {
boolean isIncreaseResourceFailureCallsExecuted = boolean isIncreaseResourceFailureCallsExecuted =
actualIncreaseResourceFailure.get() == expectedFailure; testMap.get(OpsToTest.INCR).failure.get()
+ testMap.get(OpsToTest.REINIT).failure.get()
+ testMap.get(OpsToTest.RESTART).failure.get()
+ testMap.get(OpsToTest.ROLLBACK).failure.get()
+ testMap.get(OpsToTest.COMMIT).failure.get()
== expectedFailure;
if (isIncreaseResourceFailureCallsExecuted) { if (isIncreaseResourceFailureCallsExecuted) {
assertAtomicIntegerArray(actualIncreaseResourceFailureArray); AtomicIntegerArray testArray =
new AtomicIntegerArray(
testMap.get(OpsToTest.INCR).failureArray.length());
for (int i = 0; i < testArray.length(); i++) {
for (OpsToTest op : EnumSet.of(OpsToTest.REINIT, OpsToTest.RESTART,
OpsToTest.ROLLBACK, OpsToTest.COMMIT, OpsToTest.INCR)) {
testArray.addAndGet(i, testMap.get(op).failureArray.get(i));
}
}
assertAtomicIntegerArray(testArray);
} }
return isIncreaseResourceFailureCallsExecuted; return isIncreaseResourceFailureCallsExecuted;
} }
public boolean isStopFailureCallsExecuted() { public boolean isStopFailureCallsExecuted() {
boolean isStopFailureCallsExecuted = boolean isStopFailureCallsExecuted =
actualStopFailure.get() == expectedFailure; testMap.get(OpsToTest.STOP).failure.get() == expectedFailure;
if (isStopFailureCallsExecuted) { if (isStopFailureCallsExecuted) {
assertAtomicIntegerArray(actualStopFailureArray); assertAtomicIntegerArray(testMap.get(OpsToTest.STOP).failureArray);
} }
return isStopFailureCallsExecuted; return isStopFailureCallsExecuted;
} }
@ -464,6 +675,14 @@ public class TestNMClientAsync {
recordFactory.newRecordInstance(ContainerStatus.class)); recordFactory.newRecordInstance(ContainerStatus.class));
doNothing().when(client).increaseContainerResource( doNothing().when(client).increaseContainerResource(
any(Container.class)); any(Container.class));
doNothing().when(client).reInitializeContainer(
any(ContainerId.class), any(ContainerLaunchContext.class),
anyBoolean());
doNothing().when(client).restartContainer(any(ContainerId.class));
doNothing().when(client).rollbackLastReInitialization(
any(ContainerId.class));
doNothing().when(client).commitLastReInitialization(
any(ContainerId.class));
doNothing().when(client).stopContainer(any(ContainerId.class), doNothing().when(client).stopContainer(any(ContainerId.class),
any(NodeId.class)); any(NodeId.class));
break; break;
@ -485,9 +704,23 @@ public class TestNMClientAsync {
recordFactory.newRecordInstance(ContainerStatus.class)); recordFactory.newRecordInstance(ContainerStatus.class));
doThrow(RPCUtil.getRemoteException("Increase Resource Exception")) doThrow(RPCUtil.getRemoteException("Increase Resource Exception"))
.when(client).increaseContainerResource(any(Container.class)); .when(client).increaseContainerResource(any(Container.class));
doThrow(RPCUtil.getRemoteException("ReInitialize Exception"))
.when(client).reInitializeContainer(
any(ContainerId.class), any(ContainerLaunchContext.class),
anyBoolean());
doThrow(RPCUtil.getRemoteException("Restart Exception"))
.when(client).restartContainer(any(ContainerId.class));
doThrow(RPCUtil.getRemoteException("Rollback upgrade Exception"))
.when(client).rollbackLastReInitialization(
any(ContainerId.class));
doThrow(RPCUtil.getRemoteException("Commit upgrade Exception"))
.when(client).commitLastReInitialization(
any(ContainerId.class));
doThrow(RPCUtil.getRemoteException("Stop Exception")).when(client) doThrow(RPCUtil.getRemoteException("Stop Exception")).when(client)
.stopContainer(any(ContainerId.class), any(NodeId.class)); .stopContainer(any(ContainerId.class), any(NodeId.class));
} }
when(client.getNodeIdOfStartedContainer(any(ContainerId.class)))
.thenReturn(NodeId.newInstance("localhost", 0));
return client; return client;
} }

View File

@ -45,6 +45,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.ContainerStatus;
@ -69,6 +70,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.util.Records; import org.apache.hadoop.yarn.util.Records;
import org.junit.After; import org.junit.After;
import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
@ -310,6 +312,36 @@ public class TestNMClient {
e.getMessage().contains("is not handled by this NodeManager")); e.getMessage().contains("is not handled by this NodeManager"));
} }
// restart shouldn't be called before startContainer,
// otherwise, NodeManager cannot find the container
try {
nmClient.restartContainer(container.getId());
fail("Exception is expected");
} catch (YarnException e) {
assertTrue("The thrown exception is not expected",
e.getMessage().contains("Unknown container"));
}
// rollback shouldn't be called before startContainer,
// otherwise, NodeManager cannot find the container
try {
nmClient.rollbackLastReInitialization(container.getId());
fail("Exception is expected");
} catch (YarnException e) {
assertTrue("The thrown exception is not expected",
e.getMessage().contains("Unknown container"));
}
// commit shouldn't be called before startContainer,
// otherwise, NodeManager cannot find the container
try {
nmClient.commitLastReInitialization(container.getId());
fail("Exception is expected");
} catch (YarnException e) {
assertTrue("The thrown exception is not expected",
e.getMessage().contains("Unknown container"));
}
// stopContainer shouldn't be called before startContainer, // stopContainer shouldn't be called before startContainer,
// otherwise, an exception will be thrown // otherwise, an exception will be thrown
try { try {
@ -353,6 +385,28 @@ public class TestNMClient {
// Test increase container API and make sure requests can reach NM // Test increase container API and make sure requests can reach NM
testIncreaseContainerResource(container); testIncreaseContainerResource(container);
testRestartContainer(container.getId());
testGetContainerStatus(container, i, ContainerState.RUNNING,
"will be Restarted", Arrays.asList(new Integer[] {-1000}));
if (i % 2 == 0) {
testReInitializeContainer(container.getId(), clc, false);
testGetContainerStatus(container, i, ContainerState.RUNNING,
"will be Re-initialized", Arrays.asList(new Integer[] {-1000}));
testRollbackContainer(container.getId(), false);
testGetContainerStatus(container, i, ContainerState.RUNNING,
"will be Rolled-back", Arrays.asList(new Integer[] {-1000}));
testCommitContainer(container.getId(), true);
testReInitializeContainer(container.getId(), clc, false);
testCommitContainer(container.getId(), false);
} else {
testReInitializeContainer(container.getId(), clc, true);
testGetContainerStatus(container, i, ContainerState.RUNNING,
"will be Re-initialized", Arrays.asList(new Integer[] {-1000}));
testRollbackContainer(container.getId(), true);
testCommitContainer(container.getId(), true);
}
try { try {
nmClient.stopContainer(container.getId(), container.getNodeId()); nmClient.stopContainer(container.getId(), container.getNodeId());
} catch (YarnException e) { } catch (YarnException e) {
@ -432,4 +486,91 @@ public class TestNMClient {
} }
} }
} }
/**
 * Asks the NodeManager to restart the container, pausing 250 ms before
 * and after the request.
 *
 * A {@link YarnException} is tolerated only when its message says the
 * container is not in RUNNING state; any other message is rethrown as an
 * {@link AssertionError} carrying the original exception as its cause.
 *
 * @param containerId id of the container to restart.
 * @throws YarnException declared by the signature; the body itself
 *                       catches YarnException.
 * @throws IOException if the client call fails with an I/O error.
 */
private void testRestartContainer(ContainerId containerId)
    throws YarnException, IOException {
  final String notRunningMsg =
      "can only be changed when a container is in RUNNING state";
  try {
    sleep(250);
    nmClient.restartContainer(containerId);
    sleep(250);
  } catch (YarnException e) {
    // The restart may be rejected because the container is not RUNNING
    // (presumably still being scheduled); only that failure is accepted.
    if (e.getMessage().contains(notRunningMsg)) {
      return;
    }
    AssertionError unexpected =
        new AssertionError("Exception is not expected: " + e);
    unexpected.initCause(e);
    throw unexpected;
  }
}
/**
 * Asks the NodeManager to roll back the last re-initialization of the
 * container, pausing 250 ms before and after the request.
 *
 * When {@code notRollbackable} is true the call must fail with a message
 * containing "Nothing to rollback to". Otherwise a {@link YarnException}
 * is tolerated only when its message says the container is not in RUNNING
 * state; any other message is rethrown as an {@link AssertionError}.
 *
 * @param containerId id of the container to roll back.
 * @param notRollbackable true if the rollback is expected to be rejected.
 * @throws YarnException declared by the signature; the body itself
 *                       catches YarnException.
 * @throws IOException if the client call fails with an I/O error.
 */
private void testRollbackContainer(ContainerId containerId,
    boolean notRollbackable) throws YarnException, IOException {
  try {
    sleep(250);
    nmClient.rollbackLastReInitialization(containerId);
    if (notRollbackable) {
      fail("Should not be able to rollback..");
    }
    sleep(250);
  } catch (YarnException e) {
    final String message = e.getMessage();
    if (notRollbackable) {
      // A rejected rollback must state that there is nothing to restore.
      Assert.assertTrue(message.contains("Nothing to rollback to"));
    } else if (!message.contains(
        "can only be changed when a container is in RUNNING state")) {
      // Only the "container is not RUNNING" rejection is accepted here.
      AssertionError unexpected =
          new AssertionError("Exception is not expected: " + e);
      unexpected.initCause(e);
      throw unexpected;
    }
  }
}
/**
 * Asks the NodeManager to commit the last re-initialization of the
 * container.
 *
 * When {@code notCommittable} is true the call must fail with a message
 * containing "Nothing to Commit". Otherwise a {@link YarnException} is
 * tolerated only when its message says the container is not in RUNNING
 * state; any other message is rethrown as an {@link AssertionError}.
 *
 * @param containerId id of the container to commit.
 * @param notCommittable true if the commit is expected to be rejected.
 * @throws YarnException declared by the signature; the body itself
 *                       catches YarnException.
 * @throws IOException if the client call fails with an I/O error.
 */
private void testCommitContainer(ContainerId containerId,
    boolean notCommittable) throws YarnException, IOException {
  try {
    nmClient.commitLastReInitialization(containerId);
    if (notCommittable) {
      fail("Should not be able to commit..");
    }
  } catch (YarnException e) {
    final String message = e.getMessage();
    if (notCommittable) {
      // A rejected commit must state that there is nothing to commit.
      Assert.assertTrue(message.contains("Nothing to Commit"));
    } else if (!message.contains(
        "can only be changed when a container is in RUNNING state")) {
      // Only the "container is not RUNNING" rejection is accepted here.
      AssertionError unexpected =
          new AssertionError("Exception is not expected: " + e);
      unexpected.initCause(e);
      throw unexpected;
    }
  }
}
/**
 * Asks the NodeManager to re-initialize the container with the given
 * launch context, pausing 250 ms before and after the request.
 *
 * A {@link YarnException} is tolerated only when its message says the
 * container is not in RUNNING state; any other message is rethrown as an
 * {@link AssertionError} carrying the original exception as its cause.
 *
 * @param containerId id of the container to re-initialize.
 * @param clc launch context passed to the re-initialization request.
 * @param autoCommit auto-commit flag forwarded to the client call.
 * @throws YarnException declared by the signature; the body itself
 *                       catches YarnException.
 * @throws IOException if the client call fails with an I/O error.
 */
private void testReInitializeContainer(ContainerId containerId,
    ContainerLaunchContext clc, boolean autoCommit)
    throws YarnException, IOException {
  final String notRunningMsg =
      "can only be changed when a container is in RUNNING state";
  try {
    sleep(250);
    nmClient.reInitializeContainer(containerId, clc, autoCommit);
    sleep(250);
  } catch (YarnException e) {
    // The request may be rejected because the container is not RUNNING
    // (presumably still being scheduled); only that failure is accepted.
    if (e.getMessage().contains(notRunningMsg)) {
      return;
    }
    AssertionError unexpected =
        new AssertionError("Exception is not expected: " + e);
    unexpected.initCause(e);
    throw unexpected;
  }
}
} }

View File

@ -22,8 +22,10 @@ import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.net.URISyntaxException; import java.net.URISyntaxException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.text.SimpleDateFormat;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.Date;
import java.util.EnumSet; import java.util.EnumSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -136,6 +138,8 @@ public class ContainerImpl implements Container {
} }
} }
private final SimpleDateFormat dateFormat =
new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
private final Lock readLock; private final Lock readLock;
private final Lock writeLock; private final Lock writeLock;
private final Dispatcher dispatcher; private final Dispatcher dispatcher;
@ -763,7 +767,7 @@ public class ContainerImpl implements Container {
private void addDiagnostics(String... diags) { private void addDiagnostics(String... diags) {
for (String s : diags) { for (String s : diags) {
this.diagnostics.append(s); this.diagnostics.append("[" + dateFormat.format(new Date()) + "]" + s);
} }
if (diagnostics.length() > diagnosticsMaxSize) { if (diagnostics.length() > diagnosticsMaxSize) {
diagnostics.delete(0, diagnostics.length() - diagnosticsMaxSize); diagnostics.delete(0, diagnostics.length() - diagnosticsMaxSize);
@ -987,6 +991,7 @@ public class ContainerImpl implements Container {
// We also need to make sure that if Rollback is possible, the // We also need to make sure that if Rollback is possible, the
// rollback state should be retained in the // rollback state should be retained in the
// oldLaunchContext and oldResourceSet // oldLaunchContext and oldResourceSet
container.addDiagnostics("Container will be Restarted.\n");
return new ReInitializationContext( return new ReInitializationContext(
container.launchContext, container.resourceSet, container.launchContext, container.resourceSet,
container.canRollback() ? container.canRollback() ?
@ -994,6 +999,7 @@ public class ContainerImpl implements Container {
container.canRollback() ? container.canRollback() ?
container.reInitContext.oldResourceSet : null); container.reInitContext.oldResourceSet : null);
} else { } else {
container.addDiagnostics("Container will be Re-initialized.\n");
return new ReInitializationContext( return new ReInitializationContext(
reInitEvent.getReInitLaunchContext(), reInitEvent.getReInitLaunchContext(),
reInitEvent.getResourceSet(), reInitEvent.getResourceSet(),
@ -1014,6 +1020,7 @@ public class ContainerImpl implements Container {
@Override @Override
protected ReInitializationContext createReInitContext(ContainerImpl protected ReInitializationContext createReInitContext(ContainerImpl
container, ContainerEvent event) { container, ContainerEvent event) {
container.addDiagnostics("Container upgrade will be Rolled-back.\n");
LOG.warn("Container [" + container.getContainerId() + "]" + LOG.warn("Container [" + container.getContainerId() + "]" +
" about to be explicitly Rolledback !!"); " about to be explicitly Rolledback !!");
return container.reInitContext.createContextForRollback(); return container.reInitContext.createContextForRollback();