diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 34ce5efc0a9..ed8e9d43b89 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -67,6 +67,9 @@ Release 2.0.5-beta - UNRELEASED
Azure environments. (See breakdown of tasks below for subtasks and
contributors)
+ YARN-422. Add a NM Client library to help application-writers. (Zhijie Shen
+ via vinodkv)
+
IMPROVEMENTS
YARN-365. Change NM heartbeat handling to not generate a scheduler event
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index edded4a3313..46940ce263b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -708,6 +708,14 @@ public class YarnConfiguration extends Configuration {
public static final long DEFAULT_YARN_CLIENT_APP_SUBMISSION_POLL_INTERVAL_MS =
1000;
+ /**
+ * Max number of threads in NMClientAsync to process container management
+ * events
+ */
+ public static final String NM_CLIENT_ASYNC_THREAD_POOL_MAX_SIZE =
+ YARN_PREFIX + "client.nodemanager-client-async.thread-pool-max-size";
+ public static final int DEFAULT_NM_CLIENT_ASYNC_THREAD_POOL_MAX_SIZE = 500;
+
public YarnConfiguration() {
super();
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/NMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/NMClient.java
new file mode 100644
index 00000000000..b3563a0c245
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/NMClient.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.client;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Map;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.ContainerToken;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.service.Service;
+
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public interface NMClient extends Service {
+
+ /**
+ *
Start an allocated container.
+ *
+ * The ApplicationMaster
or other applications that use the
+ * client must provide the details of the allocated container, including the
+ * Id, the assigned node's Id and the token via {@link Container}. In
+ * addition, the AM needs to provide the {@link ContainerLaunchContext} as
+ * well.
+ *
+ * @param container the allocated container
+ * @param containerLaunchContext the context information needed by the
+ * NodeManager
to launch the
+ * container
+ * @return a map between the auxiliary service names and their outputs
+ * @throws YarnRemoteException
+ * @throws IOException
+ */
+ Map startContainer(Container container,
+ ContainerLaunchContext containerLaunchContext)
+ throws YarnRemoteException, IOException;
+
+ /**
+ * Stop an started container.
+ *
+ * @param containerId the Id of the started container
+ * @param nodeId the Id of the NodeManager
+ * @param containerToken the security token to verify authenticity of the
+ * started container
+ * @throws YarnRemoteException
+ * @throws IOException
+ */
+ void stopContainer(ContainerId containerId, NodeId nodeId,
+ ContainerToken containerToken) throws YarnRemoteException, IOException;
+
+ /**
+ * Query the status of a container.
+ *
+ * @param containerId the Id of the started container
+ * @param nodeId the Id of the NodeManager
+ * @param containerToken the security token to verify authenticity of the
+ * started container
+ * @return the status of a container
+ * @throws YarnRemoteException
+ * @throws IOException
+ */
+ ContainerStatus getContainerStatus(ContainerId containerId, NodeId nodeId,
+ ContainerToken containerToken) throws YarnRemoteException, IOException;
+
+ /**
+ * Set whether the containers that are started by this client, and are
+ * still running should be stopped when the client stops. By default, the
+ * feature should be enabled.
+ *
+ * @param enabled whether the feature is enabled or not
+ */
+ void cleanupRunningContainersOnStop(boolean enabled);
+
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/NMClientAsync.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/NMClientAsync.java
new file mode 100644
index 00000000000..44d6f345d8c
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/NMClientAsync.java
@@ -0,0 +1,709 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.client;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.EnumSet;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.ContainerToken;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.AbstractEvent;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.ipc.RPCUtil;
+import org.apache.hadoop.yarn.service.AbstractService;
+import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
+import org.apache.hadoop.yarn.state.MultipleArcTransition;
+import org.apache.hadoop.yarn.state.SingleArcTransition;
+import org.apache.hadoop.yarn.state.StateMachine;
+import org.apache.hadoop.yarn.state.StateMachineFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * NMClientAsync
handles communication with all the NodeManagers
+ * and provides asynchronous updates on getting responses from them. It
+ * maintains a thread pool to communicate with individual NMs where a number of
+ * worker threads process requests to NMs by using {@link NMClientImpl}. The max
+ * size of the thread pool is configurable through
+ * {@link YarnConfiguration#NM_CLIENT_ASYNC_THREAD_POOL_MAX_SIZE}.
+ *
+ * It should be used in conjunction with a CallbackHandler. For example
+ *
+ *
+ * {@code
+ * class MyCallbackHandler implements NMClientAsync.CallbackHandler {
+ * public void onContainerStarted(ContainerId containerId,
+ * Map allServiceResponse) {
+ * [post process after the container is started, process the response]
+ * }
+ *
+ * public void onContainerStatusReceived(ContainerId containerId,
+ * ContainerStatus containerStatus) {
+ * [make use of the status of the container]
+ * }
+ *
+ * public void onContainerStopped(ContainerId containerId) {
+ * [post process after the container is stopped]
+ * }
+ *
+ * public void onStartContainerError(
+ * ContainerId containerId, Throwable t) {
+ * [handle the raised exception]
+ * }
+ *
+ * public void onGetContainerStatusError(
+ * ContainerId containerId, Throwable t) {
+ * [handle the raised exception]
+ * }
+ *
+ * public void onStopContainerError(
+ * ContainerId containerId, Throwable t) {
+ * [handle the raised exception]
+ * }
+ * }
+ * }
+ *
+ *
+ * The client's life-cycle should be managed like the following:
+ *
+ *
+ * {@code
+ * NMClientAsync asyncClient = new NMClientAsync(new MyCallbackhandler());
+ * asyncClient.init(conf);
+ * asyncClient.start();
+ * asyncClient.startContainer(container, containerLaunchContext);
+ * [... wait for container being started]
+ * asyncClient.getContainerStatus(container.getId(), container.getNodeId(),
+ * container.getContainerToken());
+ * [... handle the status in the callback instance]
+ * asyncClient.stopContainer(container.getId(), container.getNodeId(),
+ * container.getContainerToken());
+ * [... wait for container being stopped]
+ * asyncClient.stop();
+ * }
+ *
+ */
+@Unstable
+@Evolving
+public class NMClientAsync extends AbstractService {
+
+ private static final Log LOG = LogFactory.getLog(NMClientAsync.class);
+
+ protected static final int INITIAL_THREAD_POOL_SIZE = 10;
+
+ protected ThreadPoolExecutor threadPool;
+ protected int maxThreadPoolSize;
+ protected Thread eventDispatcherThread;
+ protected AtomicBoolean stopped = new AtomicBoolean(false);
+ protected BlockingQueue events =
+ new LinkedBlockingQueue();
+
+ protected NMClient client;
+ protected CallbackHandler callbackHandler;
+
+ protected ConcurrentMap containers =
+ new ConcurrentHashMap();
+
+ public NMClientAsync(CallbackHandler callbackHandler) {
+ this (NMClientAsync.class.getName(), callbackHandler);
+ }
+
+ public NMClientAsync(String name, CallbackHandler callbackHandler) {
+ this (name, new NMClientImpl(), callbackHandler);
+ }
+
+ @Private
+ @VisibleForTesting
+ protected NMClientAsync(String name, NMClient client,
+ CallbackHandler callbackHandler) {
+ super(name);
+ this.client = client;
+ this.callbackHandler = callbackHandler;
+ }
+
+ @Override
+ public void init(Configuration conf) {
+ this.maxThreadPoolSize = conf.getInt(
+ YarnConfiguration.NM_CLIENT_ASYNC_THREAD_POOL_MAX_SIZE,
+ YarnConfiguration.DEFAULT_NM_CLIENT_ASYNC_THREAD_POOL_MAX_SIZE);
+ LOG.info("Upper bound of the thread pool size is " + maxThreadPoolSize);
+
+ client.init(conf);
+ super.init(conf);
+ }
+
+ @Override
+ public void start() {
+ client.start();
+
+ ThreadFactory tf = new ThreadFactoryBuilder().setNameFormat(
+ this.getClass().getName() + " #%d").setDaemon(true).build();
+
+ // Start with a default core-pool size and change it dynamically.
+ int initSize = Math.min(INITIAL_THREAD_POOL_SIZE, maxThreadPoolSize);
+ threadPool = new ThreadPoolExecutor(initSize, Integer.MAX_VALUE, 1,
+ TimeUnit.HOURS, new LinkedBlockingQueue(), tf);
+
+ eventDispatcherThread = new Thread() {
+ @Override
+ public void run() {
+ ContainerEvent event = null;
+ Set allNodes = new HashSet();
+
+ while (!stopped.get() && !Thread.currentThread().isInterrupted()) {
+ try {
+ event = events.take();
+ } catch (InterruptedException e) {
+ if (!stopped.get()) {
+ LOG.error("Returning, thread interrupted", e);
+ }
+ return;
+ }
+
+ allNodes.add(event.getNodeId().toString());
+
+ int threadPoolSize = threadPool.getCorePoolSize();
+
+ // We can increase the pool size only if haven't reached the maximum
+ // limit yet.
+ if (threadPoolSize != maxThreadPoolSize) {
+
+ // nodes where containers will run at *this* point of time. This is
+ // *not* the cluster size and doesn't need to be.
+ int nodeNum = allNodes.size();
+ int idealThreadPoolSize = Math.min(maxThreadPoolSize, nodeNum);
+
+ if (threadPoolSize < idealThreadPoolSize) {
+ // Bump up the pool size to idealThreadPoolSize +
+ // INITIAL_POOL_SIZE, the later is just a buffer so we are not
+ // always increasing the pool-size
+ int newThreadPoolSize = Math.min(maxThreadPoolSize,
+ idealThreadPoolSize + INITIAL_THREAD_POOL_SIZE);
+ LOG.info("Set NMClientAsync thread pool size to " +
+ newThreadPoolSize + " as the number of nodes to talk to is "
+ + nodeNum);
+ threadPool.setCorePoolSize(newThreadPoolSize);
+ }
+ }
+
+ // the events from the queue are handled in parallel with a thread
+ // pool
+ threadPool.execute(getContainerEventProcessor(event));
+
+ // TODO: Group launching of multiple containers to a single
+ // NodeManager into a single connection
+ }
+ }
+ };
+ eventDispatcherThread.setName("Container Event Dispatcher");
+ eventDispatcherThread.setDaemon(false);
+ eventDispatcherThread.start();
+
+ super.start();
+ }
+
+ @Override
+ public void stop() {
+ if (stopped.getAndSet(true)) {
+ // return if already stopped
+ return;
+ }
+ eventDispatcherThread.interrupt();
+ try {
+ eventDispatcherThread.join();
+ } catch (InterruptedException e) {
+ LOG.error("The thread of " + eventDispatcherThread.getName() +
+ " didn't finish normally.", e);
+ }
+ threadPool.shutdownNow();
+ // If NMClientImpl doesn't stop running containers, the states doesn't
+ // need to be cleared.
+ if (!(client instanceof NMClientImpl) ||
+ ((NMClientImpl) client).cleanupRunningContainers.get()) {
+ containers.clear();
+ }
+ client.stop();
+ super.stop();
+ }
+
+ public void startContainer(
+ Container container, ContainerLaunchContext containerLaunchContext) {
+ if (containers.putIfAbsent(container.getId(),
+ new StatefulContainer(this, container.getId())) != null) {
+ callbackHandler.onStartContainerError(container.getId(),
+ RPCUtil.getRemoteException("Container " + container.getId() +
+ " is already started or scheduled to start"));
+ }
+ try {
+ events.put(new StartContainerEvent(container, containerLaunchContext));
+ } catch (InterruptedException e) {
+ LOG.warn("Exception when scheduling the event of starting Container " +
+ container.getId());
+ callbackHandler.onStartContainerError(container.getId(), e);
+ }
+ }
+
+ public void stopContainer(ContainerId containerId, NodeId nodeId,
+ ContainerToken containerToken) {
+ if (containers.get(containerId) == null) {
+ callbackHandler.onStopContainerError(containerId,
+ RPCUtil.getRemoteException("Container " + containerId +
+ " is neither started nor scheduled to start"));
+ }
+ try {
+ events.put(new ContainerEvent(containerId, nodeId, containerToken,
+ ContainerEventType.STOP_CONTAINER));
+ } catch (InterruptedException e) {
+ LOG.warn("Exception when scheduling the event of stopping Container " +
+ containerId);
+ callbackHandler.onStopContainerError(containerId, e);
+ }
+ }
+
+ public void getContainerStatus(ContainerId containerId, NodeId nodeId,
+ ContainerToken containerToken) {
+ try {
+ events.put(new ContainerEvent(containerId, nodeId, containerToken,
+ ContainerEventType.QUERY_CONTAINER));
+ } catch (InterruptedException e) {
+ LOG.warn("Exception when scheduling the event of querying the status" +
+ " of Container " + containerId);
+ callbackHandler.onGetContainerStatusError(containerId, e);
+ }
+ }
+
+ protected static enum ContainerState {
+ PREP, FAILED, RUNNING, DONE,
+ }
+
+ protected boolean isCompletelyDone(StatefulContainer container) {
+ return container.getState() == ContainerState.DONE ||
+ container.getState() == ContainerState.FAILED;
+ }
+
+ protected ContainerEventProcessor getContainerEventProcessor(
+ ContainerEvent event) {
+ return new ContainerEventProcessor(event);
+ }
+
+ /**
+ * The type of the event of interacting with a container
+ */
+ protected static enum ContainerEventType {
+ START_CONTAINER,
+ STOP_CONTAINER,
+ QUERY_CONTAINER
+ }
+
+ protected static class ContainerEvent
+ extends AbstractEvent{
+ private ContainerId containerId;
+ private NodeId nodeId;
+ private ContainerToken containerToken;
+
+ public ContainerEvent(ContainerId containerId, NodeId nodeId,
+ ContainerToken containerToken, ContainerEventType type) {
+ super(type);
+ this.containerId = containerId;
+ this.nodeId = nodeId;
+ this.containerToken = containerToken;
+ }
+
+ public ContainerId getContainerId() {
+ return containerId;
+ }
+
+ public NodeId getNodeId() {
+ return nodeId;
+ }
+
+ public ContainerToken getContainerToken() {
+ return containerToken;
+ }
+ }
+
+ protected static class StartContainerEvent extends ContainerEvent {
+ private Container container;
+ private ContainerLaunchContext containerLaunchContext;
+
+ public StartContainerEvent(Container container,
+ ContainerLaunchContext containerLaunchContext) {
+ super(container.getId(), container.getNodeId(),
+ container.getContainerToken(), ContainerEventType.START_CONTAINER);
+ this.container = container;
+ this.containerLaunchContext = containerLaunchContext;
+ }
+
+ public Container getContainer() {
+ return container;
+ }
+
+ public ContainerLaunchContext getContainerLaunchContext() {
+ return containerLaunchContext;
+ }
+ }
+
+ protected static class StatefulContainer implements
+ EventHandler {
+
+ protected final static StateMachineFactory stateMachineFactory
+ = new StateMachineFactory(ContainerState.PREP)
+
+ // Transitions from PREP state
+ .addTransition(ContainerState.PREP,
+ EnumSet.of(ContainerState.RUNNING, ContainerState.FAILED),
+ ContainerEventType.START_CONTAINER,
+ new StartContainerTransition())
+ .addTransition(ContainerState.PREP, ContainerState.DONE,
+ ContainerEventType.STOP_CONTAINER, new OutOfOrderTransition())
+
+ // Transitions from RUNNING state
+ // RUNNING -> RUNNING should be the invalid transition
+ .addTransition(ContainerState.RUNNING,
+ EnumSet.of(ContainerState.DONE, ContainerState.FAILED),
+ ContainerEventType.STOP_CONTAINER,
+ new StopContainerTransition())
+
+ // Transition from DONE state
+ .addTransition(ContainerState.DONE, ContainerState.DONE,
+ EnumSet.of(ContainerEventType.START_CONTAINER,
+ ContainerEventType.STOP_CONTAINER))
+
+ // Transition from FAILED state
+ .addTransition(ContainerState.FAILED, ContainerState.FAILED,
+ EnumSet.of(ContainerEventType.START_CONTAINER,
+ ContainerEventType.STOP_CONTAINER));
+
+ protected static class StartContainerTransition implements
+ MultipleArcTransition {
+
+ @Override
+ public ContainerState transition(
+ StatefulContainer container, ContainerEvent event) {
+ ContainerId containerId = event.getContainerId();
+ try {
+ StartContainerEvent scEvent = null;
+ if (event instanceof StartContainerEvent) {
+ scEvent = (StartContainerEvent) event;
+ }
+ assert scEvent != null;
+ Map allServiceResponse =
+ container.nmClientAsync.client.startContainer(
+ scEvent.getContainer(), scEvent.getContainerLaunchContext());
+ try {
+ container.nmClientAsync.callbackHandler.onContainerStarted(
+ containerId, allServiceResponse);
+ } catch (Throwable thr) {
+ // Don't process user created unchecked exception
+ LOG.info("Unchecked exception is thrown from onContainerStarted for "
+ + "Container " + containerId, thr);
+ }
+ return ContainerState.RUNNING;
+ } catch (YarnRemoteException e) {
+ return onExceptionRaised(container, event, e);
+ } catch (IOException e) {
+ return onExceptionRaised(container, event, e);
+ } catch (Throwable t) {
+ return onExceptionRaised(container, event, t);
+ }
+ }
+
+ private ContainerState onExceptionRaised(StatefulContainer container,
+ ContainerEvent event, Throwable t) {
+ try {
+ container.nmClientAsync.callbackHandler.onStartContainerError(
+ event.getContainerId(), t);
+ } catch (Throwable thr) {
+ // Don't process user created unchecked exception
+ LOG.info(
+ "Unchecked exception is thrown from onStartContainerError for " +
+ "Container " + event.getContainerId(), thr);
+ }
+ return ContainerState.FAILED;
+ }
+ }
+
+ protected static class StopContainerTransition implements
+ MultipleArcTransition {
+
+ @Override
+ public ContainerState transition(
+ StatefulContainer container, ContainerEvent event) {
+ ContainerId containerId = event.getContainerId();
+ try {
+ container.nmClientAsync.client.stopContainer(
+ containerId, event.getNodeId(), event.getContainerToken());
+ try {
+ container.nmClientAsync.callbackHandler.onContainerStopped(
+ event.getContainerId());
+ } catch (Throwable thr) {
+ // Don't process user created unchecked exception
+ LOG.info("Unchecked exception is thrown from onContainerStopped for "
+ + "Container " + event.getContainerId(), thr);
+ }
+ return ContainerState.DONE;
+ } catch (YarnRemoteException e) {
+ return onExceptionRaised(container, event, e);
+ } catch (IOException e) {
+ return onExceptionRaised(container, event, e);
+ } catch (Throwable t) {
+ return onExceptionRaised(container, event, t);
+ }
+ }
+
+ private ContainerState onExceptionRaised(StatefulContainer container,
+ ContainerEvent event, Throwable t) {
+ try {
+ container.nmClientAsync.callbackHandler.onStopContainerError(
+ event.getContainerId(), t);
+ } catch (Throwable thr) {
+ // Don't process user created unchecked exception
+ LOG.info("Unchecked exception is thrown from onStopContainerError for "
+ + "Container " + event.getContainerId(), thr);
+ }
+ return ContainerState.FAILED;
+ }
+ }
+
+ protected static class OutOfOrderTransition implements
+ SingleArcTransition {
+
+ protected static final String STOP_BEFORE_START_ERROR_MSG =
+ "Container was killed before it was launched";
+
+ @Override
+ public void transition(StatefulContainer container, ContainerEvent event) {
+ try {
+ container.nmClientAsync.callbackHandler.onStartContainerError(
+ event.getContainerId(),
+ RPCUtil.getRemoteException(STOP_BEFORE_START_ERROR_MSG));
+ } catch (Throwable thr) {
+ // Don't process user created unchecked exception
+ LOG.info(
+ "Unchecked exception is thrown from onStartContainerError for " +
+ "Container " + event.getContainerId(), thr);
+ }
+ }
+ }
+
+ private final NMClientAsync nmClientAsync;
+ private final ContainerId containerId;
+ private final StateMachine stateMachine;
+ private final ReadLock readLock;
+ private final WriteLock writeLock;
+
+ public StatefulContainer(NMClientAsync client, ContainerId containerId) {
+ this.nmClientAsync = client;
+ this.containerId = containerId;
+ stateMachine = stateMachineFactory.make(this);
+ ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+ readLock = lock.readLock();
+ writeLock = lock.writeLock();
+ }
+
+ @Override
+ public void handle(ContainerEvent event) {
+ writeLock.lock();
+ try {
+ try {
+ this.stateMachine.doTransition(event.getType(), event);
+ } catch (InvalidStateTransitonException e) {
+ LOG.error("Can't handle this event at current state", e);
+ }
+ } finally {
+ writeLock.unlock();
+ }
+ }
+
+ public ContainerId getContainerId() {
+ return containerId;
+ }
+
+ public ContainerState getState() {
+ readLock.lock();
+ try {
+ return stateMachine.getCurrentState();
+ } finally {
+ readLock.unlock();
+ }
+ }
+ }
+
+ protected class ContainerEventProcessor implements Runnable {
+ protected ContainerEvent event;
+
+ public ContainerEventProcessor(ContainerEvent event) {
+ this.event = event;
+ }
+
+ @Override
+ public void run() {
+ ContainerId containerId = event.getContainerId();
+ LOG.info("Processing Event " + event + " for Container " + containerId);
+ if (event.getType() == ContainerEventType.QUERY_CONTAINER) {
+ try {
+ ContainerStatus containerStatus = client.getContainerStatus(
+ containerId, event.getNodeId(), event.getContainerToken());
+ try {
+ callbackHandler.onContainerStatusReceived(
+ containerId, containerStatus);
+ } catch (Throwable thr) {
+ // Don't process user created unchecked exception
+ LOG.info(
+ "Unchecked exception is thrown from onContainerStatusReceived" +
+ " for Container " + event.getContainerId(), thr);
+ }
+ } catch (YarnRemoteException e) {
+ onExceptionRaised(containerId, e);
+ } catch (IOException e) {
+ onExceptionRaised(containerId, e);
+ } catch (Throwable t) {
+ onExceptionRaised(containerId, t);
+ }
+ } else {
+ StatefulContainer container = containers.get(containerId);
+ if (container == null) {
+ LOG.info("Container " + containerId + " is already stopped or failed");
+ } else {
+ container.handle(event);
+ }
+ }
+ }
+
+ private void onExceptionRaised(ContainerId containerId, Throwable t) {
+ try {
+ callbackHandler.onGetContainerStatusError(containerId, t);
+ } catch (Throwable thr) {
+ // Don't process user created unchecked exception
+ LOG.info("Unchecked exception is thrown from onGetContainerStatusError" +
+ " for Container " + containerId, thr);
+ }
+ }
+ }
+
+ /**
+ *
+ * The callback interface needs to be implemented by {@link NMClientAsync}
+ * users. The APIs are called when responses from NodeManager
are
+ * available.
+ *
+ *
+ *
+ * Once a callback happens, the users can chose to act on it in blocking or
+ * non-blocking manner. If the action on callback is done in a blocking
+ * manner, some of the threads performing requests on NodeManagers may get
+ * blocked depending on how many threads in the pool are busy.
+ *
+ *
+ *
+ * The implementation of the callback function should not throw the
+ * unexpected exception. Otherwise, {@link NMClientAsync} will just
+ * catch, log and then ignore it.
+ *
+ */
+ public static interface CallbackHandler {
+ /**
+ * The API is called when NodeManager
responds to indicate its
+ * acceptance of the starting container request
+ * @param containerId the Id of the container
+ * @param allServiceResponse a Map between the auxiliary service names and
+ * their outputs
+ */
+ void onContainerStarted(ContainerId containerId,
+ Map allServiceResponse);
+
+ /**
+ * The API is called when NodeManager
responds with the status
+ * of the container
+ * @param containerId the Id of the container
+ * @param containerStatus the status of the container
+ */
+ void onContainerStatusReceived(ContainerId containerId,
+ ContainerStatus containerStatus);
+
+ /**
+ * The API is called when NodeManager
responds to indicate the
+ * container is stopped.
+ * @param containerId the Id of the container
+ */
+ void onContainerStopped(ContainerId containerId);
+
+ /**
+ * The API is called when an exception is raised in the process of
+ * starting a container
+ *
+ * @param containerId the Id of the container
+ * @param t the raised exception
+ */
+ void onStartContainerError(ContainerId containerId, Throwable t);
+
+ /**
+ * The API is called when an exception is raised in the process of
+ * querying the status of a container
+ *
+ * @param containerId the Id of the container
+ * @param t the raised exception
+ */
+ void onGetContainerStatusError(ContainerId containerId, Throwable t);
+
+ /**
+ * The API is called when an exception is raised in the process of
+ * stopping a container
+ *
+ * @param containerId the Id of the container
+ * @param t the raised exception
+ */
+ void onStopContainerError(ContainerId containerId, Throwable t);
+
+ }
+
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/NMClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/NMClientImpl.java
new file mode 100644
index 00000000000..85c313ced6b
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/NMClientImpl.java
@@ -0,0 +1,388 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.client;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.security.PrivilegedAction;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.api.ContainerManager;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainerResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.ContainerToken;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.ipc.RPCUtil;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
+import org.apache.hadoop.yarn.service.AbstractService;
+import org.apache.hadoop.yarn.util.ProtoUtils;
+import org.apache.hadoop.yarn.util.Records;
+
+/**
+ *
+ * This class implements {@link NMClient}. All the APIs are blocking.
+ *
+ *
+ *
+ * By default, this client stops all the running containers that are started by
+ * it when it stops. It can be disabled via
+ * {@link #cleanupRunningContainersOnStop}, in which case containers will
+ * continue to run even after this client is stopped and till the application
+ * runs at which point ResourceManager will forcefully kill them.
+ *
+ */
+public class NMClientImpl extends AbstractService implements NMClient {
+
+ private static final Log LOG = LogFactory.getLog(NMClientImpl.class);
+
+ // The logically coherent operations on startedContainers is synchronized to
+ // ensure they are atomic
+ protected ConcurrentMap startedContainers =
+ new ConcurrentHashMap();
+
+ //enabled by default
+ protected AtomicBoolean cleanupRunningContainers = new AtomicBoolean(true);
+
+ public NMClientImpl() {
+ super(NMClientImpl.class.getName());
+ }
+
+ public NMClientImpl(String name) {
+ super(name);
+ }
+
+ @Override
+ public void stop() {
+ // Usually, started-containers are stopped when this client stops. Unless
+ // the flag cleanupRunningContainers is set to false.
+ if (cleanupRunningContainers.get()) {
+ cleanupRunningContainers();
+ }
+ super.stop();
+ }
+
+ protected synchronized void cleanupRunningContainers() {
+ for (StartedContainer startedContainer : startedContainers.values()) {
+ try {
+ stopContainer(startedContainer.getContainerId(),
+ startedContainer.getNodeId(),
+ startedContainer.getContainerToken());
+ } catch (YarnRemoteException e) {
+ LOG.error("Failed to stop Container " +
+ startedContainer.getContainerId() +
+ "when stopping NMClientImpl");
+ } catch (IOException e) {
+ LOG.error("Failed to stop Container " +
+ startedContainer.getContainerId() +
+ "when stopping NMClientImpl");
+ }
+ }
+ }
+
+ @Override
+ public void cleanupRunningContainersOnStop(boolean enabled) {
+ cleanupRunningContainers.set(enabled);
+ }
+
+ protected static class StartedContainer {
+ private ContainerId containerId;
+ private NodeId nodeId;
+ private ContainerToken containerToken;
+ private boolean stopped;
+
+ public StartedContainer(ContainerId containerId, NodeId nodeId,
+ ContainerToken containerToken) {
+ this.containerId = containerId;
+ this.nodeId = nodeId;
+ this.containerToken = containerToken;
+ stopped = false;
+ }
+
+ public ContainerId getContainerId() {
+ return containerId;
+ }
+
+ public NodeId getNodeId() {
+ return nodeId;
+ }
+
+ public ContainerToken getContainerToken() {
+ return containerToken;
+ }
+ }
+
+ protected static final class NMCommunicator extends AbstractService {
+ private ContainerId containerId;
+ private NodeId nodeId;
+ private ContainerToken containerToken;
+ private ContainerManager containerManager;
+
+ public NMCommunicator(ContainerId containerId, NodeId nodeId,
+ ContainerToken containerToken) {
+ super(NMCommunicator.class.getName());
+ this.containerId = containerId;
+ this.nodeId = nodeId;
+ this.containerToken = containerToken;
+ }
+
+ @Override
+ public synchronized void start() {
+ final YarnRPC rpc = YarnRPC.create(getConfig());
+
+ final InetSocketAddress containerAddress =
+ NetUtils.createSocketAddr(nodeId.toString());
+
+ // the user in createRemoteUser in this context has to be ContainerId
+ UserGroupInformation currentUser =
+ UserGroupInformation.createRemoteUser(containerId.toString());
+
+ Token token =
+ ProtoUtils.convertFromProtoFormat(containerToken, containerAddress);
+ currentUser.addToken(token);
+
+ containerManager = currentUser
+ .doAs(new PrivilegedAction() {
+ @Override
+ public ContainerManager run() {
+ return (ContainerManager) rpc.getProxy(ContainerManager.class,
+ containerAddress, getConfig());
+ }
+ });
+
+ LOG.debug("Connecting to ContainerManager at " + containerAddress);
+ }
+
+ @Override
+ public synchronized void stop() {
+ if (this.containerManager != null) {
+ RPC.stopProxy(this.containerManager);
+
+ if (LOG.isDebugEnabled()) {
+ InetSocketAddress containerAddress =
+ NetUtils.createSocketAddr(nodeId.toString());
+ LOG.debug("Disconnecting from ContainerManager at " +
+ containerAddress);
+ }
+ }
+ }
+
+ public synchronized Map startContainer(
+ Container container, ContainerLaunchContext containerLaunchContext)
+ throws YarnRemoteException, IOException {
+ if (!container.getId().equals(containerId)) {
+ throw new IllegalArgumentException(
+ "NMCommunicator's containerId mismatches the given Container's");
+ }
+ StartContainerResponse startResponse = null;
+ try {
+ StartContainerRequest startRequest =
+ Records.newRecord(StartContainerRequest.class);
+ startRequest.setContainer(container);
+ startRequest.setContainerLaunchContext(containerLaunchContext);
+ startResponse = containerManager.startContainer(startRequest);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Started Container " + containerId);
+ }
+ } catch (YarnRemoteException e) {
+ LOG.warn("Container " + containerId + " failed to start", e);
+ throw e;
+ } catch (IOException e) {
+ LOG.warn("Container " + containerId + " failed to start", e);
+ throw e;
+ }
+ return startResponse.getAllServiceResponse();
+ }
+
+ public synchronized void stopContainer() throws YarnRemoteException,
+ IOException {
+ try {
+ StopContainerRequest stopRequest =
+ Records.newRecord(StopContainerRequest.class);
+ stopRequest.setContainerId(containerId);
+ containerManager.stopContainer(stopRequest);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Stopped Container " + containerId);
+ }
+ } catch (YarnRemoteException e) {
+ LOG.warn("Container " + containerId + " failed to stop", e);
+ throw e;
+ } catch (IOException e) {
+ LOG.warn("Container " + containerId + " failed to stop", e);
+ throw e;
+ }
+ }
+
+ public synchronized ContainerStatus getContainerStatus()
+ throws YarnRemoteException, IOException {
+ GetContainerStatusResponse statusResponse = null;
+ try {
+ GetContainerStatusRequest statusRequest =
+ Records.newRecord(GetContainerStatusRequest.class);
+ statusRequest.setContainerId(containerId);
+ statusResponse = containerManager.getContainerStatus(statusRequest);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Got the status of Container " + containerId);
+ }
+ } catch (YarnRemoteException e) {
+ LOG.warn(
+ "Unable to get the status of Container " + containerId, e);
+ throw e;
+ } catch (IOException e) {
+ LOG.warn(
+ "Unable to get the status of Container " + containerId, e);
+ throw e;
+ }
+ return statusResponse.getStatus();
+ }
+ }
+
+ @Override
+ public Map startContainer(
+ Container container, ContainerLaunchContext containerLaunchContext)
+ throws YarnRemoteException, IOException {
+ // Do synchronization on StartedContainer to prevent race condition
+ // between startContainer and stopContainer
+ synchronized (addStartedContainer(container)) {
+ Map allServiceResponse;
+ NMCommunicator nmCommunicator = null;
+ try {
+ nmCommunicator = new NMCommunicator(container.getId(),
+ container.getNodeId(), container.getContainerToken());
+ nmCommunicator.init(getConfig());
+ nmCommunicator.start();
+ allServiceResponse =
+ nmCommunicator.startContainer(container, containerLaunchContext);
+ } catch (YarnRemoteException e) {
+ // Remove the started container if it failed to start
+ removeStartedContainer(container.getId());
+ throw e;
+ } catch (IOException e) {
+ removeStartedContainer(container.getId());
+ throw e;
+ } catch (Throwable t) {
+ removeStartedContainer(container.getId());
+ throw RPCUtil.getRemoteException(t);
+ } finally {
+ if (nmCommunicator != null) {
+ nmCommunicator.stop();
+ }
+ }
+ return allServiceResponse;
+ }
+
+ // Three choices:
+ // 1. starting and releasing the proxy before and after each interaction
+ // 2. starting the proxy when starting the container and releasing it when
+ // stopping the container
+ // 3. starting the proxy when starting the container and releasing it when
+ // stopping the client
+ // Adopt 1 currently
+ }
+
+ @Override
+ public void stopContainer(ContainerId containerId, NodeId nodeId,
+ ContainerToken containerToken) throws YarnRemoteException, IOException {
+ StartedContainer startedContainer = getStartedContainer(containerId);
+ if (startedContainer == null) {
+ throw RPCUtil.getRemoteException("Container " + containerId +
+ " is either not started yet or already stopped");
+ }
+ // Only allow one request of stopping the container to move forward
+ // When entering the block, check whether the precursor has already stopped
+ // the container
+ synchronized (startedContainer) {
+ if (startedContainer.stopped) {
+ return;
+ }
+ NMCommunicator nmCommunicator = null;
+ try {
+ nmCommunicator =
+ new NMCommunicator(containerId, nodeId, containerToken);
+ nmCommunicator.init(getConfig());
+ nmCommunicator.start();
+ nmCommunicator.stopContainer();
+ } finally {
+ if (nmCommunicator != null) {
+ nmCommunicator.stop();
+ }
+ startedContainer.stopped = true;
+ removeStartedContainer(containerId);
+ }
+ }
+ }
+
+ @Override
+ public ContainerStatus getContainerStatus(ContainerId containerId,
+ NodeId nodeId, ContainerToken containerToken)
+ throws YarnRemoteException, IOException {
+ NMCommunicator nmCommunicator = null;
+ try {
+ nmCommunicator = new NMCommunicator(containerId, nodeId, containerToken);
+ nmCommunicator.init(getConfig());
+ nmCommunicator.start();
+ ContainerStatus containerStatus = nmCommunicator.getContainerStatus();
+ return containerStatus;
+ } finally {
+ if (nmCommunicator != null) {
+ nmCommunicator.stop();
+ }
+ }
+ }
+
+ protected synchronized StartedContainer addStartedContainer(
+ Container container) throws YarnRemoteException, IOException {
+ if (startedContainers.containsKey(container.getId())) {
+ throw RPCUtil.getRemoteException("Container " + container.getId() +
+ " is already started");
+ }
+ StartedContainer startedContainer = new StartedContainer(container.getId(),
+ container.getNodeId(), container.getContainerToken());
+ startedContainers.put(startedContainer.getContainerId(), startedContainer);
+ return startedContainer;
+ }
+
+ protected synchronized void removeStartedContainer(ContainerId containerId) {
+ startedContainers.remove(containerId);
+ }
+
+ protected synchronized StartedContainer getStartedContainer(
+ ContainerId containerId) {
+ return startedContainers.get(containerId);
+ }
+
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestNMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestNMClient.java
new file mode 100644
index 00000000000..3b4439ea280
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestNMClient.java
@@ -0,0 +1,316 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.client;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.client.AMRMClient.ContainerRequest;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.server.MiniYARNCluster;
+import org.apache.hadoop.yarn.service.Service.STATE;
+import org.apache.hadoop.yarn.util.Records;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestNMClient {
+ Configuration conf = null;
+ MiniYARNCluster yarnCluster = null;
+ YarnClientImpl yarnClient = null;
+ AMRMClientImpl rmClient = null;
+ NMClientImpl nmClient = null;
+ List nodeReports = null;
+ ApplicationAttemptId attemptId = null;
+ int nodeCount = 3;
+
+ @Before
+ public void setup() throws YarnRemoteException, IOException {
+ // start minicluster
+ conf = new YarnConfiguration();
+ yarnCluster =
+ new MiniYARNCluster(TestAMRMClient.class.getName(), nodeCount, 1, 1);
+ yarnCluster.init(conf);
+ yarnCluster.start();
+ assertNotNull(yarnCluster);
+ assertEquals(STATE.STARTED, yarnCluster.getServiceState());
+
+ // start rm client
+ yarnClient = new YarnClientImpl();
+ yarnClient.init(conf);
+ yarnClient.start();
+ assertNotNull(yarnClient);
+ assertEquals(STATE.STARTED, yarnClient.getServiceState());
+
+ // get node info
+ nodeReports = yarnClient.getNodeReports();
+
+ // submit new app
+ GetNewApplicationResponse newApp = yarnClient.getNewApplication();
+ ApplicationId appId = newApp.getApplicationId();
+
+ ApplicationSubmissionContext appContext = Records
+ .newRecord(ApplicationSubmissionContext.class);
+ // set the application id
+ appContext.setApplicationId(appId);
+ // set the application name
+ appContext.setApplicationName("Test");
+ // Set the priority for the application master
+ Priority pri = Priority.newInstance(0);
+ appContext.setPriority(pri);
+ // Set the queue to which this application is to be submitted in the RM
+ appContext.setQueue("default");
+ // Set up the container launch context for the application master
+ ContainerLaunchContext amContainer = Records
+ .newRecord(ContainerLaunchContext.class);
+ appContext.setAMContainerSpec(amContainer);
+ // unmanaged AM
+ appContext.setUnmanagedAM(true);
+ // Create the request to send to the applications manager
+ SubmitApplicationRequest appRequest = Records
+ .newRecord(SubmitApplicationRequest.class);
+ appRequest.setApplicationSubmissionContext(appContext);
+ // Submit the application to the applications manager
+ yarnClient.submitApplication(appContext);
+
+ // wait for app to start
+ int iterationsLeft = 30;
+ while (iterationsLeft > 0) {
+ ApplicationReport appReport = yarnClient.getApplicationReport(appId);
+ if (appReport.getYarnApplicationState() ==
+ YarnApplicationState.ACCEPTED) {
+ attemptId = appReport.getCurrentApplicationAttemptId();
+ break;
+ }
+ sleep(1000);
+ --iterationsLeft;
+ }
+ if (iterationsLeft == 0) {
+ fail("Application hasn't bee started");
+ }
+
+ // start am rm client
+ rmClient = new AMRMClientImpl(attemptId);
+ rmClient.init(conf);
+ rmClient.start();
+ assertNotNull(rmClient);
+ assertEquals(STATE.STARTED, rmClient.getServiceState());
+
+ // start am nm client
+ nmClient = new NMClientImpl();
+ nmClient.init(conf);
+ nmClient.start();
+ assertNotNull(nmClient);
+ assertEquals(STATE.STARTED, nmClient.getServiceState());
+ }
+
+ @After
+ public void tearDown() {
+ rmClient.stop();
+
+ // leave one unclosed
+ assertEquals(1, nmClient.startedContainers.size());
+ // default true
+ assertTrue(nmClient.cleanupRunningContainers.get());
+ // don't stop the running containers
+ nmClient.cleanupRunningContainersOnStop(false);
+ assertFalse(nmClient.cleanupRunningContainers.get());
+ nmClient.stop();
+ assertTrue(nmClient.startedContainers.size() > 0);
+ // stop the running containers
+ nmClient.cleanupRunningContainersOnStop(true);
+ assertTrue(nmClient.cleanupRunningContainers.get());
+ nmClient.stop();
+ assertEquals(0, nmClient.startedContainers.size());
+
+ yarnClient.stop();
+ yarnCluster.stop();
+ }
+
+ @Test (timeout = 60000)
+ public void testNMClient()
+ throws YarnRemoteException, IOException {
+
+ rmClient.registerApplicationMaster("Host", 10000, "");
+
+ testContainerManagement(nmClient, allocateContainers(rmClient, 5));
+
+ rmClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED,
+ null, null);
+ }
+
+ private Set allocateContainers(AMRMClientImpl rmClient, int num)
+ throws YarnRemoteException, IOException {
+ // setup container request
+ Resource capability = Resource.newInstance(1024, 0);
+ Priority priority = Priority.newInstance(0);
+ String node = nodeReports.get(0).getNodeId().getHost();
+ String rack = nodeReports.get(0).getRackName();
+ String[] nodes = new String[] {node};
+ String[] racks = new String[] {rack};
+
+ for (int i = 0; i < num; ++i) {
+ rmClient.addContainerRequest(new ContainerRequest(capability, nodes,
+ racks, priority, 1));
+ }
+
+ int containersRequestedAny = rmClient.remoteRequestsTable.get(priority)
+ .get(ResourceRequest.ANY).get(capability).getNumContainers();
+
+ // RM should allocate container within 2 calls to allocate()
+ int allocatedContainerCount = 0;
+ int iterationsLeft = 2;
+ Set containers = new TreeSet();
+ while (allocatedContainerCount < containersRequestedAny
+ && iterationsLeft > 0) {
+ AllocateResponse allocResponse = rmClient.allocate(0.1f);
+
+ allocatedContainerCount += allocResponse.getAllocatedContainers().size();
+ for(Container container : allocResponse.getAllocatedContainers()) {
+ containers.add(container);
+ }
+ if(allocatedContainerCount < containersRequestedAny) {
+ // sleep to let NM's heartbeat to RM and trigger allocations
+ sleep(1000);
+ }
+
+ --iterationsLeft;
+ }
+ return containers;
+ }
+
+ private void testContainerManagement(NMClientImpl nmClient,
+ Set containers) throws IOException {
+ int size = containers.size();
+ int i = 0;
+ for (Container container : containers) {
+ // getContainerStatus shouldn't be called before startContainer,
+ // otherwise, NodeManager cannot find the container
+ try {
+ nmClient.getContainerStatus(container.getId(), container.getNodeId(),
+ container.getContainerToken());
+ fail("Exception is expected");
+ } catch (YarnRemoteException e) {
+ assertTrue("The thrown exception is not expected",
+ e.getMessage().contains("is not handled by this NodeManager"));
+ }
+
+ // stopContainer shouldn't be called before startContainer,
+ // otherwise, an exception will be thrown
+ try {
+ nmClient.stopContainer(container.getId(), container.getNodeId(),
+ container.getContainerToken());
+ fail("Exception is expected");
+ } catch (YarnRemoteException e) {
+ assertTrue("The thrown exception is not expected",
+ e.getMessage().contains(
+ "is either not started yet or already stopped"));
+ }
+
+ Credentials ts = new Credentials();
+ DataOutputBuffer dob = new DataOutputBuffer();
+ ts.writeTokenStorageToStream(dob);
+ ByteBuffer securityTokens =
+ ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
+ ContainerLaunchContext clc =
+ Records.newRecord(ContainerLaunchContext.class);
+ clc.setTokens(securityTokens);
+ try {
+ nmClient.startContainer(container, clc);
+ } catch (YarnRemoteException e) {
+ fail("Exception is not expected");
+ }
+
+ // leave one container unclosed
+ if (++i < size) {
+ try {
+ ContainerStatus status = nmClient.getContainerStatus(container.getId(),
+ container.getNodeId(), container.getContainerToken());
+ // verify the container is started and in good shape
+ assertEquals(container.getId(), status.getContainerId());
+ assertEquals(ContainerState.RUNNING, status.getState());
+ assertEquals("", status.getDiagnostics());
+ assertEquals(-1000, status.getExitStatus());
+ } catch (YarnRemoteException e) {
+ fail("Exception is not expected");
+ }
+
+ try {
+ nmClient.stopContainer(container.getId(), container.getNodeId(),
+ container.getContainerToken());
+ } catch (YarnRemoteException e) {
+ fail("Exception is not expected");
+ }
+
+ // getContainerStatus can be called after stopContainer
+ try {
+ ContainerStatus status = nmClient.getContainerStatus(
+ container.getId(), container.getNodeId(),
+ container.getContainerToken());
+ assertEquals(container.getId(), status.getContainerId());
+ assertEquals(ContainerState.RUNNING, status.getState());
+ assertTrue("" + i, status.getDiagnostics().contains(
+ "Container killed by the ApplicationMaster."));
+ assertEquals(-1000, status.getExitStatus());
+ } catch (YarnRemoteException e) {
+ fail("Exception is not expected");
+ }
+ }
+ }
+ }
+
+ private void sleep(int sleepTime) {
+ try {
+ Thread.sleep(sleepTime);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestNMClientAsync.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestNMClientAsync.java
new file mode 100644
index 00000000000..5813ad0b997
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestNMClientAsync.java
@@ -0,0 +1,540 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.client;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicIntegerArray;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.ContainerToken;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.ipc.RPCUtil;
+import org.apache.hadoop.yarn.util.BuilderUtils;
+import org.junit.Test;
+
+
+public class TestNMClientAsync {
+
+ private final RecordFactory recordFactory =
+ RecordFactoryProvider.getRecordFactory(null);
+
+ private NMClientAsync asyncClient;
+ private NodeId nodeId;
+ private ContainerToken containerToken;
+
+ @Test (timeout = 30000)
+ public void testNMClientAsync() throws Exception {
+ Configuration conf = new Configuration();
+ conf.setInt(YarnConfiguration.NM_CLIENT_ASYNC_THREAD_POOL_MAX_SIZE, 10);
+
+ // Threads to run are more than the max size of the thread pool
+ int expectedSuccess = 40;
+ int expectedFailure = 40;
+
+ asyncClient = new MockNMClientAsync1(expectedSuccess, expectedFailure);
+ asyncClient.init(conf);
+ Assert.assertEquals("The max thread pool size is not correctly set",
+ 10, asyncClient.maxThreadPoolSize);
+ asyncClient.start();
+
+
+ for (int i = 0; i < expectedSuccess + expectedFailure; ++i) {
+ if (i == expectedSuccess) {
+ while (!((TestCallbackHandler1) asyncClient.callbackHandler)
+ .isAllSuccessCallsExecuted()) {
+ Thread.sleep(10);
+ }
+ asyncClient.client = mockNMClient(1);
+ }
+ Container container = mockContainer(i);
+ ContainerLaunchContext clc =
+ recordFactory.newRecordInstance(ContainerLaunchContext.class);
+ asyncClient.startContainer(container, clc);
+ }
+ while (!((TestCallbackHandler1) asyncClient.callbackHandler)
+ .isStartAndQueryFailureCallsExecuted()) {
+ Thread.sleep(10);
+ }
+ asyncClient.client = mockNMClient(2);
+ ((TestCallbackHandler1) asyncClient.callbackHandler).path = false;
+ for (int i = 0; i < expectedFailure; ++i) {
+ Container container = mockContainer(
+ expectedSuccess + expectedFailure + i);
+ ContainerLaunchContext clc =
+ recordFactory.newRecordInstance(ContainerLaunchContext.class);
+ asyncClient.startContainer(container, clc);
+ }
+ while (!((TestCallbackHandler1) asyncClient.callbackHandler)
+ .isStopFailureCallsExecuted()) {
+ Thread.sleep(10);
+ }
+ for (String errorMsg :
+ ((TestCallbackHandler1) asyncClient.callbackHandler).errorMsgs) {
+ System.out.println(errorMsg);
+ }
+ Assert.assertEquals("Error occurs in CallbackHandler", 0,
+ ((TestCallbackHandler1) asyncClient.callbackHandler).errorMsgs.size());
+ for (String errorMsg : ((MockNMClientAsync1) asyncClient).errorMsgs) {
+ System.out.println(errorMsg);
+ }
+ Assert.assertEquals("Error occurs in ContainerEventProcessor", 0,
+ ((MockNMClientAsync1) asyncClient).errorMsgs.size());
+ asyncClient.stop();
+ Assert.assertFalse(
+ "The thread of Container Management Event Dispatcher is still alive",
+ asyncClient.eventDispatcherThread.isAlive());
+ Assert.assertTrue("The thread pool is not shut down",
+ asyncClient.threadPool.isShutdown());
+ }
+
+ private class MockNMClientAsync1 extends NMClientAsync {
+ private Set errorMsgs =
+ Collections.synchronizedSet(new HashSet());
+
+ protected MockNMClientAsync1(int expectedSuccess, int expectedFailure)
+ throws YarnRemoteException, IOException {
+ super(MockNMClientAsync1.class.getName(), mockNMClient(0),
+ new TestCallbackHandler1(expectedSuccess, expectedFailure));
+ }
+
+ private class MockContainerEventProcessor extends ContainerEventProcessor {
+ public MockContainerEventProcessor(ContainerEvent event) {
+ super(event);
+ }
+
+ @Override
+ public void run() {
+ try {
+ super.run();
+ } catch (RuntimeException e) {
+ // If the unexpected throwable comes from error callback functions, it
+ // will break ContainerEventProcessor.run(). Therefore, monitor
+ // the exception here
+ errorMsgs.add("Unexpected throwable from callback functions should" +
+ " be ignored by Container " + event.getContainerId());
+ }
+ }
+ }
+
+ @Override
+ protected ContainerEventProcessor getContainerEventProcessor(
+ ContainerEvent event) {
+ return new MockContainerEventProcessor(event);
+ }
+ }
+
+ private class TestCallbackHandler1
+ implements NMClientAsync.CallbackHandler {
+
+ private boolean path = true;
+
+ private int expectedSuccess;
+ private int expectedFailure;
+
+ private AtomicInteger actualStartSuccess = new AtomicInteger(0);
+ private AtomicInteger actualStartFailure = new AtomicInteger(0);
+ private AtomicInteger actualQuerySuccess = new AtomicInteger(0);
+ private AtomicInteger actualQueryFailure = new AtomicInteger(0);
+ private AtomicInteger actualStopSuccess = new AtomicInteger(0);
+ private AtomicInteger actualStopFailure = new AtomicInteger(0);
+
+ private AtomicIntegerArray actualStartSuccessArray;
+ private AtomicIntegerArray actualStartFailureArray;
+ private AtomicIntegerArray actualQuerySuccessArray;
+ private AtomicIntegerArray actualQueryFailureArray;
+ private AtomicIntegerArray actualStopSuccessArray;
+ private AtomicIntegerArray actualStopFailureArray;
+
+ private Set errorMsgs =
+ Collections.synchronizedSet(new HashSet());
+
+ public TestCallbackHandler1(int expectedSuccess, int expectedFailure) {
+ this.expectedSuccess = expectedSuccess;
+ this.expectedFailure = expectedFailure;
+
+ actualStartSuccessArray = new AtomicIntegerArray(expectedSuccess);
+ actualStartFailureArray = new AtomicIntegerArray(expectedFailure);
+ actualQuerySuccessArray = new AtomicIntegerArray(expectedSuccess);
+ actualQueryFailureArray = new AtomicIntegerArray(expectedFailure);
+ actualStopSuccessArray = new AtomicIntegerArray(expectedSuccess);
+ actualStopFailureArray = new AtomicIntegerArray(expectedFailure);
+ }
+
+ @Override
+ public void onContainerStarted(ContainerId containerId,
+ Map allServiceResponse) {
+ if (path) {
+ if (containerId.getId() >= expectedSuccess) {
+ errorMsgs.add("Container " + containerId +
+ " should throw the exception onContainerStarted");
+ return;
+ }
+ actualStartSuccess.addAndGet(1);
+ actualStartSuccessArray.set(containerId.getId(), 1);
+
+ // move on to the following success tests
+ asyncClient.getContainerStatus(containerId, nodeId, containerToken);
+ } else {
+ // move on to the following failure tests
+ asyncClient.stopContainer(containerId, nodeId, containerToken);
+ }
+
+ // Shouldn't crash the test thread
+ throw new RuntimeException("Ignorable Exception");
+ }
+
+ @Override
+ public void onContainerStatusReceived(ContainerId containerId,
+ ContainerStatus containerStatus) {
+ if (containerId.getId() >= expectedSuccess) {
+ errorMsgs.add("Container " + containerId +
+ " should throw the exception onContainerStatusReceived");
+ return;
+ }
+ actualQuerySuccess.addAndGet(1);
+ actualQuerySuccessArray.set(containerId.getId(), 1);
+ // move on to the following success tests
+ asyncClient.stopContainer(containerId, nodeId, containerToken);
+
+ // Shouldn't crash the test thread
+ throw new RuntimeException("Ignorable Exception");
+ }
+
+ @Override
+ public void onContainerStopped(ContainerId containerId) {
+ if (containerId.getId() >= expectedSuccess) {
+ errorMsgs.add("Container " + containerId +
+ " should throw the exception onContainerStopped");
+ return;
+ }
+ actualStopSuccess.addAndGet(1);
+ actualStopSuccessArray.set(containerId.getId(), 1);
+
+ // Shouldn't crash the test thread
+ throw new RuntimeException("Ignorable Exception");
+ }
+
+ @Override
+ public void onStartContainerError(ContainerId containerId, Throwable t) {
+ // If the unexpected throwable comes from success callback functions, it
+ // will be handled by the error callback functions. Therefore, monitor
+ // the exception here
+ if (t instanceof RuntimeException) {
+ errorMsgs.add("Unexpected throwable from callback functions should be" +
+ " ignored by Container " + containerId);
+ }
+ if (containerId.getId() < expectedSuccess) {
+ errorMsgs.add("Container " + containerId +
+ " shouldn't throw the exception onStartContainerError");
+ return;
+ }
+ actualStartFailure.addAndGet(1);
+ actualStartFailureArray.set(containerId.getId() - expectedSuccess, 1);
+ // move on to the following failure tests
+ asyncClient.getContainerStatus(containerId, nodeId, containerToken);
+
+ // Shouldn't crash the test thread
+ throw new RuntimeException("Ignorable Exception");
+ }
+
+ @Override
+ public void onStopContainerError(ContainerId containerId, Throwable t) {
+ if (t instanceof RuntimeException) {
+ errorMsgs.add("Unexpected throwable from callback functions should be" +
+ " ignored by Container " + containerId);
+ }
+ if (containerId.getId() < expectedSuccess + expectedFailure) {
+ errorMsgs.add("Container " + containerId +
+ " shouldn't throw the exception onStopContainerError");
+ return;
+ }
+
+ actualStopFailure.addAndGet(1);
+ actualStopFailureArray.set(
+ containerId.getId() - expectedSuccess - expectedFailure, 1);
+
+ // Shouldn't crash the test thread
+ throw new RuntimeException("Ignorable Exception");
+ }
+
+ @Override
+ public void onGetContainerStatusError(ContainerId containerId,
+ Throwable t) {
+ if (t instanceof RuntimeException) {
+ errorMsgs.add("Unexpected throwable from callback functions should be"
+ + " ignored by Container " + containerId);
+ }
+ if (containerId.getId() < expectedSuccess) {
+ errorMsgs.add("Container " + containerId +
+ " shouldn't throw the exception onGetContainerStatusError");
+ return;
+ }
+ actualQueryFailure.addAndGet(1);
+ actualQueryFailureArray.set(containerId.getId() - expectedSuccess, 1);
+
+ // Shouldn't crash the test thread
+ throw new RuntimeException("Ignorable Exception");
+ }
+
+ public boolean isAllSuccessCallsExecuted() {
+ boolean isAllSuccessCallsExecuted =
+ actualStartSuccess.get() == expectedSuccess &&
+ actualQuerySuccess.get() == expectedSuccess &&
+ actualStopSuccess.get() == expectedSuccess;
+ if (isAllSuccessCallsExecuted) {
+ assertAtomicIntegerArray(actualStartSuccessArray);
+ assertAtomicIntegerArray(actualQuerySuccessArray);
+ assertAtomicIntegerArray(actualStopSuccessArray);
+ }
+ return isAllSuccessCallsExecuted;
+ }
+
+ public boolean isStartAndQueryFailureCallsExecuted() {
+ boolean isStartAndQueryFailureCallsExecuted =
+ actualStartFailure.get() == expectedFailure &&
+ actualQueryFailure.get() == expectedFailure;
+ if (isStartAndQueryFailureCallsExecuted) {
+ assertAtomicIntegerArray(actualStartFailureArray);
+ assertAtomicIntegerArray(actualQueryFailureArray);
+ }
+ return isStartAndQueryFailureCallsExecuted;
+ }
+
+ public boolean isStopFailureCallsExecuted() {
+ boolean isStopFailureCallsExecuted =
+ actualStopFailure.get() == expectedFailure;
+ if (isStopFailureCallsExecuted) {
+ assertAtomicIntegerArray(actualStopFailureArray);
+ }
+ return isStopFailureCallsExecuted;
+ }
+
+ private void assertAtomicIntegerArray(AtomicIntegerArray array) {
+ for (int i = 0; i < array.length(); ++i) {
+ Assert.assertEquals(1, array.get(i));
+ }
+ }
+ }
+
+ private NMClient mockNMClient(int mode)
+ throws YarnRemoteException, IOException {
+ NMClient client = mock(NMClient.class);
+ switch (mode) {
+ case 0:
+ when(client.startContainer(any(Container.class),
+ any(ContainerLaunchContext.class))).thenReturn(
+ Collections.emptyMap());
+ when(client.getContainerStatus(any(ContainerId.class), any(NodeId.class),
+ any(ContainerToken.class))).thenReturn(
+ recordFactory.newRecordInstance(ContainerStatus.class));
+ doNothing().when(client).stopContainer(any(ContainerId.class),
+ any(NodeId.class), any(ContainerToken.class));
+ break;
+ case 1:
+ doThrow(RPCUtil.getRemoteException("Start Exception")).when(client)
+ .startContainer(any(Container.class),
+ any(ContainerLaunchContext.class));
+ doThrow(RPCUtil.getRemoteException("Query Exception")).when(client)
+ .getContainerStatus(any(ContainerId.class), any(NodeId.class),
+ any(ContainerToken.class));
+ doThrow(RPCUtil.getRemoteException("Stop Exception")).when(client)
+ .stopContainer(any(ContainerId.class), any(NodeId.class),
+ any(ContainerToken.class));
+ break;
+ case 2:
+ when(client.startContainer(any(Container.class),
+ any(ContainerLaunchContext.class))).thenReturn(
+ Collections.emptyMap());
+ when(client.getContainerStatus(any(ContainerId.class), any(NodeId.class),
+ any(ContainerToken.class))).thenReturn(
+ recordFactory.newRecordInstance(ContainerStatus.class));
+ doThrow(RPCUtil.getRemoteException("Stop Exception")).when(client)
+ .stopContainer(any(ContainerId.class), any(NodeId.class),
+ any(ContainerToken.class));
+ }
+ return client;
+ }
+
+ @Test (timeout = 10000)
+ public void testOutOfOrder() throws Exception {
+ CyclicBarrier barrierA = new CyclicBarrier(2);
+ CyclicBarrier barrierB = new CyclicBarrier(2);
+ CyclicBarrier barrierC = new CyclicBarrier(2);
+ asyncClient = new MockNMClientAsync2(barrierA, barrierB, barrierC);
+ asyncClient.init(new Configuration());
+ asyncClient.start();
+
+ final Container container = mockContainer(1);
+ final ContainerLaunchContext clc =
+ recordFactory.newRecordInstance(ContainerLaunchContext.class);
+
+ // start container from another thread
+ Thread t = new Thread() {
+ @Override
+ public void run() {
+ asyncClient.startContainer(container, clc);
+ }
+ };
+ t.start();
+
+ barrierA.await();
+ asyncClient.stopContainer(container.getId(), container.getNodeId(),
+ container.getContainerToken());
+ barrierC.await();
+
+ Assert.assertFalse("Starting and stopping should be out of order",
+ ((TestCallbackHandler2) asyncClient.callbackHandler)
+ .exceptionOccurred.get());
+ }
+
+ private class MockNMClientAsync2 extends NMClientAsync {
+ private CyclicBarrier barrierA;
+ private CyclicBarrier barrierB;
+
+ protected MockNMClientAsync2(CyclicBarrier barrierA, CyclicBarrier barrierB,
+ CyclicBarrier barrierC) throws YarnRemoteException, IOException {
+ super(MockNMClientAsync2.class.getName(), mockNMClient(0),
+ new TestCallbackHandler2(barrierC));
+ this.barrierA = barrierA;
+ this.barrierB = barrierB;
+ }
+
+ private class MockContainerEventProcessor extends ContainerEventProcessor {
+
+ public MockContainerEventProcessor(ContainerEvent event) {
+ super(event);
+ }
+
+ @Override
+ public void run() {
+ try {
+ if (event.getType() == ContainerEventType.START_CONTAINER) {
+ barrierA.await();
+ barrierB.await();
+ }
+ super.run();
+ if (event.getType() == ContainerEventType.STOP_CONTAINER) {
+ barrierB.await();
+ }
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ } catch (BrokenBarrierException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+ @Override
+ protected ContainerEventProcessor getContainerEventProcessor(
+ ContainerEvent event) {
+ return new MockContainerEventProcessor(event);
+ }
+ }
+
+ private class TestCallbackHandler2
+ implements NMClientAsync.CallbackHandler {
+ private CyclicBarrier barrierC;
+ private AtomicBoolean exceptionOccurred = new AtomicBoolean(false);
+
+ public TestCallbackHandler2(CyclicBarrier barrierC) {
+ this.barrierC = barrierC;
+ }
+
+ @Override
+ public void onContainerStarted(ContainerId containerId,
+ Map allServiceResponse) {
+ }
+
+ @Override
+ public void onContainerStatusReceived(ContainerId containerId,
+ ContainerStatus containerStatus) {
+ }
+
+ @Override
+ public void onContainerStopped(ContainerId containerId) {
+ }
+
+ @Override
+ public void onStartContainerError(ContainerId containerId, Throwable t) {
+ if (!t.getMessage().equals(NMClientAsync.StatefulContainer
+ .OutOfOrderTransition.STOP_BEFORE_START_ERROR_MSG)) {
+ exceptionOccurred.set(true);
+ return;
+ }
+ try {
+ barrierC.await();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ } catch (BrokenBarrierException e) {
+ e.printStackTrace();
+ }
+ }
+
+ @Override
+ public void onGetContainerStatusError(ContainerId containerId,
+ Throwable t) {
+ }
+
+ @Override
+ public void onStopContainerError(ContainerId containerId, Throwable t) {
+ }
+
+ }
+
+ private Container mockContainer(int i) {
+ ApplicationId appId =
+ BuilderUtils.newApplicationId(System.currentTimeMillis(), 1);
+ ApplicationAttemptId attemptId =
+ ApplicationAttemptId.newInstance(appId, 1);
+ ContainerId containerId = ContainerId.newInstance(attemptId, i);
+ nodeId = NodeId.newInstance("localhost", 0);
+ // Create an empty record
+ containerToken = recordFactory.newRecordInstance(ContainerToken.class);
+ return BuilderUtils.newContainer(
+ containerId, nodeId, null, null, null, containerToken, 0);
+ }
+
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 599f8a9edd5..d71d193e536 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -661,6 +661,13 @@
30
+
+ Max number of threads in NMClientAsync to process container
+ management events
+ yarn.client.nodemanager-client-async.thread-pool-max-size
+ 500
+
+
yarn.nodemanager.aux-services.mapreduce.shuffle.class