From 82751ff0cb9fd9e988356a03f5be92ce6ccb5df2 Mon Sep 17 00:00:00 2001
From: Jian He
Date: Tue, 16 May 2017 10:48:46 -0700
Subject: [PATCH] YARN-6306. NMClient API change for container upgrade.
Contributed by Arun Suresh
---
.../hadoop/yarn/client/api/NMClient.java | 86 ++++-
.../yarn/client/api/async/NMClientAsync.java | 98 ++++-
.../api/async/impl/NMClientAsyncImpl.java | 260 ++++++++++++-
.../yarn/client/api/impl/NMClientImpl.java | 91 +++++
.../api/async/impl/TestNMClientAsync.java | 359 +++++++++++++++---
.../yarn/client/api/impl/TestNMClient.java | 141 +++++++
.../container/ContainerImpl.java | 9 +-
7 files changed, 966 insertions(+), 78 deletions(-)
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/NMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/NMClient.java
index 47270f5a3d1..c1447ba4a41 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/NMClient.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/NMClient.java
@@ -58,6 +58,10 @@ public abstract class NMClient extends AbstractService {
return client;
}
+ /**
+  * Container upgrade operations that a client implementation can tell apart
+  * when it funnels several requests through shared code.
+  */
+ protected enum UpgradeOp {
+   REINIT,
+   RESTART,
+   COMMIT,
+   ROLLBACK
+ }
+
private NMTokenCache nmTokenCache = NMTokenCache.getSingleton();
@Private
@@ -79,8 +83,8 @@ public abstract class NMClient extends AbstractService {
* NodeManager to launch the
* container
* @return a map between the auxiliary service names and their outputs
- * @throws YarnException
- * @throws IOException
+ * @throws YarnException YarnException.
+ * @throws IOException IOException.
*/
public abstract Map startContainer(Container container,
ContainerLaunchContext containerLaunchContext)
@@ -95,9 +99,10 @@ public abstract class NMClient extends AbstractService {
* {@link Container}.
*
*
- * @param container the container with updated token
- * @throws YarnException
- * @throws IOException
+ * @param container the container with updated token.
+ *
+ * @throws YarnException YarnException.
+ * @throws IOException IOException.
*/
public abstract void increaseContainerResource(Container container)
throws YarnException, IOException;
@@ -107,9 +112,9 @@ public abstract class NMClient extends AbstractService {
*
* @param containerId the Id of the started container
* @param nodeId the Id of the NodeManager
- *
- * @throws YarnException
- * @throws IOException
+ *
+ * @throws YarnException YarnException.
+ * @throws IOException IOException.
*/
public abstract void stopContainer(ContainerId containerId, NodeId nodeId)
throws YarnException, IOException;
@@ -120,13 +125,61 @@ public abstract class NMClient extends AbstractService {
* @param containerId the Id of the started container
* @param nodeId the Id of the NodeManager
*
- * @return the status of a container
- * @throws YarnException
- * @throws IOException
+ * @return the status of a container.
+ *
+ * @throws YarnException YarnException.
+ * @throws IOException IOException.
*/
public abstract ContainerStatus getContainerStatus(ContainerId containerId,
NodeId nodeId) throws YarnException, IOException;
+ /**
+ * Re-Initialize the Container.
+ *
+ * @param containerId the Id of the container to Re-Initialize.
+ * @param containerLaunchContex the updated ContainerLaunchContext.
+ * @param autoCommit whether to commit the re-initialization automatically.
+ *
+ * @throws YarnException YarnException.
+ * @throws IOException IOException.
+ */
+ public abstract void reInitializeContainer(ContainerId containerId,
+ ContainerLaunchContext containerLaunchContex, boolean autoCommit)
+ throws YarnException, IOException;
+
+ /**
+ * Restart the specified container.
+ *
+ * @param containerId the Id of the container to restart.
+ *
+ * @throws YarnException YarnException.
+ * @throws IOException IOException.
+ */
+ public abstract void restartContainer(ContainerId containerId)
+ throws YarnException, IOException;
+
+ /**
+ * Rollback last reInitialization of the specified container.
+ *
+ * @param containerId the Id of the container whose last re-initialization is rolled back.
+ *
+ * @throws YarnException YarnException.
+ * @throws IOException IOException.
+ */
+ public abstract void rollbackLastReInitialization(ContainerId containerId)
+ throws YarnException, IOException;
+
+ /**
+ * Commit last reInitialization of the specified container.
+ *
+ * @param containerId the Id of the container whose last re-initialization is committed.
+ *
+ * @throws YarnException YarnException.
+ * @throws IOException IOException.
+ */
+ public abstract void commitLastReInitialization(ContainerId containerId)
+ throws YarnException, IOException;
+
/**
* Set whether the containers that are started by this client, and are
* still running should be stopped when the client stops. By default, the
@@ -165,4 +218,15 @@ public abstract class NMClient extends AbstractService {
return nmTokenCache;
}
+ /**
+ * Get the NodeId of the node on which the container is running. It returns
+ * null if the container is not found or if it is not running.
+ *
+ * @param containerId Container Id of the container.
+ * @return NodeId of the node on which the container is running, or null.
+ */
+ public NodeId getNodeIdOfStartedContainer(ContainerId containerId) {
+ return null;
+ }
+
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/NMClientAsync.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/NMClientAsync.java
index 8e90564bdf8..c94942ae903 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/NMClientAsync.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/NMClientAsync.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.yarn.client.api.async;
import java.nio.ByteBuffer;
import java.util.Map;
-import java.util.concurrent.ConcurrentMap;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
@@ -32,7 +31,6 @@ import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.client.api.NMClient;
import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl;
import org.apache.hadoop.yarn.client.api.impl.NMClientImpl;
@@ -181,6 +179,38 @@ public abstract class NMClientAsync extends AbstractService {
public abstract void increaseContainerResourceAsync(Container container);
+ /**
+ * Re-Initialize the Container.
+ *
+ * @param containerId the Id of the container to Re-Initialize.
+ * @param containerLaunchContex the updated ContainerLaunchContext.
+ * @param autoCommit whether to commit the re-initialization automatically.
+ */
+ public abstract void reInitializeContainerAsync(ContainerId containerId,
+ ContainerLaunchContext containerLaunchContex, boolean autoCommit);
+
+ /**
+ * Restart the specified container.
+ *
+ * @param containerId the Id of the container to restart.
+ */
+ public abstract void restartContainerAsync(ContainerId containerId);
+
+ /**
+ * Rollback last reInitialization of the specified container.
+ *
+ * @param containerId the Id of the container whose last re-initialization is rolled back.
+ */
+ public abstract void rollbackLastReInitializationAsync(
+ ContainerId containerId);
+
+ /**
+ * Commit last reInitialization of the specified container.
+ *
+ * @param containerId the Id of the container whose last re-initialization is committed.
+ */
+ public abstract void commitLastReInitializationAsync(ContainerId containerId);
+
public abstract void stopContainerAsync(
ContainerId containerId, NodeId nodeId);
@@ -303,6 +333,70 @@ public abstract class NMClientAsync extends AbstractService {
*/
public abstract void onStopContainerError(
ContainerId containerId, Throwable t);
+
+ /**
+ * Callback for container re-initialization request.
+ *
+ * @param containerId the Id of the container to be Re-Initialized.
+ */
+ public void onContainerReInitialize(ContainerId containerId) {}
+
+ /**
+ * Callback for container restart.
+ *
+ * @param containerId the Id of the container to restart.
+ */
+ public void onContainerRestart(ContainerId containerId) {}
+
+ /**
+ * Callback for rollback of last re-initialization.
+ *
+ * @param containerId the Id of the container whose last re-initialization was rolled back.
+ */
+ public void onRollbackLastReInitialization(ContainerId containerId) {}
+
+ /**
+ * Callback for commit of last re-initialization.
+ *
+ * @param containerId the Id of the container whose last re-initialization was committed.
+ */
+ public void onCommitLastReInitialization(ContainerId containerId) {}
+
+ /**
+ * Error Callback for container re-initialization request.
+ *
+ * @param containerId the Id of the container to be Re-Initialized.
+ * @param t a Throwable.
+ */
+ public void onContainerReInitializeError(ContainerId containerId,
+ Throwable t) {}
+
+ /**
+ * Error Callback for container restart.
+ *
+ * @param containerId the Id of the container to restart.
+ * @param t a Throwable.
+ *
+ */
+ public void onContainerRestartError(ContainerId containerId, Throwable t) {}
+
+ /**
+ * Error Callback for rollback of last re-initialization.
+ *
+ * @param containerId the Id of the container whose rollback failed.
+ * @param t a Throwable.
+ */
+ public void onRollbackLastReInitializationError(ContainerId containerId,
+ Throwable t) {}
+
+ /**
+ * Error Callback for commit of last re-initialization.
+ *
+ * @param containerId the Id of the container whose commit failed.
+ * @param t a Throwable.
+ */
+ public void onCommitLastReInitializationError(ContainerId containerId,
+ Throwable t) {}
}
/**
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/NMClientAsyncImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/NMClientAsyncImpl.java
index 575ce13e5d8..515a8e86daa 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/NMClientAsyncImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/NMClientAsyncImpl.java
@@ -282,6 +282,103 @@ public class NMClientAsyncImpl extends NMClientAsync {
}
}
+ /**
+  * Queues a request to re-initialize a container with an updated launch
+  * context. The request is dropped (with an error log) unless the callback
+  * handler is an {@link AbstractCallbackHandler}, because only that type
+  * declares the re-initialization callbacks.
+  *
+  * @param containerId the Id of the container to re-initialize.
+  * @param containerLaunchContex the updated ContainerLaunchContext.
+  * @param autoCommit whether to commit the re-initialization automatically.
+  */
+ @Override
+ public void reInitializeContainerAsync(ContainerId containerId,
+     ContainerLaunchContext containerLaunchContex, boolean autoCommit){
+   if (!(callbackHandler instanceof AbstractCallbackHandler)) {
+     LOG.error("Callback handler does not implement container re-initialize "
+         + "callback methods");
+     return;
+   }
+   AbstractCallbackHandler handler = (AbstractCallbackHandler) callbackHandler;
+   if (containers.get(containerId) == null) {
+     handler.onContainerReInitializeError(
+         containerId, RPCUtil.getRemoteException(
+             "Container " + containerId + " is not started"));
+     // The failure has already been reported; do not also queue an event
+     // for a container this client is not tracking.
+     return;
+   }
+   try {
+     events.put(new ReInitializeContainerEvevnt(containerId,
+         client.getNodeIdOfStartedContainer(containerId),
+         containerLaunchContex, autoCommit));
+   } catch (InterruptedException e) {
+     // Keep the interrupt visible to the calling thread.
+     Thread.currentThread().interrupt();
+     LOG.warn("Exception when scheduling the event of re-initializing of "
+         + "Container " + containerId, e);
+     handler.onContainerReInitializeError(containerId, e);
+   }
+ }
+
+ /**
+  * Queues a request to restart a container. The request is dropped (with an
+  * error log) unless the callback handler is an
+  * {@link AbstractCallbackHandler}, because only that type declares the
+  * restart callbacks.
+  *
+  * @param containerId the Id of the container to restart.
+  */
+ @Override
+ public void restartContainerAsync(ContainerId containerId){
+   if (!(callbackHandler instanceof AbstractCallbackHandler)) {
+     LOG.error("Callback handler does not implement container restart "
+         + "callback methods");
+     return;
+   }
+   AbstractCallbackHandler handler = (AbstractCallbackHandler) callbackHandler;
+   if (containers.get(containerId) == null) {
+     handler.onContainerRestartError(
+         containerId, RPCUtil.getRemoteException(
+             "Container " + containerId + " is not started"));
+     // The failure has already been reported; do not also queue an event
+     // for a container this client is not tracking.
+     return;
+   }
+   try {
+     events.put(new ContainerEvent(containerId,
+         client.getNodeIdOfStartedContainer(containerId),
+         null, ContainerEventType.RESTART_CONTAINER));
+   } catch (InterruptedException e) {
+     // Keep the interrupt visible to the calling thread.
+     Thread.currentThread().interrupt();
+     LOG.warn("Exception when scheduling the event of restart of "
+         + "Container " + containerId, e);
+     handler.onContainerRestartError(containerId, e);
+   }
+ }
+
+ /**
+  * Queues a request to roll back the last re-initialization of a container.
+  * The request is dropped (with an error log) unless the callback handler is
+  * an {@link AbstractCallbackHandler}, because only that type declares the
+  * rollback callbacks.
+  *
+  * @param containerId the Id of the container whose last re-initialization
+  *                    is rolled back.
+  */
+ @Override
+ public void rollbackLastReInitializationAsync(ContainerId containerId){
+   if (!(callbackHandler instanceof AbstractCallbackHandler)) {
+     LOG.error("Callback handler does not implement container rollback "
+         + "callback methods");
+     return;
+   }
+   AbstractCallbackHandler handler = (AbstractCallbackHandler) callbackHandler;
+   if (containers.get(containerId) == null) {
+     handler.onRollbackLastReInitializationError(
+         containerId, RPCUtil.getRemoteException(
+             "Container " + containerId + " is not started"));
+     // The failure has already been reported; do not also queue an event
+     // for a container this client is not tracking.
+     return;
+   }
+   try {
+     events.put(new ContainerEvent(containerId,
+         client.getNodeIdOfStartedContainer(containerId),
+         null, ContainerEventType.ROLLBACK_LAST_REINIT));
+   } catch (InterruptedException e) {
+     // Keep the interrupt visible to the calling thread.
+     Thread.currentThread().interrupt();
+     LOG.warn("Exception when scheduling the event Rollback re-initialization"
+         + " of Container " + containerId, e);
+     handler.onRollbackLastReInitializationError(containerId, e);
+   }
+ }
+
+ /**
+  * Queues a request to commit the last re-initialization of a container.
+  * The request is dropped (with an error log) unless the callback handler is
+  * an {@link AbstractCallbackHandler}, because only that type declares the
+  * commit callbacks.
+  *
+  * @param containerId the Id of the container whose last re-initialization
+  *                    is committed.
+  */
+ @Override
+ public void commitLastReInitializationAsync(ContainerId containerId){
+   if (!(callbackHandler instanceof AbstractCallbackHandler)) {
+     LOG.error("Callback handler does not implement container commit last " +
+         "re-initialization callback methods");
+     return;
+   }
+   AbstractCallbackHandler handler = (AbstractCallbackHandler) callbackHandler;
+   if (containers.get(containerId) == null) {
+     handler.onCommitLastReInitializationError(
+         containerId, RPCUtil.getRemoteException(
+             "Container " + containerId + " is not started"));
+     // The failure has already been reported; do not also queue an event
+     // for a container this client is not tracking.
+     return;
+   }
+   try {
+     events.put(new ContainerEvent(containerId,
+         client.getNodeIdOfStartedContainer(containerId),
+         null, ContainerEventType.COMMIT_LAST_REINT));
+   } catch (InterruptedException e) {
+     // Keep the interrupt visible to the calling thread.
+     Thread.currentThread().interrupt();
+     LOG.warn("Exception when scheduling the event Commit re-initialization"
+         + " of Container " + containerId, e);
+     handler.onCommitLastReInitializationError(containerId, e);
+   }
+ }
+
public void stopContainerAsync(ContainerId containerId, NodeId nodeId) {
if (containers.get(containerId) == null) {
callbackHandler.onStopContainerError(containerId,
@@ -330,7 +427,11 @@ public class NMClientAsyncImpl extends NMClientAsync {
START_CONTAINER,
STOP_CONTAINER,
QUERY_CONTAINER,
- INCREASE_CONTAINER_RESOURCE
+ INCREASE_CONTAINER_RESOURCE,
+ REINITIALIZE_CONTAINER,
+ RESTART_CONTAINER,
+ ROLLBACK_LAST_REINIT,
+ COMMIT_LAST_REINT
}
protected static class ContainerEvent
@@ -381,6 +482,27 @@ public class NMClientAsyncImpl extends NMClientAsync {
}
}
+ /**
+  * Event requesting the re-initialization of a container. On top of the
+  * common {@link ContainerEvent} data it carries the launch context to
+  * apply and the auto-commit flag. The event type is always
+  * {@code REINITIALIZE_CONTAINER}.
+  */
+ protected static class ReInitializeContainerEvevnt extends ContainerEvent {
+   // Both values are set once in the constructor and only read afterwards.
+   private final ContainerLaunchContext containerLaunchContext;
+   private final boolean autoCommit;
+
+   /**
+    * @param containerId the Id of the container to re-initialize.
+    * @param nodeId the node passed on to the base event.
+    * @param containerLaunchContext the launch context to apply.
+    * @param autoCommit whether to commit the re-initialization automatically.
+    */
+   public ReInitializeContainerEvevnt(ContainerId containerId, NodeId nodeId,
+       ContainerLaunchContext containerLaunchContext, boolean autoCommit) {
+     super(containerId, nodeId, null,
+         ContainerEventType.REINITIALIZE_CONTAINER);
+     this.containerLaunchContext = containerLaunchContext;
+     this.autoCommit = autoCommit;
+   }
+
+   /** @return the launch context supplied at construction. */
+   public ContainerLaunchContext getContainerLaunchContext() {
+     return containerLaunchContext;
+   }
+
+   /** @return the auto-commit flag supplied at construction. */
+   public boolean isAutoCommit() {
+     return autoCommit;
+   }
+ }
+
protected static class IncreaseContainerResourceEvent extends ContainerEvent {
private Container container;
@@ -416,6 +538,25 @@ public class NMClientAsyncImpl extends NMClientAsync {
.addTransition(ContainerState.RUNNING, ContainerState.RUNNING,
ContainerEventType.INCREASE_CONTAINER_RESOURCE,
new IncreaseContainerResourceTransition())
+
+ // Transitions for Container Upgrade
+ .addTransition(ContainerState.RUNNING,
+ EnumSet.of(ContainerState.RUNNING, ContainerState.FAILED),
+ ContainerEventType.REINITIALIZE_CONTAINER,
+ new ReInitializeContainerTransition())
+ .addTransition(ContainerState.RUNNING,
+ EnumSet.of(ContainerState.RUNNING, ContainerState.FAILED),
+ ContainerEventType.RESTART_CONTAINER,
+ new ReInitializeContainerTransition())
+ .addTransition(ContainerState.RUNNING,
+ EnumSet.of(ContainerState.RUNNING, ContainerState.FAILED),
+ ContainerEventType.ROLLBACK_LAST_REINIT,
+ new ReInitializeContainerTransition())
+ .addTransition(ContainerState.RUNNING,
+ EnumSet.of(ContainerState.RUNNING, ContainerState.FAILED),
+ ContainerEventType.COMMIT_LAST_REINT,
+ new ReInitializeContainerTransition())
+
.addTransition(ContainerState.RUNNING,
EnumSet.of(ContainerState.DONE, ContainerState.FAILED),
ContainerEventType.STOP_CONTAINER,
@@ -431,6 +572,10 @@ public class NMClientAsyncImpl extends NMClientAsync {
.addTransition(ContainerState.FAILED, ContainerState.FAILED,
EnumSet.of(ContainerEventType.START_CONTAINER,
ContainerEventType.STOP_CONTAINER,
+ ContainerEventType.REINITIALIZE_CONTAINER,
+ ContainerEventType.RESTART_CONTAINER,
+ ContainerEventType.COMMIT_LAST_REINT,
+ ContainerEventType.ROLLBACK_LAST_REINIT,
ContainerEventType.INCREASE_CONTAINER_RESOURCE));
protected static class StartContainerTransition implements
@@ -529,6 +674,119 @@ public class NMClientAsyncImpl extends NMClientAsync {
}
}
+ /**
+  * Transition shared by the four container-upgrade events: re-initialize,
+  * restart, rollback and commit. It forwards the request to the wrapped
+  * client and then invokes the matching callback on the handler.
+  *
+  * The container stays RUNNING when the client call returns normally and
+  * moves to FAILED when it throws. Exceptions thrown by the callbacks
+  * themselves are logged and never change the resulting state.
+  */
+ protected static class ReInitializeContainerTransition implements
+     MultipleArcTransition<StatefulContainer, ContainerEvent, ContainerState> {
+
+   /**
+    * Performs the upgrade operation named by the event type.
+    *
+    * @param container the container the event is addressed to.
+    * @param containerEvent the upgrade event being processed.
+    * @return RUNNING when the client call succeeded, FAILED otherwise.
+    */
+   @Override
+   public ContainerState transition(StatefulContainer container,
+       ContainerEvent containerEvent) {
+     ContainerId containerId = containerEvent.getContainerId();
+     AbstractCallbackHandler handler = (AbstractCallbackHandler) container
+         .nmClientAsync.getCallbackHandler();
+     try {
+       switch(containerEvent.getType()) {
+       case REINITIALIZE_CONTAINER:
+         if (!(containerEvent instanceof ReInitializeContainerEvevnt)) {
+           LOG.error("Unexpected Event.. [" +containerEvent.getType() + "]");
+           // Throw so the catch below reports the failure to the handler
+           // instead of failing the container silently.
+           throw new IllegalArgumentException(
+               "Unexpected Event.. [" + containerEvent.getType() + "]");
+         }
+         ReInitializeContainerEvevnt rEvent =
+             (ReInitializeContainerEvevnt)containerEvent;
+         container.nmClientAsync.getClient().reInitializeContainer(
+             containerId, rEvent.getContainerLaunchContext(),
+             rEvent.isAutoCommit());
+         break;
+       case RESTART_CONTAINER:
+         container.nmClientAsync.getClient().restartContainer(containerId);
+         break;
+       case ROLLBACK_LAST_REINIT:
+         container.nmClientAsync.getClient()
+             .rollbackLastReInitialization(containerId);
+         break;
+       case COMMIT_LAST_REINT:
+         container.nmClientAsync.getClient()
+             .commitLastReInitialization(containerId);
+         break;
+       default:
+         // No request was sent and no callback applies to this type.
+         LOG.warn("Event of type [" + containerEvent.getType() + "] not" +
+             " expected here..");
+         return ContainerState.RUNNING;
+       }
+     } catch (Throwable t) {
+       notifyFailure(handler, containerEvent, t);
+       return ContainerState.FAILED;
+     }
+     notifySuccess(handler, containerEvent);
+     return ContainerState.RUNNING;
+   }
+
+   /** Invokes the success callback matching the event type. */
+   private static void notifySuccess(AbstractCallbackHandler handler,
+       ContainerEvent containerEvent) {
+     ContainerId containerId = containerEvent.getContainerId();
+     try {
+       switch(containerEvent.getType()) {
+       case REINITIALIZE_CONTAINER:
+         handler.onContainerReInitialize(containerId);
+         break;
+       case RESTART_CONTAINER:
+         handler.onContainerRestart(containerId);
+         break;
+       case ROLLBACK_LAST_REINIT:
+         handler.onRollbackLastReInitialization(containerId);
+         break;
+       case COMMIT_LAST_REINT:
+         handler.onCommitLastReInitialization(containerId);
+         break;
+       default:
+         break;
+       }
+     } catch (Throwable tr) {
+       logHandlerError(containerEvent, tr);
+     }
+   }
+
+   /** Invokes the error callback matching the event type with the cause. */
+   private static void notifyFailure(AbstractCallbackHandler handler,
+       ContainerEvent containerEvent, Throwable cause) {
+     ContainerId containerId = containerEvent.getContainerId();
+     try {
+       switch(containerEvent.getType()) {
+       case REINITIALIZE_CONTAINER:
+         handler.onContainerReInitializeError(containerId, cause);
+         break;
+       case RESTART_CONTAINER:
+         handler.onContainerRestartError(containerId, cause);
+         break;
+       case ROLLBACK_LAST_REINIT:
+         handler.onRollbackLastReInitializationError(containerId, cause);
+         break;
+       case COMMIT_LAST_REINT:
+         handler.onCommitLastReInitializationError(containerId, cause);
+         break;
+       default:
+         LOG.warn("Event of type [" + containerEvent.getType() + "] not" +
+             " expected here..");
+         break;
+       }
+     } catch (Throwable tr) {
+       logHandlerError(containerEvent, tr);
+     }
+   }
+
+   /** Logs an exception that escaped from a user callback. */
+   private static void logHandlerError(ContainerEvent containerEvent,
+       Throwable handlerError) {
+     LOG.info("Unchecked exception is thrown in handler for event ["
+         + containerEvent.getType() + "] for Container "
+         + containerEvent.getContainerId(), handlerError);
+   }
+ }
+
protected static class StopContainerTransition implements
MultipleArcTransition {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/NMClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/NMClientImpl.java
index dc92cda3d5a..c81d448f8b4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/NMClientImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/NMClientImpl.java
@@ -33,10 +33,13 @@ import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
+import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest;
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse;
+
+import org.apache.hadoop.yarn.api.protocolrecords.ReInitializeContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse;
@@ -306,6 +309,84 @@ public class NMClientImpl extends NMClient {
}
}
+ /**
+  * Sends a re-initialization request for a container that this client
+  * started, using the proxy of the node recorded for that container.
+  *
+  * @param containerId the Id of the container to re-initialize.
+  * @param containerLaunchContex the updated ContainerLaunchContext.
+  * @param autoCommit whether to commit the re-initialization automatically.
+  * @throws YarnException if the container is unknown to this client or the
+  *                       request fails.
+  * @throws IOException if the request fails.
+  */
+ @Override
+ public void reInitializeContainer(ContainerId containerId,
+     ContainerLaunchContext containerLaunchContex, boolean autoCommit)
+     throws YarnException, IOException {
+   final StartedContainer started = startedContainers.get(containerId);
+   if (started == null) {
+     throw new YarnException("Unknown container [" + containerId + "]");
+   }
+   synchronized (started) {
+     final ContainerManagementProtocolProxyData nmProxy =
+         cmProxy.getProxy(started.getNodeId().toString(), containerId);
+     try {
+       nmProxy.getContainerManagementProtocol().reInitializeContainer(
+           ReInitializeContainerRequest.newInstance(
+               containerId, containerLaunchContex, autoCommit));
+     } finally {
+       // Hand the proxy back whether or not the request succeeded.
+       if (nmProxy != null) {
+         cmProxy.mayBeCloseProxy(nmProxy);
+       }
+     }
+   }
+ }
+
+ @Override
+ public void restartContainer(ContainerId containerId)
+ throws YarnException, IOException {
+ restartCommitOrRollbackContainer(containerId, UpgradeOp.RESTART);
+ }
+
+ @Override
+ public void rollbackLastReInitialization(ContainerId containerId)
+ throws YarnException, IOException {
+ restartCommitOrRollbackContainer(containerId, UpgradeOp.ROLLBACK);
+ }
+
+ @Override
+ public void commitLastReInitialization(ContainerId containerId)
+ throws YarnException, IOException {
+ restartCommitOrRollbackContainer(containerId, UpgradeOp.COMMIT);
+ }
+
+
+ /**
+  * Sends a restart, commit or rollback request for a container that this
+  * client started, using the proxy of the node recorded for that container.
+  *
+  * @param containerId the Id of the container to operate on.
+  * @param upgradeOp the operation to perform; RESTART, COMMIT or ROLLBACK.
+  * @throws YarnException if the container is unknown to this client, the
+  *                       operation is not supported here, or the request
+  *                       fails.
+  * @throws IOException if the request fails.
+  */
+ private void restartCommitOrRollbackContainer(ContainerId containerId,
+     UpgradeOp upgradeOp) throws YarnException, IOException {
+   StartedContainer container = startedContainers.get(containerId);
+   if (container == null) {
+     throw new YarnException("Unknown container [" + containerId + "]");
+   }
+   synchronized (container) {
+     ContainerManagementProtocolProxyData proxy =
+         cmProxy.getProxy(container.getNodeId().toString(), containerId);
+     try {
+       // Fetched inside the try so the proxy is released even if this fails.
+       ContainerManagementProtocol cmp =
+           proxy.getContainerManagementProtocol();
+       switch (upgradeOp) {
+       case RESTART:
+         cmp.restartContainer(containerId);
+         break;
+       case COMMIT:
+         cmp.commitLastReInitialization(containerId);
+         break;
+       case ROLLBACK:
+         cmp.rollbackLastReInitialization(containerId);
+         break;
+       default:
+         // Returning normally would report success for a request that was
+         // never sent.
+         throw new YarnException("Unsupported upgrade operation ["
+             + upgradeOp + "] for container [" + containerId + "]");
+       }
+     } finally {
+       if (proxy != null) {
+         cmProxy.mayBeCloseProxy(proxy);
+       }
+     }
+   }
+ }
+
private void stopContainerInternal(ContainerId containerId, NodeId nodeId)
throws IOException, YarnException {
ContainerManagementProtocolProxyData proxy = null;
@@ -343,4 +424,14 @@ public class NMClientImpl extends NMClient {
throw (IOException) t;
}
}
+
+ /**
+  * Looks up the node recorded for a container started by this client.
+  *
+  * @param containerId Container Id of the container.
+  * @return the recorded NodeId, or null when the container is not among
+  *         the started containers.
+  */
+ @Override
+ public NodeId getNodeIdOfStartedContainer(ContainerId containerId) {
+   final StartedContainer started = startedContainers.get(containerId);
+   return (started == null) ? null : started.getNodeId();
+ }
+
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestNMClientAsync.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestNMClientAsync.java
index 48f34312c1a..dda3eec0d0d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestNMClientAsync.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestNMClientAsync.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.client.api.async.impl;
import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
@@ -27,6 +28,8 @@ import static org.mockito.Mockito.when;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
@@ -37,6 +40,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerArray;
import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.util.Records;
import org.junit.Assert;
import org.apache.hadoop.conf.Configuration;
@@ -69,6 +73,22 @@ public class TestNMClientAsync {
private NodeId nodeId;
private Token containerToken;
+ /** Client operations whose success and failure counts the test tracks. */
+ enum OpsToTest {
+   START,
+   QUERY,
+   STOP,
+   INCR,
+   REINIT,
+   RESTART,
+   ROLLBACK,
+   COMMIT
+ }
+
+ /**
+  * Counters for one operation: running totals of successes and failures,
+  * plus one array slot per expected success and per expected failure.
+  */
+ final static class TestData {
+   // Never reassigned, so declared final like the arrays below.
+   final AtomicInteger success = new AtomicInteger(0);
+   final AtomicInteger failure = new AtomicInteger(0);
+   final AtomicIntegerArray successArray;
+   final AtomicIntegerArray failureArray;
+
+   /**
+    * @param expectedSuccess length of the success array.
+    * @param expectedFailure length of the failure array.
+    */
+   private TestData(int expectedSuccess, int expectedFailure) {
+     this.successArray = new AtomicIntegerArray(expectedSuccess);
+     this.failureArray = new AtomicIntegerArray(expectedFailure);
+   }
+ }
+
@After
public void teardown() {
ServiceOperations.stop(asyncClient);
@@ -194,25 +214,7 @@ public class TestNMClientAsync {
private int expectedSuccess;
private int expectedFailure;
- private AtomicInteger actualStartSuccess = new AtomicInteger(0);
- private AtomicInteger actualStartFailure = new AtomicInteger(0);
- private AtomicInteger actualQuerySuccess = new AtomicInteger(0);
- private AtomicInteger actualQueryFailure = new AtomicInteger(0);
- private AtomicInteger actualStopSuccess = new AtomicInteger(0);
- private AtomicInteger actualStopFailure = new AtomicInteger(0);
- private AtomicInteger actualIncreaseResourceSuccess =
- new AtomicInteger(0);
- private AtomicInteger actualIncreaseResourceFailure =
- new AtomicInteger(0);
-
- private AtomicIntegerArray actualStartSuccessArray;
- private AtomicIntegerArray actualStartFailureArray;
- private AtomicIntegerArray actualQuerySuccessArray;
- private AtomicIntegerArray actualQueryFailureArray;
- private AtomicIntegerArray actualStopSuccessArray;
- private AtomicIntegerArray actualStopFailureArray;
- private AtomicIntegerArray actualIncreaseResourceSuccessArray;
- private AtomicIntegerArray actualIncreaseResourceFailureArray;
+ private final Map testMap = new HashMap<>();
private Set errorMsgs =
Collections.synchronizedSet(new HashSet());
@@ -221,16 +223,9 @@ public class TestNMClientAsync {
this.expectedSuccess = expectedSuccess;
this.expectedFailure = expectedFailure;
- actualStartSuccessArray = new AtomicIntegerArray(expectedSuccess);
- actualStartFailureArray = new AtomicIntegerArray(expectedFailure);
- actualQuerySuccessArray = new AtomicIntegerArray(expectedSuccess);
- actualQueryFailureArray = new AtomicIntegerArray(expectedFailure);
- actualStopSuccessArray = new AtomicIntegerArray(expectedSuccess);
- actualStopFailureArray = new AtomicIntegerArray(expectedFailure);
- actualIncreaseResourceSuccessArray =
- new AtomicIntegerArray(expectedSuccess);
- actualIncreaseResourceFailureArray =
- new AtomicIntegerArray(expectedFailure);
+ for (OpsToTest op : OpsToTest.values()) {
+ testMap.put(op, new TestData(expectedSuccess, expectedFailure));
+ }
}
@SuppressWarnings("deprecation")
@@ -243,8 +238,9 @@ public class TestNMClientAsync {
" should throw the exception onContainerStarted");
return;
}
- actualStartSuccess.addAndGet(1);
- actualStartSuccessArray.set(containerId.getId(), 1);
+ TestData td = testMap.get(OpsToTest.START);
+ td.success.addAndGet(1);
+ td.successArray.set(containerId.getId(), 1);
// move on to the following success tests
asyncClient.getContainerStatusAsync(containerId, nodeId);
@@ -254,7 +250,28 @@ public class TestNMClientAsync {
// containerId
Container container = Container.newInstance(
containerId, nodeId, null, null, null, containerToken);
- asyncClient.increaseContainerResourceAsync(container);
+ int t = containerId.getId() % 5;
+ switch (t) {
+ case 0:
+ asyncClient.increaseContainerResourceAsync(container);
+ break;
+ case 1:
+ asyncClient.reInitializeContainerAsync(containerId,
+ recordFactory.newRecordInstance(ContainerLaunchContext.class),
+ true);
+ break;
+ case 2:
+ asyncClient.restartContainerAsync(containerId);
+ break;
+ case 3:
+ asyncClient.rollbackLastReInitializationAsync(containerId);
+ break;
+ case 4:
+ asyncClient.commitLastReInitializationAsync(containerId);
+ break;
+ default:
+ break;
+ }
}
// Shouldn't crash the test thread
@@ -270,8 +287,9 @@ public class TestNMClientAsync {
" should throw the exception onContainerStatusReceived");
return;
}
- actualQuerySuccess.addAndGet(1);
- actualQuerySuccessArray.set(containerId.getId(), 1);
+ TestData td = testMap.get(OpsToTest.QUERY);
+ td.success.addAndGet(1);
+ td.successArray.set(containerId.getId(), 1);
// move on to the following success tests
// make sure we pass in the container with the same
// containerId
@@ -292,8 +310,78 @@ public class TestNMClientAsync {
" should throw the exception onContainerResourceIncreased");
return;
}
- actualIncreaseResourceSuccess.addAndGet(1);
- actualIncreaseResourceSuccessArray.set(containerId.getId(), 1);
+ TestData td = testMap.get(OpsToTest.INCR);
+ td.success.addAndGet(1);
+ td.successArray.set(containerId.getId(), 1);
+ // move on to the following success tests
+ asyncClient.reInitializeContainerAsync(containerId,
+ Records.newRecord(ContainerLaunchContext.class), true);
+ // throw a fake user exception, and shouldn't crash the test
+ throw new RuntimeException("Ignorable Exception");
+ }
+
+ @SuppressWarnings("deprecation")
+ @Override
+ public void onContainerReInitialize(ContainerId containerId) {
+ if (containerId.getId() >= expectedSuccess) {
+ errorMsgs.add("Container " + containerId +
+ " should throw the exception onContainerReInitialize");
+ return;
+ }
+ TestData td = testMap.get(OpsToTest.REINIT);
+ td.success.addAndGet(1);
+ td.successArray.set(containerId.getId(), 1);
+ // re-init recorded as a success; chain the restart request next
+ asyncClient.restartContainerAsync(containerId);
+ // deliberately throw from the callback; it must not crash the test
+ throw new RuntimeException("Ignorable Exception");
+ }
+
+ @SuppressWarnings("deprecation")
+ @Override
+ public void onContainerRestart(ContainerId containerId) {
+ if (containerId.getId() >= expectedSuccess) {
+ errorMsgs.add("Container " + containerId +
+ " should throw the exception onContainerRestart");
+ return;
+ }
+ TestData td = testMap.get(OpsToTest.RESTART);
+ td.success.addAndGet(1);
+ td.successArray.set(containerId.getId(), 1);
+ // restart recorded as a success; chain the rollback request next
+ asyncClient.rollbackLastReInitializationAsync(containerId);
+ // deliberately throw from the callback; it must not crash the test
+ throw new RuntimeException("Ignorable Exception");
+ }
+
+ @SuppressWarnings("deprecation")
+ @Override
+ public void onRollbackLastReInitialization(ContainerId containerId) {
+ if (containerId.getId() >= expectedSuccess) {
+ errorMsgs.add("Container " + containerId +
+ " should throw the exception onRollbackLastReInitialization");
+ return;
+ }
+ TestData td = testMap.get(OpsToTest.ROLLBACK);
+ td.success.addAndGet(1);
+ td.successArray.set(containerId.getId(), 1);
+ // rollback recorded as a success; chain the commit request next
+ asyncClient.commitLastReInitializationAsync(containerId);
+ // deliberately throw from the callback; it must not crash the test
+ throw new RuntimeException("Ignorable Exception");
+ }
+
+ @SuppressWarnings("deprecation")
+ @Override
+ public void onCommitLastReInitialization(ContainerId containerId) {
+ if (containerId.getId() >= expectedSuccess) {
+ errorMsgs.add("Container " + containerId +
+ " should throw the exception onCommitLastReInitialization");
+ return;
+ }
+ TestData td = testMap.get(OpsToTest.COMMIT);
+ td.success.addAndGet(1);
+ td.successArray.set(containerId.getId(), 1);
// move on to the following success tests
asyncClient.stopContainerAsync(containerId, nodeId);
// throw a fake user exception, and shouldn't crash the test
@@ -308,8 +396,9 @@ public class TestNMClientAsync {
" should throw the exception onContainerStopped");
return;
}
- actualStopSuccess.addAndGet(1);
- actualStopSuccessArray.set(containerId.getId(), 1);
+ TestData td = testMap.get(OpsToTest.STOP);
+ td.success.addAndGet(1);
+ td.successArray.set(containerId.getId(), 1);
// Shouldn't crash the test thread
throw new RuntimeException("Ignorable Exception");
@@ -330,8 +419,9 @@ public class TestNMClientAsync {
" shouldn't throw the exception onStartContainerError");
return;
}
- actualStartFailure.addAndGet(1);
- actualStartFailureArray.set(containerId.getId() - expectedSuccess, 1);
+ TestData td = testMap.get(OpsToTest.START);
+ td.failure.addAndGet(1);
+ td.failureArray.set(containerId.getId() - expectedSuccess, 1);
// move on to the following failure tests
asyncClient.getContainerStatusAsync(containerId, nodeId);
@@ -348,8 +438,9 @@ public class TestNMClientAsync {
" shouldn't throw the exception onIncreaseContainerResourceError");
return;
}
- actualIncreaseResourceFailure.addAndGet(1);
- actualIncreaseResourceFailureArray.set(
+ TestData td = testMap.get(OpsToTest.INCR);
+ td.failure.addAndGet(1);
+ td.failureArray.set(
containerId.getId() - expectedSuccess - expectedFailure, 1);
// increase container resource error should NOT change the
// the container status to FAILED
@@ -359,6 +450,102 @@ public class TestNMClientAsync {
throw new RuntimeException("Ignorable Exception");
}
+ @SuppressWarnings("deprecation")
+ @Override
+ public void onContainerReInitializeError(ContainerId containerId,
+ Throwable t) {
+ if (containerId.getId() < expectedSuccess + expectedFailure) {
+ errorMsgs.add("Container " + containerId +
+ " shouldn't throw the exception onContainerReInitializeError");
+ return;
+ }
+ TestData td = testMap.get(OpsToTest.REINIT);
+ td.failure.addAndGet(1);
+ td.failureArray.set(
+ containerId.getId() - expectedSuccess - expectedFailure, 1);
+
+ // increment the stop counters here.. since the container will fail
+ td = testMap.get(OpsToTest.STOP);
+ td.failure.addAndGet(1);
+ td.failureArray.set(
+ containerId.getId() - expectedSuccess - expectedFailure, 1);
+ // reInit container changes the container status to FAILED
+ // Shouldn't crash the test thread
+ throw new RuntimeException("Ignorable Exception");
+ }
+
+ @SuppressWarnings("deprecation")
+ @Override
+ public void onContainerRestartError(ContainerId containerId, Throwable t) {
+ if (containerId.getId() < expectedSuccess + expectedFailure) {
+ errorMsgs.add("Container " + containerId +
+ " shouldn't throw the exception onContainerRestartError");
+ return;
+ }
+ TestData td = testMap.get(OpsToTest.RESTART);
+ td.failure.addAndGet(1);
+ td.failureArray.set(
+ containerId.getId() - expectedSuccess - expectedFailure, 1);
+
+ // increment the stop counters here.. since the container will fail
+ td = testMap.get(OpsToTest.STOP);
+ td.failure.addAndGet(1);
+ td.failureArray.set(
+ containerId.getId() - expectedSuccess - expectedFailure, 1);
+ // restart container changes the container status to FAILED
+ // Shouldn't crash the test thread
+ throw new RuntimeException("Ignorable Exception");
+ }
+
+ @SuppressWarnings("deprecation")
+ @Override
+ public void onRollbackLastReInitializationError(ContainerId containerId,
+ Throwable t) {
+ if (containerId.getId() < expectedSuccess + expectedFailure) {
+ errorMsgs.add("Container " + containerId +
+ " shouldn't throw the exception" +
+ " onRollbackLastReInitializationError");
+ return;
+ }
+ TestData td = testMap.get(OpsToTest.ROLLBACK);
+ td.failure.addAndGet(1);
+ td.failureArray.set(
+ containerId.getId() - expectedSuccess - expectedFailure, 1);
+
+ // increment the stop counters here.. since the container will fail
+ td = testMap.get(OpsToTest.STOP);
+ td.failure.addAndGet(1);
+ td.failureArray.set(
+ containerId.getId() - expectedSuccess - expectedFailure, 1);
+ // rollback container changes the container status to FAILED
+ // Shouldn't crash the test thread
+ throw new RuntimeException("Ignorable Exception");
+ }
+
+ @SuppressWarnings("deprecation")
+ @Override
+ public void onCommitLastReInitializationError(ContainerId containerId,
+ Throwable t) {
+ if (containerId.getId() < expectedSuccess + expectedFailure) {
+ errorMsgs.add("Container " + containerId +
+ " shouldn't throw the exception onCommitLastReInitializationError");
+ return;
+ }
+ TestData td = testMap.get(OpsToTest.COMMIT);
+ td.failure.addAndGet(1);
+ td.failureArray.set(
+ containerId.getId() - expectedSuccess - expectedFailure, 1);
+
+ // increment the stop counters here.. since the container will fail
+ td = testMap.get(OpsToTest.STOP);
+ td.failure.addAndGet(1);
+ td.failureArray.set(
+ containerId.getId() - expectedSuccess - expectedFailure, 1);
+ // commit container changes the container status to FAILED
+ // Shouldn't crash the test thread
+ throw new RuntimeException("Ignorable Exception");
+ }
+
@SuppressWarnings("deprecation")
@Override
public void onStopContainerError(ContainerId containerId, Throwable t) {
@@ -371,9 +558,9 @@ public class TestNMClientAsync {
" shouldn't throw the exception onStopContainerError");
return;
}
-
- actualStopFailure.addAndGet(1);
- actualStopFailureArray.set(
+ TestData td = testMap.get(OpsToTest.STOP);
+ td.failure.addAndGet(1);
+ td.failureArray.set(
containerId.getId() - expectedSuccess - expectedFailure, 1);
// Shouldn't crash the test thread
@@ -393,8 +580,9 @@ public class TestNMClientAsync {
" shouldn't throw the exception onGetContainerStatusError");
return;
}
- actualQueryFailure.addAndGet(1);
- actualQueryFailureArray.set(containerId.getId() - expectedSuccess, 1);
+ TestData td = testMap.get(OpsToTest.QUERY);
+ td.failure.addAndGet(1);
+ td.failureArray.set(containerId.getId() - expectedSuccess, 1);
// Shouldn't crash the test thread
throw new RuntimeException("Ignorable Exception");
@@ -402,44 +590,67 @@ public class TestNMClientAsync {
public boolean isAllSuccessCallsExecuted() {
boolean isAllSuccessCallsExecuted =
- actualStartSuccess.get() == expectedSuccess &&
- actualQuerySuccess.get() == expectedSuccess &&
- actualIncreaseResourceSuccess.get() == expectedSuccess &&
- actualStopSuccess.get() == expectedSuccess;
+ testMap.get(OpsToTest.START).success.get() == expectedSuccess &&
+ testMap.get(OpsToTest.QUERY).success.get() == expectedSuccess &&
+ testMap.get(OpsToTest.INCR).success.get() == expectedSuccess &&
+ testMap.get(OpsToTest.REINIT).success.get() == expectedSuccess &&
+ testMap.get(OpsToTest.RESTART).success.get() == expectedSuccess &&
+ testMap.get(OpsToTest.ROLLBACK).success.get() ==
+ expectedSuccess &&
+ testMap.get(OpsToTest.COMMIT).success.get() == expectedSuccess &&
+ testMap.get(OpsToTest.STOP).success.get() == expectedSuccess;
if (isAllSuccessCallsExecuted) {
- assertAtomicIntegerArray(actualStartSuccessArray);
- assertAtomicIntegerArray(actualQuerySuccessArray);
- assertAtomicIntegerArray(actualIncreaseResourceSuccessArray);
- assertAtomicIntegerArray(actualStopSuccessArray);
+ assertAtomicIntegerArray(testMap.get(OpsToTest.START).successArray);
+ assertAtomicIntegerArray(testMap.get(OpsToTest.QUERY).successArray);
+ assertAtomicIntegerArray(testMap.get(OpsToTest.INCR).successArray);
+ assertAtomicIntegerArray(testMap.get(OpsToTest.REINIT).successArray);
+ assertAtomicIntegerArray(testMap.get(OpsToTest.RESTART).successArray);
+ assertAtomicIntegerArray(testMap.get(OpsToTest.ROLLBACK).successArray);
+ assertAtomicIntegerArray(testMap.get(OpsToTest.COMMIT).successArray);
+ assertAtomicIntegerArray(testMap.get(OpsToTest.STOP).successArray);
}
return isAllSuccessCallsExecuted;
}
public boolean isStartAndQueryFailureCallsExecuted() {
boolean isStartAndQueryFailureCallsExecuted =
- actualStartFailure.get() == expectedFailure &&
- actualQueryFailure.get() == expectedFailure;
+ testMap.get(OpsToTest.START).failure.get() == expectedFailure &&
+ testMap.get(OpsToTest.QUERY).failure.get() == expectedFailure;
if (isStartAndQueryFailureCallsExecuted) {
- assertAtomicIntegerArray(actualStartFailureArray);
- assertAtomicIntegerArray(actualQueryFailureArray);
+ assertAtomicIntegerArray(testMap.get(OpsToTest.START).failureArray);
+ assertAtomicIntegerArray(testMap.get(OpsToTest.QUERY).failureArray);
}
return isStartAndQueryFailureCallsExecuted;
}
public boolean isIncreaseResourceFailureCallsExecuted() {
boolean isIncreaseResourceFailureCallsExecuted =
- actualIncreaseResourceFailure.get() == expectedFailure;
+ testMap.get(OpsToTest.INCR).failure.get()
+ + testMap.get(OpsToTest.REINIT).failure.get()
+ + testMap.get(OpsToTest.RESTART).failure.get()
+ + testMap.get(OpsToTest.ROLLBACK).failure.get()
+ + testMap.get(OpsToTest.COMMIT).failure.get()
+ == expectedFailure;
if (isIncreaseResourceFailureCallsExecuted) {
- assertAtomicIntegerArray(actualIncreaseResourceFailureArray);
+ AtomicIntegerArray testArray =
+ new AtomicIntegerArray(
+ testMap.get(OpsToTest.INCR).failureArray.length());
+ for (int i = 0; i < testArray.length(); i++) {
+ for (OpsToTest op : EnumSet.of(OpsToTest.REINIT, OpsToTest.RESTART,
+ OpsToTest.ROLLBACK, OpsToTest.COMMIT, OpsToTest.INCR)) {
+ testArray.addAndGet(i, testMap.get(op).failureArray.get(i));
+ }
+ }
+ assertAtomicIntegerArray(testArray);
}
return isIncreaseResourceFailureCallsExecuted;
}
public boolean isStopFailureCallsExecuted() {
boolean isStopFailureCallsExecuted =
- actualStopFailure.get() == expectedFailure;
+ testMap.get(OpsToTest.STOP).failure.get() == expectedFailure;
if (isStopFailureCallsExecuted) {
- assertAtomicIntegerArray(actualStopFailureArray);
+ assertAtomicIntegerArray(testMap.get(OpsToTest.STOP).failureArray);
}
return isStopFailureCallsExecuted;
}
@@ -464,6 +675,14 @@ public class TestNMClientAsync {
recordFactory.newRecordInstance(ContainerStatus.class));
doNothing().when(client).increaseContainerResource(
any(Container.class));
+ doNothing().when(client).reInitializeContainer(
+ any(ContainerId.class), any(ContainerLaunchContext.class),
+ anyBoolean());
+ doNothing().when(client).restartContainer(any(ContainerId.class));
+ doNothing().when(client).rollbackLastReInitialization(
+ any(ContainerId.class));
+ doNothing().when(client).commitLastReInitialization(
+ any(ContainerId.class));
doNothing().when(client).stopContainer(any(ContainerId.class),
any(NodeId.class));
break;
@@ -485,9 +704,23 @@ public class TestNMClientAsync {
recordFactory.newRecordInstance(ContainerStatus.class));
doThrow(RPCUtil.getRemoteException("Increase Resource Exception"))
.when(client).increaseContainerResource(any(Container.class));
+ doThrow(RPCUtil.getRemoteException("ReInitialize Exception"))
+ .when(client).reInitializeContainer(
+ any(ContainerId.class), any(ContainerLaunchContext.class),
+ anyBoolean());
+ doThrow(RPCUtil.getRemoteException("Restart Exception"))
+ .when(client).restartContainer(any(ContainerId.class));
+ doThrow(RPCUtil.getRemoteException("Rollback upgrade Exception"))
+ .when(client).rollbackLastReInitialization(
+ any(ContainerId.class));
+ doThrow(RPCUtil.getRemoteException("Commit upgrade Exception"))
+ .when(client).commitLastReInitialization(
+ any(ContainerId.class));
doThrow(RPCUtil.getRemoteException("Stop Exception")).when(client)
.stopContainer(any(ContainerId.class), any(NodeId.class));
}
+ when(client.getNodeIdOfStartedContainer(any(ContainerId.class)))
+ .thenReturn(NodeId.newInstance("localhost", 0));
return client;
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java
index d211d6d0159..1034f7eacc5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java
@@ -45,6 +45,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
+import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
@@ -69,6 +70,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.util.Records;
import org.junit.After;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -310,6 +312,36 @@ public class TestNMClient {
e.getMessage().contains("is not handled by this NodeManager"));
}
+ // restart shouldn't be called before startContainer,
+ // otherwise, NodeManager cannot find the container
+ try {
+ nmClient.restartContainer(container.getId());
+ fail("Exception is expected");
+ } catch (YarnException e) {
+ assertTrue("The thrown exception is not expected",
+ e.getMessage().contains("Unknown container"));
+ }
+
+ // rollback shouldn't be called before startContainer,
+ // otherwise, NodeManager cannot find the container
+ try {
+ nmClient.rollbackLastReInitialization(container.getId());
+ fail("Exception is expected");
+ } catch (YarnException e) {
+ assertTrue("The thrown exception is not expected",
+ e.getMessage().contains("Unknown container"));
+ }
+
+ // commit shouldn't be called before startContainer,
+ // otherwise, NodeManager cannot find the container
+ try {
+ nmClient.commitLastReInitialization(container.getId());
+ fail("Exception is expected");
+ } catch (YarnException e) {
+ assertTrue("The thrown exception is not expected",
+ e.getMessage().contains("Unknown container"));
+ }
+
// stopContainer shouldn't be called before startContainer,
// otherwise, an exception will be thrown
try {
@@ -353,6 +385,28 @@ public class TestNMClient {
// Test increase container API and make sure requests can reach NM
testIncreaseContainerResource(container);
+ testRestartContainer(container.getId());
+ testGetContainerStatus(container, i, ContainerState.RUNNING,
+ "will be Restarted", Arrays.asList(new Integer[] {-1000}));
+
+ if (i % 2 == 0) {
+ testReInitializeContainer(container.getId(), clc, false);
+ testGetContainerStatus(container, i, ContainerState.RUNNING,
+ "will be Re-initialized", Arrays.asList(new Integer[] {-1000}));
+ testRollbackContainer(container.getId(), false);
+ testGetContainerStatus(container, i, ContainerState.RUNNING,
+ "will be Rolled-back", Arrays.asList(new Integer[] {-1000}));
+ testCommitContainer(container.getId(), true);
+ testReInitializeContainer(container.getId(), clc, false);
+ testCommitContainer(container.getId(), false);
+ } else {
+ testReInitializeContainer(container.getId(), clc, true);
+ testGetContainerStatus(container, i, ContainerState.RUNNING,
+ "will be Re-initialized", Arrays.asList(new Integer[] {-1000}));
+ testRollbackContainer(container.getId(), true);
+ testCommitContainer(container.getId(), true);
+ }
+
try {
nmClient.stopContainer(container.getId(), container.getNodeId());
} catch (YarnException e) {
@@ -432,4 +486,91 @@ public class TestNMClient {
}
}
}
+
+ private void testRestartContainer(ContainerId containerId)
+ throws YarnException, IOException {
+ try {
+ sleep(250);
+ nmClient.restartContainer(containerId);
+ sleep(250);
+ } catch (YarnException e) {
+ // Tolerate only the "not in RUNNING state" rejection of the restart;
+ // any other YarnException is rethrown as a test failure.
+ if (!e.getMessage().contains(
+ "can only be changed when a container is in RUNNING state")) {
+ throw (AssertionError)
+ (new AssertionError("Exception is not expected: " + e)
+ .initCause(e));
+ }
+ }
+ }
+
+ private void testRollbackContainer(ContainerId containerId,
+ boolean notRollbackable) throws YarnException, IOException {
+ try {
+ sleep(250);
+ nmClient.rollbackLastReInitialization(containerId);
+ if (notRollbackable) {
+ fail("Should not be able to rollback..");
+ }
+ sleep(250);
+ } catch (YarnException e) {
+ // A non-rollbackable container must report "Nothing to rollback to";
+ // otherwise only the "not in RUNNING state" rejection is tolerated.
+ if (notRollbackable) {
+ Assert.assertTrue(e.getMessage().contains(
+ "Nothing to rollback to"));
+ } else {
+ if (!e.getMessage().contains(
+ "can only be changed when a container is in RUNNING state")) {
+ throw (AssertionError)
+ (new AssertionError("Exception is not expected: " + e)
+ .initCause(e));
+ }
+ }
+ }
+ }
+
+ private void testCommitContainer(ContainerId containerId,
+ boolean notCommittable) throws YarnException, IOException {
+ try {
+ nmClient.commitLastReInitialization(containerId);
+ if (notCommittable) {
+ fail("Should not be able to commit..");
+ }
+ } catch (YarnException e) {
+ // A non-committable container must report "Nothing to Commit";
+ // otherwise only the "not in RUNNING state" rejection is tolerated.
+ if (notCommittable) {
+ Assert.assertTrue(e.getMessage().contains(
+ "Nothing to Commit"));
+ } else {
+ if (!e.getMessage().contains(
+ "can only be changed when a container is in RUNNING state")) {
+ throw (AssertionError)
+ (new AssertionError("Exception is not expected: " + e)
+ .initCause(e));
+ }
+ }
+ }
+ }
+
+ private void testReInitializeContainer(ContainerId containerId,
+ ContainerLaunchContext clc, boolean autoCommit)
+ throws YarnException, IOException {
+ try {
+ sleep(250);
+ nmClient.reInitializeContainer(containerId, clc, autoCommit);
+ sleep(250);
+ } catch (YarnException e) {
+ // Tolerate only the "not in RUNNING state" rejection of the re-init;
+ // any other YarnException is rethrown as a test failure.
+ if (!e.getMessage().contains(
+ "can only be changed when a container is in RUNNING state")) {
+ throw (AssertionError)
+ (new AssertionError("Exception is not expected: " + e)
+ .initCause(e));
+ }
+ }
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
index 4d93aa7ee1f..574e0917d92 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
@@ -22,8 +22,10 @@ import java.io.File;
import java.io.IOException;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
+import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Collection;
+import java.util.Date;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
@@ -136,6 +138,8 @@ public class ContainerImpl implements Container {
}
}
+ private final SimpleDateFormat dateFormat =
+ new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
private final Lock readLock;
private final Lock writeLock;
private final Dispatcher dispatcher;
@@ -763,7 +767,7 @@ public class ContainerImpl implements Container {
private void addDiagnostics(String... diags) {
for (String s : diags) {
- this.diagnostics.append(s);
+ this.diagnostics.append("[" + dateFormat.format(new Date()) + "]" + s);
}
if (diagnostics.length() > diagnosticsMaxSize) {
diagnostics.delete(0, diagnostics.length() - diagnosticsMaxSize);
@@ -987,6 +991,7 @@ public class ContainerImpl implements Container {
// We also need to make sure that if Rollback is possible, the
// rollback state should be retained in the
// oldLaunchContext and oldResourceSet
+ container.addDiagnostics("Container will be Restarted.\n");
return new ReInitializationContext(
container.launchContext, container.resourceSet,
container.canRollback() ?
@@ -994,6 +999,7 @@ public class ContainerImpl implements Container {
container.canRollback() ?
container.reInitContext.oldResourceSet : null);
} else {
+ container.addDiagnostics("Container will be Re-initialized.\n");
return new ReInitializationContext(
reInitEvent.getReInitLaunchContext(),
reInitEvent.getResourceSet(),
@@ -1014,6 +1020,7 @@ public class ContainerImpl implements Container {
@Override
protected ReInitializationContext createReInitContext(ContainerImpl
container, ContainerEvent event) {
+ container.addDiagnostics("Container upgrade will be Rolled-back.\n");
LOG.warn("Container [" + container.getContainerId() + "]" +
" about to be explicitly Rolledback !!");
return container.reInitContext.createContextForRollback();