YARN-6306. NMClient API change for container upgrade. Contributed by Arun Suresh
This commit is contained in:
parent
1c6ec991b5
commit
f8be02703a
|
@ -58,6 +58,10 @@ public abstract class NMClient extends AbstractService {
|
|||
return client;
|
||||
}
|
||||
|
||||
protected enum UpgradeOp {
|
||||
REINIT, RESTART, COMMIT, ROLLBACK
|
||||
}
|
||||
|
||||
private NMTokenCache nmTokenCache = NMTokenCache.getSingleton();
|
||||
|
||||
@Private
|
||||
|
@ -79,8 +83,8 @@ public abstract class NMClient extends AbstractService {
|
|||
* <code>NodeManager</code> to launch the
|
||||
* container
|
||||
* @return a map between the auxiliary service names and their outputs
|
||||
* @throws YarnException
|
||||
* @throws IOException
|
||||
* @throws YarnException YarnException.
|
||||
* @throws IOException IOException.
|
||||
*/
|
||||
public abstract Map<String, ByteBuffer> startContainer(Container container,
|
||||
ContainerLaunchContext containerLaunchContext)
|
||||
|
@ -95,9 +99,10 @@ public abstract class NMClient extends AbstractService {
|
|||
* {@link Container}.
|
||||
* </p>
|
||||
*
|
||||
* @param container the container with updated token
|
||||
* @throws YarnException
|
||||
* @throws IOException
|
||||
* @param container the container with updated token.
|
||||
*
|
||||
* @throws YarnException YarnException.
|
||||
* @throws IOException IOException.
|
||||
*/
|
||||
public abstract void increaseContainerResource(Container container)
|
||||
throws YarnException, IOException;
|
||||
|
@ -108,8 +113,8 @@ public abstract class NMClient extends AbstractService {
|
|||
* @param containerId the Id of the started container
|
||||
* @param nodeId the Id of the <code>NodeManager</code>
|
||||
*
|
||||
* @throws YarnException
|
||||
* @throws IOException
|
||||
* @throws YarnException YarnException.
|
||||
* @throws IOException IOException.
|
||||
*/
|
||||
public abstract void stopContainer(ContainerId containerId, NodeId nodeId)
|
||||
throws YarnException, IOException;
|
||||
|
@ -120,13 +125,61 @@ public abstract class NMClient extends AbstractService {
|
|||
* @param containerId the Id of the started container
|
||||
* @param nodeId the Id of the <code>NodeManager</code>
|
||||
*
|
||||
* @return the status of a container
|
||||
* @throws YarnException
|
||||
* @throws IOException
|
||||
* @return the status of a container.
|
||||
*
|
||||
* @throws YarnException YarnException.
|
||||
* @throws IOException IOException.
|
||||
*/
|
||||
public abstract ContainerStatus getContainerStatus(ContainerId containerId,
|
||||
NodeId nodeId) throws YarnException, IOException;
|
||||
|
||||
/**
|
||||
* <p>Re-Initialize the Container.</p>
|
||||
*
|
||||
* @param containerId the Id of the container to Re-Initialize.
|
||||
* @param containerLaunchContex the updated ContainerLaunchContext.
|
||||
* @param autoCommit commit re-initialization automatically ?
|
||||
*
|
||||
* @throws YarnException YarnException.
|
||||
* @throws IOException IOException.
|
||||
*/
|
||||
public abstract void reInitializeContainer(ContainerId containerId,
|
||||
ContainerLaunchContext containerLaunchContex, boolean autoCommit)
|
||||
throws YarnException, IOException;
|
||||
|
||||
/**
|
||||
* <p>Restart the specified container.</p>
|
||||
*
|
||||
* @param containerId the Id of the container to restart.
|
||||
*
|
||||
* @throws YarnException YarnException.
|
||||
* @throws IOException IOException.
|
||||
*/
|
||||
public abstract void restartContainer(ContainerId containerId)
|
||||
throws YarnException, IOException;
|
||||
|
||||
/**
|
||||
* <p>Rollback last reInitialization of the specified container.</p>
|
||||
*
|
||||
* @param containerId the Id of the container to restart.
|
||||
*
|
||||
* @throws YarnException YarnException.
|
||||
* @throws IOException IOException.
|
||||
*/
|
||||
public abstract void rollbackLastReInitialization(ContainerId containerId)
|
||||
throws YarnException, IOException;
|
||||
|
||||
/**
|
||||
* <p>Commit last reInitialization of the specified container.</p>
|
||||
*
|
||||
* @param containerId the Id of the container to commit reInitialize.
|
||||
*
|
||||
* @throws YarnException YarnException.
|
||||
* @throws IOException IOException.
|
||||
*/
|
||||
public abstract void commitLastReInitialization(ContainerId containerId)
|
||||
throws YarnException, IOException;
|
||||
|
||||
/**
|
||||
* <p>Set whether the containers that are started by this client, and are
|
||||
* still running should be stopped when the client stops. By default, the
|
||||
|
@ -165,4 +218,15 @@ public abstract class NMClient extends AbstractService {
|
|||
return nmTokenCache;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the NodeId of the node on which container is running. It returns
|
||||
* null if the container if container is not found or if it is not running.
|
||||
*
|
||||
* @param containerId Container Id of the container.
|
||||
* @return NodeId of the container on which it is running.
|
||||
*/
|
||||
public NodeId getNodeIdOfStartedContainer(ContainerId containerId) {
|
||||
return null;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -20,7 +20,6 @@ package org.apache.hadoop.yarn.client.api.async;
|
|||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Public;
|
||||
|
@ -32,7 +31,6 @@ import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
|||
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.api.records.Token;
|
||||
import org.apache.hadoop.yarn.client.api.NMClient;
|
||||
import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl;
|
||||
import org.apache.hadoop.yarn.client.api.impl.NMClientImpl;
|
||||
|
@ -181,6 +179,38 @@ public abstract class NMClientAsync extends AbstractService {
|
|||
|
||||
public abstract void increaseContainerResourceAsync(Container container);
|
||||
|
||||
/**
|
||||
* <p>Re-Initialize the Container.</p>
|
||||
*
|
||||
* @param containerId the Id of the container to Re-Initialize.
|
||||
* @param containerLaunchContex the updated ContainerLaunchContext.
|
||||
* @param autoCommit commit re-initialization automatically ?
|
||||
*/
|
||||
public abstract void reInitializeContainerAsync(ContainerId containerId,
|
||||
ContainerLaunchContext containerLaunchContex, boolean autoCommit);
|
||||
|
||||
/**
|
||||
* <p>Restart the specified container.</p>
|
||||
*
|
||||
* @param containerId the Id of the container to restart.
|
||||
*/
|
||||
public abstract void restartContainerAsync(ContainerId containerId);
|
||||
|
||||
/**
|
||||
* <p>Rollback last reInitialization of the specified container.</p>
|
||||
*
|
||||
* @param containerId the Id of the container to restart.
|
||||
*/
|
||||
public abstract void rollbackLastReInitializationAsync(
|
||||
ContainerId containerId);
|
||||
|
||||
/**
|
||||
* <p>Commit last reInitialization of the specified container.</p>
|
||||
*
|
||||
* @param containerId the Id of the container to commit reInitialize.
|
||||
*/
|
||||
public abstract void commitLastReInitializationAsync(ContainerId containerId);
|
||||
|
||||
public abstract void stopContainerAsync(
|
||||
ContainerId containerId, NodeId nodeId);
|
||||
|
||||
|
@ -303,6 +333,70 @@ public abstract class NMClientAsync extends AbstractService {
|
|||
*/
|
||||
public abstract void onStopContainerError(
|
||||
ContainerId containerId, Throwable t);
|
||||
|
||||
/**
|
||||
* Callback for container re-initialization request.
|
||||
*
|
||||
* @param containerId the Id of the container to be Re-Initialized.
|
||||
*/
|
||||
public void onContainerReInitialize(ContainerId containerId) {}
|
||||
|
||||
/**
|
||||
* Callback for container restart.
|
||||
*
|
||||
* @param containerId the Id of the container to restart.
|
||||
*/
|
||||
public void onContainerRestart(ContainerId containerId) {}
|
||||
|
||||
/**
|
||||
* Callback for rollback of last re-initialization.
|
||||
*
|
||||
* @param containerId the Id of the container to restart.
|
||||
*/
|
||||
public void onRollbackLastReInitialization(ContainerId containerId) {}
|
||||
|
||||
/**
|
||||
* Callback for commit of last re-initialization.
|
||||
*
|
||||
* @param containerId the Id of the container to commit reInitialize.
|
||||
*/
|
||||
public void onCommitLastReInitialization(ContainerId containerId) {}
|
||||
|
||||
/**
|
||||
* Error Callback for container re-initialization request.
|
||||
*
|
||||
* @param containerId the Id of the container to be Re-Initialized.
|
||||
* @param t a Throwable.
|
||||
*/
|
||||
public void onContainerReInitializeError(ContainerId containerId,
|
||||
Throwable t) {}
|
||||
|
||||
/**
|
||||
* Error Callback for container restart.
|
||||
*
|
||||
* @param containerId the Id of the container to restart.
|
||||
* @param t a Throwable.
|
||||
*
|
||||
*/
|
||||
public void onContainerRestartError(ContainerId containerId, Throwable t) {}
|
||||
|
||||
/**
|
||||
* Error Callback for rollback of last re-initialization.
|
||||
*
|
||||
* @param containerId the Id of the container to restart.
|
||||
* @param t a Throwable.
|
||||
*/
|
||||
public void onRollbackLastReInitializationError(ContainerId containerId,
|
||||
Throwable t) {}
|
||||
|
||||
/**
|
||||
* Error Callback for commit of last re-initialization.
|
||||
*
|
||||
* @param containerId the Id of the container to commit reInitialize.
|
||||
* @param t a Throwable.
|
||||
*/
|
||||
public void onCommitLastReInitializationError(ContainerId containerId,
|
||||
Throwable t) {}
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -282,6 +282,103 @@ public class NMClientAsyncImpl extends NMClientAsync {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void reInitializeContainerAsync(ContainerId containerId,
|
||||
ContainerLaunchContext containerLaunchContex, boolean autoCommit){
|
||||
if (!(callbackHandler instanceof AbstractCallbackHandler)) {
|
||||
LOG.error("Callback handler does not implement container re-initialize "
|
||||
+ "callback methods");
|
||||
return;
|
||||
}
|
||||
AbstractCallbackHandler handler = (AbstractCallbackHandler) callbackHandler;
|
||||
if (containers.get(containerId) == null) {
|
||||
handler.onContainerReInitializeError(
|
||||
containerId, RPCUtil.getRemoteException(
|
||||
"Container " + containerId + " is not started"));
|
||||
}
|
||||
try {
|
||||
events.put(new ReInitializeContainerEvevnt(containerId,
|
||||
client.getNodeIdOfStartedContainer(containerId),
|
||||
containerLaunchContex, autoCommit));
|
||||
} catch (InterruptedException e) {
|
||||
LOG.warn("Exception when scheduling the event of re-initializing of "
|
||||
+ "Container " + containerId);
|
||||
handler.onContainerReInitializeError(containerId, e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void restartContainerAsync(ContainerId containerId){
|
||||
if (!(callbackHandler instanceof AbstractCallbackHandler)) {
|
||||
LOG.error("Callback handler does not implement container restart "
|
||||
+ "callback methods");
|
||||
return;
|
||||
}
|
||||
AbstractCallbackHandler handler = (AbstractCallbackHandler) callbackHandler;
|
||||
if (containers.get(containerId) == null) {
|
||||
handler.onContainerRestartError(
|
||||
containerId, RPCUtil.getRemoteException(
|
||||
"Container " + containerId + " is not started"));
|
||||
}
|
||||
try {
|
||||
events.put(new ContainerEvent(containerId,
|
||||
client.getNodeIdOfStartedContainer(containerId),
|
||||
null, ContainerEventType.RESTART_CONTAINER));
|
||||
} catch (InterruptedException e) {
|
||||
LOG.warn("Exception when scheduling the event of restart of "
|
||||
+ "Container " + containerId);
|
||||
handler.onContainerRestartError(containerId, e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void rollbackLastReInitializationAsync(ContainerId containerId){
|
||||
if (!(callbackHandler instanceof AbstractCallbackHandler)) {
|
||||
LOG.error("Callback handler does not implement container rollback "
|
||||
+ "callback methods");
|
||||
return;
|
||||
}
|
||||
AbstractCallbackHandler handler = (AbstractCallbackHandler) callbackHandler;
|
||||
if (containers.get(containerId) == null) {
|
||||
handler.onRollbackLastReInitializationError(
|
||||
containerId, RPCUtil.getRemoteException(
|
||||
"Container " + containerId + " is not started"));
|
||||
}
|
||||
try {
|
||||
events.put(new ContainerEvent(containerId,
|
||||
client.getNodeIdOfStartedContainer(containerId),
|
||||
null, ContainerEventType.ROLLBACK_LAST_REINIT));
|
||||
} catch (InterruptedException e) {
|
||||
LOG.warn("Exception when scheduling the event Rollback re-initialization"
|
||||
+ " of Container " + containerId);
|
||||
handler.onRollbackLastReInitializationError(containerId, e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void commitLastReInitializationAsync(ContainerId containerId){
|
||||
if (!(callbackHandler instanceof AbstractCallbackHandler)) {
|
||||
LOG.error("Callback handler does not implement container commit last " +
|
||||
"re-initialization callback methods");
|
||||
return;
|
||||
}
|
||||
AbstractCallbackHandler handler = (AbstractCallbackHandler) callbackHandler;
|
||||
if (containers.get(containerId) == null) {
|
||||
handler.onCommitLastReInitializationError(
|
||||
containerId, RPCUtil.getRemoteException(
|
||||
"Container " + containerId + " is not started"));
|
||||
}
|
||||
try {
|
||||
events.put(new ContainerEvent(containerId,
|
||||
client.getNodeIdOfStartedContainer(containerId),
|
||||
null, ContainerEventType.COMMIT_LAST_REINT));
|
||||
} catch (InterruptedException e) {
|
||||
LOG.warn("Exception when scheduling the event Commit re-initialization"
|
||||
+ " of Container " + containerId);
|
||||
handler.onCommitLastReInitializationError(containerId, e);
|
||||
}
|
||||
}
|
||||
|
||||
public void stopContainerAsync(ContainerId containerId, NodeId nodeId) {
|
||||
if (containers.get(containerId) == null) {
|
||||
callbackHandler.onStopContainerError(containerId,
|
||||
|
@ -330,7 +427,11 @@ public class NMClientAsyncImpl extends NMClientAsync {
|
|||
START_CONTAINER,
|
||||
STOP_CONTAINER,
|
||||
QUERY_CONTAINER,
|
||||
INCREASE_CONTAINER_RESOURCE
|
||||
INCREASE_CONTAINER_RESOURCE,
|
||||
REINITIALIZE_CONTAINER,
|
||||
RESTART_CONTAINER,
|
||||
ROLLBACK_LAST_REINIT,
|
||||
COMMIT_LAST_REINT
|
||||
}
|
||||
|
||||
protected static class ContainerEvent
|
||||
|
@ -381,6 +482,27 @@ public class NMClientAsyncImpl extends NMClientAsync {
|
|||
}
|
||||
}
|
||||
|
||||
protected static class ReInitializeContainerEvevnt extends ContainerEvent {
|
||||
private ContainerLaunchContext containerLaunchContext;
|
||||
private boolean autoCommit;
|
||||
|
||||
public ReInitializeContainerEvevnt(ContainerId containerId, NodeId nodeId,
|
||||
ContainerLaunchContext containerLaunchContext, boolean autoCommit) {
|
||||
super(containerId, nodeId, null,
|
||||
ContainerEventType.REINITIALIZE_CONTAINER);
|
||||
this.containerLaunchContext = containerLaunchContext;
|
||||
this.autoCommit = autoCommit;
|
||||
}
|
||||
|
||||
public ContainerLaunchContext getContainerLaunchContext() {
|
||||
return containerLaunchContext;
|
||||
}
|
||||
|
||||
public boolean isAutoCommit() {
|
||||
return autoCommit;
|
||||
}
|
||||
}
|
||||
|
||||
protected static class IncreaseContainerResourceEvent extends ContainerEvent {
|
||||
private Container container;
|
||||
|
||||
|
@ -416,6 +538,25 @@ public class NMClientAsyncImpl extends NMClientAsync {
|
|||
.addTransition(ContainerState.RUNNING, ContainerState.RUNNING,
|
||||
ContainerEventType.INCREASE_CONTAINER_RESOURCE,
|
||||
new IncreaseContainerResourceTransition())
|
||||
|
||||
// Transitions for Container Upgrade
|
||||
.addTransition(ContainerState.RUNNING,
|
||||
EnumSet.of(ContainerState.RUNNING, ContainerState.FAILED),
|
||||
ContainerEventType.REINITIALIZE_CONTAINER,
|
||||
new ReInitializeContainerTransition())
|
||||
.addTransition(ContainerState.RUNNING,
|
||||
EnumSet.of(ContainerState.RUNNING, ContainerState.FAILED),
|
||||
ContainerEventType.RESTART_CONTAINER,
|
||||
new ReInitializeContainerTransition())
|
||||
.addTransition(ContainerState.RUNNING,
|
||||
EnumSet.of(ContainerState.RUNNING, ContainerState.FAILED),
|
||||
ContainerEventType.ROLLBACK_LAST_REINIT,
|
||||
new ReInitializeContainerTransition())
|
||||
.addTransition(ContainerState.RUNNING,
|
||||
EnumSet.of(ContainerState.RUNNING, ContainerState.FAILED),
|
||||
ContainerEventType.COMMIT_LAST_REINT,
|
||||
new ReInitializeContainerTransition())
|
||||
|
||||
.addTransition(ContainerState.RUNNING,
|
||||
EnumSet.of(ContainerState.DONE, ContainerState.FAILED),
|
||||
ContainerEventType.STOP_CONTAINER,
|
||||
|
@ -431,6 +572,10 @@ public class NMClientAsyncImpl extends NMClientAsync {
|
|||
.addTransition(ContainerState.FAILED, ContainerState.FAILED,
|
||||
EnumSet.of(ContainerEventType.START_CONTAINER,
|
||||
ContainerEventType.STOP_CONTAINER,
|
||||
ContainerEventType.REINITIALIZE_CONTAINER,
|
||||
ContainerEventType.RESTART_CONTAINER,
|
||||
ContainerEventType.COMMIT_LAST_REINT,
|
||||
ContainerEventType.ROLLBACK_LAST_REINIT,
|
||||
ContainerEventType.INCREASE_CONTAINER_RESOURCE));
|
||||
|
||||
protected static class StartContainerTransition implements
|
||||
|
@ -529,6 +674,119 @@ public class NMClientAsyncImpl extends NMClientAsync {
|
|||
}
|
||||
}
|
||||
|
||||
protected static class ReInitializeContainerTransition implements
|
||||
MultipleArcTransition<StatefulContainer, ContainerEvent,
|
||||
ContainerState> {
|
||||
|
||||
@Override
|
||||
public ContainerState transition(StatefulContainer container,
|
||||
ContainerEvent containerEvent) {
|
||||
ContainerId containerId = containerEvent.getContainerId();
|
||||
AbstractCallbackHandler handler = (AbstractCallbackHandler) container
|
||||
.nmClientAsync.getCallbackHandler();
|
||||
Throwable handlerError = null;
|
||||
try {
|
||||
switch(containerEvent.getType()) {
|
||||
case REINITIALIZE_CONTAINER:
|
||||
if (!(containerEvent instanceof ReInitializeContainerEvevnt)) {
|
||||
LOG.error("Unexpected Event.. [" +containerEvent.getType() + "]");
|
||||
return ContainerState.FAILED;
|
||||
}
|
||||
ReInitializeContainerEvevnt rEvent =
|
||||
(ReInitializeContainerEvevnt)containerEvent;
|
||||
container.nmClientAsync.getClient().reInitializeContainer(
|
||||
containerId, rEvent.getContainerLaunchContext(),
|
||||
rEvent.isAutoCommit());
|
||||
try {
|
||||
handler.onContainerReInitialize(containerId);
|
||||
} catch (Throwable tr) {
|
||||
handlerError = tr;
|
||||
}
|
||||
break;
|
||||
case RESTART_CONTAINER:
|
||||
container.nmClientAsync.getClient().restartContainer(containerId);
|
||||
try {
|
||||
handler.onContainerRestart(containerId);
|
||||
} catch (Throwable tr) {
|
||||
handlerError = tr;
|
||||
}
|
||||
break;
|
||||
case ROLLBACK_LAST_REINIT:
|
||||
container.nmClientAsync.getClient()
|
||||
.rollbackLastReInitialization(containerId);
|
||||
try {
|
||||
handler.onRollbackLastReInitialization(containerId);
|
||||
} catch (Throwable tr) {
|
||||
handlerError = tr;
|
||||
}
|
||||
break;
|
||||
case COMMIT_LAST_REINT:
|
||||
container.nmClientAsync.getClient()
|
||||
.commitLastReInitialization(containerId);
|
||||
try {
|
||||
handler.onCommitLastReInitialization(containerId);
|
||||
} catch (Throwable tr) {
|
||||
handlerError = tr;
|
||||
}
|
||||
break;
|
||||
default:
|
||||
LOG.warn("Event of type [" + containerEvent.getType() + "] not" +
|
||||
" expected here..");
|
||||
break;
|
||||
}
|
||||
if (handlerError != null) {
|
||||
LOG.info("Unchecked exception is thrown in handler for event ["
|
||||
+ containerEvent.getType() + "] for Container "
|
||||
+ containerId, handlerError);
|
||||
}
|
||||
|
||||
return ContainerState.RUNNING;
|
||||
} catch (Throwable t) {
|
||||
switch(containerEvent.getType()) {
|
||||
case REINITIALIZE_CONTAINER:
|
||||
try {
|
||||
handler.onContainerReInitializeError(containerId, t);
|
||||
} catch (Throwable tr) {
|
||||
handlerError = tr;
|
||||
}
|
||||
break;
|
||||
case RESTART_CONTAINER:
|
||||
try {
|
||||
handler.onContainerRestartError(containerId, t);
|
||||
} catch (Throwable tr) {
|
||||
handlerError = tr;
|
||||
}
|
||||
break;
|
||||
case ROLLBACK_LAST_REINIT:
|
||||
try {
|
||||
handler.onRollbackLastReInitializationError(containerId, t);
|
||||
} catch (Throwable tr) {
|
||||
handlerError = tr;
|
||||
}
|
||||
break;
|
||||
case COMMIT_LAST_REINT:
|
||||
try {
|
||||
handler.onCommitLastReInitializationError(containerId, t);
|
||||
} catch (Throwable tr) {
|
||||
handlerError = tr;
|
||||
}
|
||||
break;
|
||||
default:
|
||||
LOG.warn("Event of type [" + containerEvent.getType() + "] not" +
|
||||
" expected here..");
|
||||
break;
|
||||
}
|
||||
if (handlerError != null) {
|
||||
LOG.info("Unchecked exception is thrown in handler for event ["
|
||||
+ containerEvent.getType() + "] for Container "
|
||||
+ containerId, handlerError);
|
||||
}
|
||||
}
|
||||
|
||||
return ContainerState.FAILED;
|
||||
}
|
||||
}
|
||||
|
||||
protected static class StopContainerTransition implements
|
||||
MultipleArcTransition<StatefulContainer, ContainerEvent,
|
||||
ContainerState> {
|
||||
|
|
|
@ -33,10 +33,13 @@ import org.apache.hadoop.classification.InterfaceAudience.Private;
|
|||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
|
||||
import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse;
|
||||
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.ReInitializeContainerRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse;
|
||||
|
@ -306,6 +309,84 @@ public class NMClientImpl extends NMClient {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void reInitializeContainer(ContainerId containerId,
|
||||
ContainerLaunchContext containerLaunchContex, boolean autoCommit)
|
||||
throws YarnException, IOException {
|
||||
ContainerManagementProtocolProxyData proxy = null;
|
||||
StartedContainer container = startedContainers.get(containerId);
|
||||
if (container != null) {
|
||||
synchronized (container) {
|
||||
proxy = cmProxy.getProxy(container.getNodeId().toString(), containerId);
|
||||
try {
|
||||
proxy.getContainerManagementProtocol().reInitializeContainer(
|
||||
ReInitializeContainerRequest.newInstance(
|
||||
containerId, containerLaunchContex, autoCommit));
|
||||
} finally {
|
||||
if (proxy != null) {
|
||||
cmProxy.mayBeCloseProxy(proxy);
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
throw new YarnException("Unknown container [" + containerId + "]");
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void restartContainer(ContainerId containerId)
|
||||
throws YarnException, IOException {
|
||||
restartCommitOrRollbackContainer(containerId, UpgradeOp.RESTART);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void rollbackLastReInitialization(ContainerId containerId)
|
||||
throws YarnException, IOException {
|
||||
restartCommitOrRollbackContainer(containerId, UpgradeOp.ROLLBACK);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void commitLastReInitialization(ContainerId containerId)
|
||||
throws YarnException, IOException {
|
||||
restartCommitOrRollbackContainer(containerId, UpgradeOp.COMMIT);
|
||||
}
|
||||
|
||||
|
||||
private void restartCommitOrRollbackContainer(ContainerId containerId,
|
||||
UpgradeOp upgradeOp) throws YarnException, IOException {
|
||||
ContainerManagementProtocolProxyData proxy = null;
|
||||
StartedContainer container = startedContainers.get(containerId);
|
||||
if (container != null) {
|
||||
synchronized (container) {
|
||||
proxy = cmProxy.getProxy(container.getNodeId().toString(), containerId);
|
||||
ContainerManagementProtocol cmp =
|
||||
proxy.getContainerManagementProtocol();
|
||||
try {
|
||||
switch (upgradeOp) {
|
||||
case RESTART:
|
||||
cmp.restartContainer(containerId);
|
||||
break;
|
||||
case COMMIT:
|
||||
cmp.commitLastReInitialization(containerId);
|
||||
break;
|
||||
case ROLLBACK:
|
||||
cmp.rollbackLastReInitialization(containerId);
|
||||
break;
|
||||
default:
|
||||
// Should not happen..
|
||||
break;
|
||||
}
|
||||
} finally {
|
||||
if (proxy != null) {
|
||||
cmProxy.mayBeCloseProxy(proxy);
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
throw new YarnException("Unknown container [" + containerId + "]");
|
||||
}
|
||||
}
|
||||
|
||||
private void stopContainerInternal(ContainerId containerId, NodeId nodeId)
|
||||
throws IOException, YarnException {
|
||||
ContainerManagementProtocolProxyData proxy = null;
|
||||
|
@ -343,4 +424,14 @@ public class NMClientImpl extends NMClient {
|
|||
throw (IOException) t;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public NodeId getNodeIdOfStartedContainer(ContainerId containerId) {
|
||||
StartedContainer container = startedContainers.get(containerId);
|
||||
if (container != null) {
|
||||
return container.getNodeId();
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
package org.apache.hadoop.yarn.client.api.async.impl;
|
||||
|
||||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Matchers.anyBoolean;
|
||||
import static org.mockito.Mockito.doNothing;
|
||||
import static org.mockito.Mockito.doThrow;
|
||||
import static org.mockito.Mockito.mock;
|
||||
|
@ -27,6 +28,8 @@ import static org.mockito.Mockito.when;
|
|||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.Collections;
|
||||
import java.util.EnumSet;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
@ -37,6 +40,7 @@ import java.util.concurrent.atomic.AtomicInteger;
|
|||
import java.util.concurrent.atomic.AtomicIntegerArray;
|
||||
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.util.Records;
|
||||
import org.junit.Assert;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
|
@ -69,6 +73,22 @@ public class TestNMClientAsync {
|
|||
private NodeId nodeId;
|
||||
private Token containerToken;
|
||||
|
||||
enum OpsToTest {
|
||||
START, QUERY, STOP, INCR, REINIT, RESTART, ROLLBACK, COMMIT
|
||||
}
|
||||
|
||||
final static class TestData {
|
||||
AtomicInteger success = new AtomicInteger(0);
|
||||
AtomicInteger failure = new AtomicInteger(0);
|
||||
final AtomicIntegerArray successArray;
|
||||
final AtomicIntegerArray failureArray;
|
||||
|
||||
private TestData(int expectedSuccess, int expectedFailure) {
|
||||
this.successArray = new AtomicIntegerArray(expectedSuccess);
|
||||
this.failureArray = new AtomicIntegerArray(expectedFailure);
|
||||
}
|
||||
}
|
||||
|
||||
@After
|
||||
public void teardown() {
|
||||
ServiceOperations.stop(asyncClient);
|
||||
|
@ -194,25 +214,7 @@ public class TestNMClientAsync {
|
|||
private int expectedSuccess;
|
||||
private int expectedFailure;
|
||||
|
||||
private AtomicInteger actualStartSuccess = new AtomicInteger(0);
|
||||
private AtomicInteger actualStartFailure = new AtomicInteger(0);
|
||||
private AtomicInteger actualQuerySuccess = new AtomicInteger(0);
|
||||
private AtomicInteger actualQueryFailure = new AtomicInteger(0);
|
||||
private AtomicInteger actualStopSuccess = new AtomicInteger(0);
|
||||
private AtomicInteger actualStopFailure = new AtomicInteger(0);
|
||||
private AtomicInteger actualIncreaseResourceSuccess =
|
||||
new AtomicInteger(0);
|
||||
private AtomicInteger actualIncreaseResourceFailure =
|
||||
new AtomicInteger(0);
|
||||
|
||||
private AtomicIntegerArray actualStartSuccessArray;
|
||||
private AtomicIntegerArray actualStartFailureArray;
|
||||
private AtomicIntegerArray actualQuerySuccessArray;
|
||||
private AtomicIntegerArray actualQueryFailureArray;
|
||||
private AtomicIntegerArray actualStopSuccessArray;
|
||||
private AtomicIntegerArray actualStopFailureArray;
|
||||
private AtomicIntegerArray actualIncreaseResourceSuccessArray;
|
||||
private AtomicIntegerArray actualIncreaseResourceFailureArray;
|
||||
private final Map<OpsToTest, TestData> testMap = new HashMap<>();
|
||||
|
||||
private Set<String> errorMsgs =
|
||||
Collections.synchronizedSet(new HashSet<String>());
|
||||
|
@ -221,16 +223,9 @@ public class TestNMClientAsync {
|
|||
this.expectedSuccess = expectedSuccess;
|
||||
this.expectedFailure = expectedFailure;
|
||||
|
||||
actualStartSuccessArray = new AtomicIntegerArray(expectedSuccess);
|
||||
actualStartFailureArray = new AtomicIntegerArray(expectedFailure);
|
||||
actualQuerySuccessArray = new AtomicIntegerArray(expectedSuccess);
|
||||
actualQueryFailureArray = new AtomicIntegerArray(expectedFailure);
|
||||
actualStopSuccessArray = new AtomicIntegerArray(expectedSuccess);
|
||||
actualStopFailureArray = new AtomicIntegerArray(expectedFailure);
|
||||
actualIncreaseResourceSuccessArray =
|
||||
new AtomicIntegerArray(expectedSuccess);
|
||||
actualIncreaseResourceFailureArray =
|
||||
new AtomicIntegerArray(expectedFailure);
|
||||
for (OpsToTest op : OpsToTest.values()) {
|
||||
testMap.put(op, new TestData(expectedSuccess, expectedFailure));
|
||||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("deprecation")
|
||||
|
@ -243,8 +238,9 @@ public class TestNMClientAsync {
|
|||
" should throw the exception onContainerStarted");
|
||||
return;
|
||||
}
|
||||
actualStartSuccess.addAndGet(1);
|
||||
actualStartSuccessArray.set(containerId.getId(), 1);
|
||||
TestData td = testMap.get(OpsToTest.START);
|
||||
td.success.addAndGet(1);
|
||||
td.successArray.set(containerId.getId(), 1);
|
||||
|
||||
// move on to the following success tests
|
||||
asyncClient.getContainerStatusAsync(containerId, nodeId);
|
||||
|
@ -254,7 +250,28 @@ public class TestNMClientAsync {
|
|||
// containerId
|
||||
Container container = Container.newInstance(
|
||||
containerId, nodeId, null, null, null, containerToken);
|
||||
int t = containerId.getId() % 5;
|
||||
switch (t) {
|
||||
case 0:
|
||||
asyncClient.increaseContainerResourceAsync(container);
|
||||
break;
|
||||
case 1:
|
||||
asyncClient.reInitializeContainerAsync(containerId,
|
||||
recordFactory.newRecordInstance(ContainerLaunchContext.class),
|
||||
true);
|
||||
break;
|
||||
case 2:
|
||||
asyncClient.restartContainerAsync(containerId);
|
||||
break;
|
||||
case 3:
|
||||
asyncClient.rollbackLastReInitializationAsync(containerId);
|
||||
break;
|
||||
case 4:
|
||||
asyncClient.commitLastReInitializationAsync(containerId);
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// Shouldn't crash the test thread
|
||||
|
@ -270,8 +287,9 @@ public class TestNMClientAsync {
|
|||
" should throw the exception onContainerStatusReceived");
|
||||
return;
|
||||
}
|
||||
actualQuerySuccess.addAndGet(1);
|
||||
actualQuerySuccessArray.set(containerId.getId(), 1);
|
||||
TestData td = testMap.get(OpsToTest.QUERY);
|
||||
td.success.addAndGet(1);
|
||||
td.successArray.set(containerId.getId(), 1);
|
||||
// move on to the following success tests
|
||||
// make sure we pass in the container with the same
|
||||
// containerId
|
||||
|
@ -292,8 +310,78 @@ public class TestNMClientAsync {
|
|||
" should throw the exception onContainerResourceIncreased");
|
||||
return;
|
||||
}
|
||||
actualIncreaseResourceSuccess.addAndGet(1);
|
||||
actualIncreaseResourceSuccessArray.set(containerId.getId(), 1);
|
||||
TestData td = testMap.get(OpsToTest.INCR);
|
||||
td.success.addAndGet(1);
|
||||
td.successArray.set(containerId.getId(), 1);
|
||||
// move on to the following success tests
|
||||
asyncClient.reInitializeContainerAsync(containerId,
|
||||
Records.newRecord(ContainerLaunchContext.class), true);
|
||||
// throw a fake user exception, and shouldn't crash the test
|
||||
throw new RuntimeException("Ignorable Exception");
|
||||
}
|
||||
|
||||
@SuppressWarnings("deprecation")
|
||||
@Override
|
||||
public void onContainerReInitialize(ContainerId containerId) {
|
||||
if (containerId.getId() >= expectedSuccess) {
|
||||
errorMsgs.add("Container " + containerId +
|
||||
" should throw the exception onContainerReInitialize");
|
||||
return;
|
||||
}
|
||||
TestData td = testMap.get(OpsToTest.REINIT);
|
||||
td.success.addAndGet(1);
|
||||
td.successArray.set(containerId.getId(), 1);
|
||||
// move on to the following success tests
|
||||
asyncClient.restartContainerAsync(containerId);
|
||||
// throw a fake user exception, and shouldn't crash the test
|
||||
throw new RuntimeException("Ignorable Exception");
|
||||
}
|
||||
|
||||
@SuppressWarnings("deprecation")
|
||||
@Override
|
||||
public void onContainerRestart(ContainerId containerId) {
|
||||
if (containerId.getId() >= expectedSuccess) {
|
||||
errorMsgs.add("Container " + containerId +
|
||||
" should throw the exception onContainerReInitialize");
|
||||
return;
|
||||
}
|
||||
TestData td = testMap.get(OpsToTest.RESTART);
|
||||
td.success.addAndGet(1);
|
||||
td.successArray.set(containerId.getId(), 1);
|
||||
// move on to the following success tests
|
||||
asyncClient.rollbackLastReInitializationAsync(containerId);
|
||||
// throw a fake user exception, and shouldn't crash the test
|
||||
throw new RuntimeException("Ignorable Exception");
|
||||
}
|
||||
|
||||
@SuppressWarnings("deprecation")
|
||||
@Override
|
||||
public void onRollbackLastReInitialization(ContainerId containerId) {
|
||||
if (containerId.getId() >= expectedSuccess) {
|
||||
errorMsgs.add("Container " + containerId +
|
||||
" should throw the exception onContainerReInitialize");
|
||||
return;
|
||||
}
|
||||
TestData td = testMap.get(OpsToTest.ROLLBACK);
|
||||
td.success.addAndGet(1);
|
||||
td.successArray.set(containerId.getId(), 1);
|
||||
// move on to the following success tests
|
||||
asyncClient.commitLastReInitializationAsync(containerId);
|
||||
// throw a fake user exception, and shouldn't crash the test
|
||||
throw new RuntimeException("Ignorable Exception");
|
||||
}
|
||||
|
||||
@SuppressWarnings("deprecation")
|
||||
@Override
|
||||
public void onCommitLastReInitialization(ContainerId containerId) {
|
||||
if (containerId.getId() >= expectedSuccess) {
|
||||
errorMsgs.add("Container " + containerId +
|
||||
" should throw the exception onContainerReInitialize");
|
||||
return;
|
||||
}
|
||||
TestData td = testMap.get(OpsToTest.COMMIT);
|
||||
td.success.addAndGet(1);
|
||||
td.successArray.set(containerId.getId(), 1);
|
||||
// move on to the following success tests
|
||||
asyncClient.stopContainerAsync(containerId, nodeId);
|
||||
// throw a fake user exception, and shouldn't crash the test
|
||||
|
@ -308,8 +396,9 @@ public class TestNMClientAsync {
|
|||
" should throw the exception onContainerStopped");
|
||||
return;
|
||||
}
|
||||
actualStopSuccess.addAndGet(1);
|
||||
actualStopSuccessArray.set(containerId.getId(), 1);
|
||||
TestData td = testMap.get(OpsToTest.STOP);
|
||||
td.success.addAndGet(1);
|
||||
td.successArray.set(containerId.getId(), 1);
|
||||
|
||||
// Shouldn't crash the test thread
|
||||
throw new RuntimeException("Ignorable Exception");
|
||||
|
@ -330,8 +419,9 @@ public class TestNMClientAsync {
|
|||
" shouldn't throw the exception onStartContainerError");
|
||||
return;
|
||||
}
|
||||
actualStartFailure.addAndGet(1);
|
||||
actualStartFailureArray.set(containerId.getId() - expectedSuccess, 1);
|
||||
TestData td = testMap.get(OpsToTest.START);
|
||||
td.failure.addAndGet(1);
|
||||
td.failureArray.set(containerId.getId() - expectedSuccess, 1);
|
||||
// move on to the following failure tests
|
||||
asyncClient.getContainerStatusAsync(containerId, nodeId);
|
||||
|
||||
|
@ -348,8 +438,9 @@ public class TestNMClientAsync {
|
|||
" shouldn't throw the exception onIncreaseContainerResourceError");
|
||||
return;
|
||||
}
|
||||
actualIncreaseResourceFailure.addAndGet(1);
|
||||
actualIncreaseResourceFailureArray.set(
|
||||
TestData td = testMap.get(OpsToTest.INCR);
|
||||
td.failure.addAndGet(1);
|
||||
td.failureArray.set(
|
||||
containerId.getId() - expectedSuccess - expectedFailure, 1);
|
||||
// increase container resource error should NOT change the
|
||||
// the container status to FAILED
|
||||
|
@ -359,6 +450,102 @@ public class TestNMClientAsync {
|
|||
throw new RuntimeException("Ignorable Exception");
|
||||
}
|
||||
|
||||
@SuppressWarnings("deprecation")
|
||||
@Override
|
||||
public void onContainerReInitializeError(ContainerId containerId,
|
||||
Throwable t) {
|
||||
if (containerId.getId() < expectedSuccess + expectedFailure) {
|
||||
errorMsgs.add("Container " + containerId +
|
||||
" shouldn't throw the exception onContainerReInitializeError");
|
||||
return;
|
||||
}
|
||||
TestData td = testMap.get(OpsToTest.REINIT);
|
||||
td.failure.addAndGet(1);
|
||||
td.failureArray.set(
|
||||
containerId.getId() - expectedSuccess - expectedFailure, 1);
|
||||
|
||||
// increment the stop counters here.. since the container will fail
|
||||
td = testMap.get(OpsToTest.STOP);
|
||||
td.failure.addAndGet(1);
|
||||
td.failureArray.set(
|
||||
containerId.getId() - expectedSuccess - expectedFailure, 1);
|
||||
// reInit container changes the container status to FAILED
|
||||
// Shouldn't crash the test thread
|
||||
throw new RuntimeException("Ignorable Exception");
|
||||
}
|
||||
|
||||
@SuppressWarnings("deprecation")
|
||||
@Override
|
||||
public void onContainerRestartError(ContainerId containerId, Throwable t) {
|
||||
if (containerId.getId() < expectedSuccess + expectedFailure) {
|
||||
errorMsgs.add("Container " + containerId +
|
||||
" shouldn't throw the exception onContainerRestartError");
|
||||
return;
|
||||
}
|
||||
TestData td = testMap.get(OpsToTest.RESTART);
|
||||
td.failure.addAndGet(1);
|
||||
td.failureArray.set(
|
||||
containerId.getId() - expectedSuccess - expectedFailure, 1);
|
||||
|
||||
// increment the stop counters here.. since the container will fail
|
||||
td = testMap.get(OpsToTest.STOP);
|
||||
td.failure.addAndGet(1);
|
||||
td.failureArray.set(
|
||||
containerId.getId() - expectedSuccess - expectedFailure, 1);
|
||||
// restart container changes the container status to FAILED
|
||||
// Shouldn't crash the test thread
|
||||
throw new RuntimeException("Ignorable Exception");
|
||||
}
|
||||
|
||||
@SuppressWarnings("deprecation")
|
||||
@Override
|
||||
public void onRollbackLastReInitializationError(ContainerId containerId,
|
||||
Throwable t) {
|
||||
if (containerId.getId() < expectedSuccess + expectedFailure) {
|
||||
errorMsgs.add("Container " + containerId +
|
||||
" shouldn't throw the exception" +
|
||||
" onRollbackLastReInitializationError");
|
||||
return;
|
||||
}
|
||||
TestData td = testMap.get(OpsToTest.ROLLBACK);
|
||||
td.failure.addAndGet(1);
|
||||
td.failureArray.set(
|
||||
containerId.getId() - expectedSuccess - expectedFailure, 1);
|
||||
|
||||
// increment the stop counters here.. since the container will fail
|
||||
td = testMap.get(OpsToTest.STOP);
|
||||
td.failure.addAndGet(1);
|
||||
td.failureArray.set(
|
||||
containerId.getId() - expectedSuccess - expectedFailure, 1);
|
||||
// rollback container changes the container status to FAILED
|
||||
// Shouldn't crash the test thread
|
||||
throw new RuntimeException("Ignorable Exception");
|
||||
}
|
||||
|
||||
@SuppressWarnings("deprecation")
|
||||
@Override
|
||||
public void onCommitLastReInitializationError(ContainerId containerId,
|
||||
Throwable t) {
|
||||
if (containerId.getId() < expectedSuccess + expectedFailure) {
|
||||
errorMsgs.add("Container " + containerId +
|
||||
" shouldn't throw the exception onCommitLastReInitializationError");
|
||||
return;
|
||||
}
|
||||
TestData td = testMap.get(OpsToTest.COMMIT);
|
||||
td.failure.addAndGet(1);
|
||||
td.failureArray.set(
|
||||
containerId.getId() - expectedSuccess - expectedFailure, 1);
|
||||
|
||||
// increment the stop counters here.. since the container will fail
|
||||
td = testMap.get(OpsToTest.STOP);
|
||||
td.failure.addAndGet(1);
|
||||
td.failureArray.set(
|
||||
containerId.getId() - expectedSuccess - expectedFailure, 1);
|
||||
// commit container changes the container status to FAILED
|
||||
// Shouldn't crash the test thread
|
||||
throw new RuntimeException("Ignorable Exception");
|
||||
}
|
||||
|
||||
@SuppressWarnings("deprecation")
|
||||
@Override
|
||||
public void onStopContainerError(ContainerId containerId, Throwable t) {
|
||||
|
@ -371,9 +558,9 @@ public class TestNMClientAsync {
|
|||
" shouldn't throw the exception onStopContainerError");
|
||||
return;
|
||||
}
|
||||
|
||||
actualStopFailure.addAndGet(1);
|
||||
actualStopFailureArray.set(
|
||||
TestData td = testMap.get(OpsToTest.STOP);
|
||||
td.failure.addAndGet(1);
|
||||
td.failureArray.set(
|
||||
containerId.getId() - expectedSuccess - expectedFailure, 1);
|
||||
|
||||
// Shouldn't crash the test thread
|
||||
|
@ -393,8 +580,9 @@ public class TestNMClientAsync {
|
|||
" shouldn't throw the exception onGetContainerStatusError");
|
||||
return;
|
||||
}
|
||||
actualQueryFailure.addAndGet(1);
|
||||
actualQueryFailureArray.set(containerId.getId() - expectedSuccess, 1);
|
||||
TestData td = testMap.get(OpsToTest.QUERY);
|
||||
td.failure.addAndGet(1);
|
||||
td.failureArray.set(containerId.getId() - expectedSuccess, 1);
|
||||
|
||||
// Shouldn't crash the test thread
|
||||
throw new RuntimeException("Ignorable Exception");
|
||||
|
@ -402,44 +590,67 @@ public class TestNMClientAsync {
|
|||
|
||||
public boolean isAllSuccessCallsExecuted() {
|
||||
boolean isAllSuccessCallsExecuted =
|
||||
actualStartSuccess.get() == expectedSuccess &&
|
||||
actualQuerySuccess.get() == expectedSuccess &&
|
||||
actualIncreaseResourceSuccess.get() == expectedSuccess &&
|
||||
actualStopSuccess.get() == expectedSuccess;
|
||||
testMap.get(OpsToTest.START).success.get() == expectedSuccess &&
|
||||
testMap.get(OpsToTest.QUERY).success.get() == expectedSuccess &&
|
||||
testMap.get(OpsToTest.INCR).success.get() == expectedSuccess &&
|
||||
testMap.get(OpsToTest.REINIT).success.get() == expectedSuccess &&
|
||||
testMap.get(OpsToTest.RESTART).success.get() == expectedSuccess &&
|
||||
testMap.get(OpsToTest.ROLLBACK).success.get() ==
|
||||
expectedSuccess &&
|
||||
testMap.get(OpsToTest.COMMIT).success.get() == expectedSuccess &&
|
||||
testMap.get(OpsToTest.STOP).success.get() == expectedSuccess;
|
||||
if (isAllSuccessCallsExecuted) {
|
||||
assertAtomicIntegerArray(actualStartSuccessArray);
|
||||
assertAtomicIntegerArray(actualQuerySuccessArray);
|
||||
assertAtomicIntegerArray(actualIncreaseResourceSuccessArray);
|
||||
assertAtomicIntegerArray(actualStopSuccessArray);
|
||||
assertAtomicIntegerArray(testMap.get(OpsToTest.START).successArray);
|
||||
assertAtomicIntegerArray(testMap.get(OpsToTest.QUERY).successArray);
|
||||
assertAtomicIntegerArray(testMap.get(OpsToTest.INCR).successArray);
|
||||
assertAtomicIntegerArray(testMap.get(OpsToTest.REINIT).successArray);
|
||||
assertAtomicIntegerArray(testMap.get(OpsToTest.RESTART).successArray);
|
||||
assertAtomicIntegerArray(testMap.get(OpsToTest.ROLLBACK).successArray);
|
||||
assertAtomicIntegerArray(testMap.get(OpsToTest.COMMIT).successArray);
|
||||
assertAtomicIntegerArray(testMap.get(OpsToTest.STOP).successArray);
|
||||
}
|
||||
return isAllSuccessCallsExecuted;
|
||||
}
|
||||
|
||||
public boolean isStartAndQueryFailureCallsExecuted() {
|
||||
boolean isStartAndQueryFailureCallsExecuted =
|
||||
actualStartFailure.get() == expectedFailure &&
|
||||
actualQueryFailure.get() == expectedFailure;
|
||||
testMap.get(OpsToTest.START).failure.get() == expectedFailure &&
|
||||
testMap.get(OpsToTest.QUERY).failure.get() == expectedFailure;
|
||||
if (isStartAndQueryFailureCallsExecuted) {
|
||||
assertAtomicIntegerArray(actualStartFailureArray);
|
||||
assertAtomicIntegerArray(actualQueryFailureArray);
|
||||
assertAtomicIntegerArray(testMap.get(OpsToTest.START).failureArray);
|
||||
assertAtomicIntegerArray(testMap.get(OpsToTest.QUERY).failureArray);
|
||||
}
|
||||
return isStartAndQueryFailureCallsExecuted;
|
||||
}
|
||||
|
||||
public boolean isIncreaseResourceFailureCallsExecuted() {
|
||||
boolean isIncreaseResourceFailureCallsExecuted =
|
||||
actualIncreaseResourceFailure.get() == expectedFailure;
|
||||
testMap.get(OpsToTest.INCR).failure.get()
|
||||
+ testMap.get(OpsToTest.REINIT).failure.get()
|
||||
+ testMap.get(OpsToTest.RESTART).failure.get()
|
||||
+ testMap.get(OpsToTest.ROLLBACK).failure.get()
|
||||
+ testMap.get(OpsToTest.COMMIT).failure.get()
|
||||
== expectedFailure;
|
||||
if (isIncreaseResourceFailureCallsExecuted) {
|
||||
assertAtomicIntegerArray(actualIncreaseResourceFailureArray);
|
||||
AtomicIntegerArray testArray =
|
||||
new AtomicIntegerArray(
|
||||
testMap.get(OpsToTest.INCR).failureArray.length());
|
||||
for (int i = 0; i < testArray.length(); i++) {
|
||||
for (OpsToTest op : EnumSet.of(OpsToTest.REINIT, OpsToTest.RESTART,
|
||||
OpsToTest.ROLLBACK, OpsToTest.COMMIT, OpsToTest.INCR)) {
|
||||
testArray.addAndGet(i, testMap.get(op).failureArray.get(i));
|
||||
}
|
||||
}
|
||||
assertAtomicIntegerArray(testArray);
|
||||
}
|
||||
return isIncreaseResourceFailureCallsExecuted;
|
||||
}
|
||||
|
||||
public boolean isStopFailureCallsExecuted() {
|
||||
boolean isStopFailureCallsExecuted =
|
||||
actualStopFailure.get() == expectedFailure;
|
||||
testMap.get(OpsToTest.STOP).failure.get() == expectedFailure;
|
||||
if (isStopFailureCallsExecuted) {
|
||||
assertAtomicIntegerArray(actualStopFailureArray);
|
||||
assertAtomicIntegerArray(testMap.get(OpsToTest.STOP).failureArray);
|
||||
}
|
||||
return isStopFailureCallsExecuted;
|
||||
}
|
||||
|
@ -464,6 +675,14 @@ public class TestNMClientAsync {
|
|||
recordFactory.newRecordInstance(ContainerStatus.class));
|
||||
doNothing().when(client).increaseContainerResource(
|
||||
any(Container.class));
|
||||
doNothing().when(client).reInitializeContainer(
|
||||
any(ContainerId.class), any(ContainerLaunchContext.class),
|
||||
anyBoolean());
|
||||
doNothing().when(client).restartContainer(any(ContainerId.class));
|
||||
doNothing().when(client).rollbackLastReInitialization(
|
||||
any(ContainerId.class));
|
||||
doNothing().when(client).commitLastReInitialization(
|
||||
any(ContainerId.class));
|
||||
doNothing().when(client).stopContainer(any(ContainerId.class),
|
||||
any(NodeId.class));
|
||||
break;
|
||||
|
@ -485,9 +704,23 @@ public class TestNMClientAsync {
|
|||
recordFactory.newRecordInstance(ContainerStatus.class));
|
||||
doThrow(RPCUtil.getRemoteException("Increase Resource Exception"))
|
||||
.when(client).increaseContainerResource(any(Container.class));
|
||||
doThrow(RPCUtil.getRemoteException("ReInitialize Exception"))
|
||||
.when(client).reInitializeContainer(
|
||||
any(ContainerId.class), any(ContainerLaunchContext.class),
|
||||
anyBoolean());
|
||||
doThrow(RPCUtil.getRemoteException("Restart Exception"))
|
||||
.when(client).restartContainer(any(ContainerId.class));
|
||||
doThrow(RPCUtil.getRemoteException("Rollback upgrade Exception"))
|
||||
.when(client).rollbackLastReInitialization(
|
||||
any(ContainerId.class));
|
||||
doThrow(RPCUtil.getRemoteException("Commit upgrade Exception"))
|
||||
.when(client).commitLastReInitialization(
|
||||
any(ContainerId.class));
|
||||
doThrow(RPCUtil.getRemoteException("Stop Exception")).when(client)
|
||||
.stopContainer(any(ContainerId.class), any(NodeId.class));
|
||||
}
|
||||
when(client.getNodeIdOfStartedContainer(any(ContainerId.class)))
|
||||
.thenReturn(NodeId.newInstance("localhost", 0));
|
||||
return client;
|
||||
}
|
||||
|
||||
|
|
|
@ -45,6 +45,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationReport;
|
|||
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
||||
import org.apache.hadoop.yarn.api.records.Container;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerState;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||
|
@ -69,6 +70,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
||||
import org.apache.hadoop.yarn.util.Records;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
|
@ -310,6 +312,36 @@ public class TestNMClient {
|
|||
e.getMessage().contains("is not handled by this NodeManager"));
|
||||
}
|
||||
|
||||
// restart shouldn't be called before startContainer,
|
||||
// otherwise, NodeManager cannot find the container
|
||||
try {
|
||||
nmClient.restartContainer(container.getId());
|
||||
fail("Exception is expected");
|
||||
} catch (YarnException e) {
|
||||
assertTrue("The thrown exception is not expected",
|
||||
e.getMessage().contains("Unknown container"));
|
||||
}
|
||||
|
||||
// rollback shouldn't be called before startContainer,
|
||||
// otherwise, NodeManager cannot find the container
|
||||
try {
|
||||
nmClient.rollbackLastReInitialization(container.getId());
|
||||
fail("Exception is expected");
|
||||
} catch (YarnException e) {
|
||||
assertTrue("The thrown exception is not expected",
|
||||
e.getMessage().contains("Unknown container"));
|
||||
}
|
||||
|
||||
// commit shouldn't be called before startContainer,
|
||||
// otherwise, NodeManager cannot find the container
|
||||
try {
|
||||
nmClient.commitLastReInitialization(container.getId());
|
||||
fail("Exception is expected");
|
||||
} catch (YarnException e) {
|
||||
assertTrue("The thrown exception is not expected",
|
||||
e.getMessage().contains("Unknown container"));
|
||||
}
|
||||
|
||||
// stopContainer shouldn't be called before startContainer,
|
||||
// otherwise, an exception will be thrown
|
||||
try {
|
||||
|
@ -353,6 +385,28 @@ public class TestNMClient {
|
|||
// Test increase container API and make sure requests can reach NM
|
||||
testIncreaseContainerResource(container);
|
||||
|
||||
testRestartContainer(container.getId());
|
||||
testGetContainerStatus(container, i, ContainerState.RUNNING,
|
||||
"will be Restarted", Arrays.asList(new Integer[] {-1000}));
|
||||
|
||||
if (i % 2 == 0) {
|
||||
testReInitializeContainer(container.getId(), clc, false);
|
||||
testGetContainerStatus(container, i, ContainerState.RUNNING,
|
||||
"will be Re-initialized", Arrays.asList(new Integer[] {-1000}));
|
||||
testRollbackContainer(container.getId(), false);
|
||||
testGetContainerStatus(container, i, ContainerState.RUNNING,
|
||||
"will be Rolled-back", Arrays.asList(new Integer[] {-1000}));
|
||||
testCommitContainer(container.getId(), true);
|
||||
testReInitializeContainer(container.getId(), clc, false);
|
||||
testCommitContainer(container.getId(), false);
|
||||
} else {
|
||||
testReInitializeContainer(container.getId(), clc, true);
|
||||
testGetContainerStatus(container, i, ContainerState.RUNNING,
|
||||
"will be Re-initialized", Arrays.asList(new Integer[] {-1000}));
|
||||
testRollbackContainer(container.getId(), true);
|
||||
testCommitContainer(container.getId(), true);
|
||||
}
|
||||
|
||||
try {
|
||||
nmClient.stopContainer(container.getId(), container.getNodeId());
|
||||
} catch (YarnException e) {
|
||||
|
@ -432,4 +486,91 @@ public class TestNMClient {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void testRestartContainer(ContainerId containerId)
|
||||
throws YarnException, IOException {
|
||||
try {
|
||||
sleep(250);
|
||||
nmClient.restartContainer(containerId);
|
||||
sleep(250);
|
||||
} catch (YarnException e) {
|
||||
// NM container will only be in SCHEDULED state, so expect the increase
|
||||
// action to fail.
|
||||
if (!e.getMessage().contains(
|
||||
"can only be changed when a container is in RUNNING state")) {
|
||||
throw (AssertionError)
|
||||
(new AssertionError("Exception is not expected: " + e)
|
||||
.initCause(e));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void testRollbackContainer(ContainerId containerId,
|
||||
boolean notRollbackable) throws YarnException, IOException {
|
||||
try {
|
||||
sleep(250);
|
||||
nmClient.rollbackLastReInitialization(containerId);
|
||||
if (notRollbackable) {
|
||||
fail("Should not be able to rollback..");
|
||||
}
|
||||
sleep(250);
|
||||
} catch (YarnException e) {
|
||||
// NM container will only be in SCHEDULED state, so expect the increase
|
||||
// action to fail.
|
||||
if (notRollbackable) {
|
||||
Assert.assertTrue(e.getMessage().contains(
|
||||
"Nothing to rollback to"));
|
||||
} else {
|
||||
if (!e.getMessage().contains(
|
||||
"can only be changed when a container is in RUNNING state")) {
|
||||
throw (AssertionError)
|
||||
(new AssertionError("Exception is not expected: " + e)
|
||||
.initCause(e));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void testCommitContainer(ContainerId containerId,
|
||||
boolean notCommittable) throws YarnException, IOException {
|
||||
try {
|
||||
nmClient.commitLastReInitialization(containerId);
|
||||
if (notCommittable) {
|
||||
fail("Should not be able to commit..");
|
||||
}
|
||||
} catch (YarnException e) {
|
||||
// NM container will only be in SCHEDULED state, so expect the increase
|
||||
// action to fail.
|
||||
if (notCommittable) {
|
||||
Assert.assertTrue(e.getMessage().contains(
|
||||
"Nothing to Commit"));
|
||||
} else {
|
||||
if (!e.getMessage().contains(
|
||||
"can only be changed when a container is in RUNNING state")) {
|
||||
throw (AssertionError)
|
||||
(new AssertionError("Exception is not expected: " + e)
|
||||
.initCause(e));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void testReInitializeContainer(ContainerId containerId,
|
||||
ContainerLaunchContext clc, boolean autoCommit)
|
||||
throws YarnException, IOException {
|
||||
try {
|
||||
sleep(250);
|
||||
nmClient.reInitializeContainer(containerId, clc, autoCommit);
|
||||
sleep(250);
|
||||
} catch (YarnException e) {
|
||||
// NM container will only be in SCHEDULED state, so expect the increase
|
||||
// action to fail.
|
||||
if (!e.getMessage().contains(
|
||||
"can only be changed when a container is in RUNNING state")) {
|
||||
throw (AssertionError)
|
||||
(new AssertionError("Exception is not expected: " + e)
|
||||
.initCause(e));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -22,8 +22,10 @@ import java.io.File;
|
|||
import java.io.IOException;
|
||||
import java.net.URISyntaxException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.text.SimpleDateFormat;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Date;
|
||||
import java.util.EnumSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
@ -137,6 +139,8 @@ public class ContainerImpl implements Container {
|
|||
}
|
||||
}
|
||||
|
||||
private final SimpleDateFormat dateFormat =
|
||||
new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
|
||||
private final Lock readLock;
|
||||
private final Lock writeLock;
|
||||
private final Dispatcher dispatcher;
|
||||
|
@ -767,7 +771,7 @@ public class ContainerImpl implements Container {
|
|||
|
||||
private void addDiagnostics(String... diags) {
|
||||
for (String s : diags) {
|
||||
this.diagnostics.append(s);
|
||||
this.diagnostics.append("[" + dateFormat.format(new Date()) + "]" + s);
|
||||
}
|
||||
if (diagnostics.length() > diagnosticsMaxSize) {
|
||||
diagnostics.delete(0, diagnostics.length() - diagnosticsMaxSize);
|
||||
|
@ -991,6 +995,7 @@ public class ContainerImpl implements Container {
|
|||
// We also need to make sure that if Rollback is possible, the
|
||||
// rollback state should be retained in the
|
||||
// oldLaunchContext and oldResourceSet
|
||||
container.addDiagnostics("Container will be Restarted.\n");
|
||||
return new ReInitializationContext(
|
||||
container.launchContext, container.resourceSet,
|
||||
container.canRollback() ?
|
||||
|
@ -998,6 +1003,7 @@ public class ContainerImpl implements Container {
|
|||
container.canRollback() ?
|
||||
container.reInitContext.oldResourceSet : null);
|
||||
} else {
|
||||
container.addDiagnostics("Container will be Re-initialized.\n");
|
||||
return new ReInitializationContext(
|
||||
reInitEvent.getReInitLaunchContext(),
|
||||
reInitEvent.getResourceSet(),
|
||||
|
@ -1018,6 +1024,7 @@ public class ContainerImpl implements Container {
|
|||
@Override
|
||||
protected ReInitializationContext createReInitContext(ContainerImpl
|
||||
container, ContainerEvent event) {
|
||||
container.addDiagnostics("Container upgrade will be Rolled-back.\n");
|
||||
LOG.warn("Container [" + container.getContainerId() + "]" +
|
||||
" about to be explicitly Rolledback !!");
|
||||
return container.reInitContext.createContextForRollback();
|
||||
|
|
Loading…
Reference in New Issue