YARN-422. Add a NM Client library to help application-writers. Contributed by Zhijie Shen.

svn merge --ignore-ancestry -c 1487184 ../../trunk/


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1487187 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2013-05-29 01:43:01 +00:00
parent 71cc411340
commit 604fe963fd
8 changed files with 2068 additions and 0 deletions

View File

@ -67,6 +67,9 @@ Release 2.0.5-beta - UNRELEASED
Azure environments. (See breakdown of tasks below for subtasks and Azure environments. (See breakdown of tasks below for subtasks and
contributors) contributors)
YARN-422. Add a NM Client library to help application-writers. (Zhijie Shen
via vinodkv)
IMPROVEMENTS IMPROVEMENTS
YARN-365. Change NM heartbeat handling to not generate a scheduler event YARN-365. Change NM heartbeat handling to not generate a scheduler event

View File

@ -708,6 +708,14 @@ public class YarnConfiguration extends Configuration {
public static final long DEFAULT_YARN_CLIENT_APP_SUBMISSION_POLL_INTERVAL_MS = public static final long DEFAULT_YARN_CLIENT_APP_SUBMISSION_POLL_INTERVAL_MS =
1000; 1000;
/**
* Max number of threads in NMClientAsync to process container management
* events
*/
public static final String NM_CLIENT_ASYNC_THREAD_POOL_MAX_SIZE =
YARN_PREFIX + "client.nodemanager-client-async.thread-pool-max-size";
public static final int DEFAULT_NM_CLIENT_ASYNC_THREAD_POOL_MAX_SIZE = 500;
public YarnConfiguration() { public YarnConfiguration() {
super(); super();
} }

View File

@ -0,0 +1,97 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.client;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Map;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.ContainerToken;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.service.Service;
@InterfaceAudience.Public
@InterfaceStability.Unstable
public interface NMClient extends Service {
/**
* <p>Start an allocated container.</p>
*
* <p>The <code>ApplicationMaster</code> or other applications that use the
* client must provide the details of the allocated container, including the
* Id, the assigned node's Id and the token via {@link Container}. In
* addition, the AM needs to provide the {@link ContainerLaunchContext} as
* well.</p>
*
* @param container the allocated container
* @param containerLaunchContext the context information needed by the
* <code>NodeManager</code> to launch the
* container
* @return a map between the auxiliary service names and their outputs
* @throws YarnRemoteException
* @throws IOException
*/
Map<String, ByteBuffer> startContainer(Container container,
ContainerLaunchContext containerLaunchContext)
throws YarnRemoteException, IOException;
/**
* <p>Stop an started container.</p>
*
* @param containerId the Id of the started container
* @param nodeId the Id of the <code>NodeManager</code>
* @param containerToken the security token to verify authenticity of the
* started container
* @throws YarnRemoteException
* @throws IOException
*/
void stopContainer(ContainerId containerId, NodeId nodeId,
ContainerToken containerToken) throws YarnRemoteException, IOException;
/**
* <p>Query the status of a container.</p>
*
* @param containerId the Id of the started container
* @param nodeId the Id of the <code>NodeManager</code>
* @param containerToken the security token to verify authenticity of the
* started container
* @return the status of a container
* @throws YarnRemoteException
* @throws IOException
*/
ContainerStatus getContainerStatus(ContainerId containerId, NodeId nodeId,
ContainerToken containerToken) throws YarnRemoteException, IOException;
/**
* <p>Set whether the containers that are started by this client, and are
* still running should be stopped when the client stops. By default, the
* feature should be enabled.</p>
*
* @param enabled whether the feature is enabled or not
*/
void cleanupRunningContainersOnStop(boolean enabled);
}

View File

@ -0,0 +1,709 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.client;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.ContainerToken;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AbstractEvent;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.ipc.RPCUtil;
import org.apache.hadoop.yarn.service.AbstractService;
import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
import org.apache.hadoop.yarn.state.MultipleArcTransition;
import org.apache.hadoop.yarn.state.SingleArcTransition;
import org.apache.hadoop.yarn.state.StateMachine;
import org.apache.hadoop.yarn.state.StateMachineFactory;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
* <code>NMClientAsync</code> handles communication with all the NodeManagers
* and provides asynchronous updates on getting responses from them. It
* maintains a thread pool to communicate with individual NMs where a number of
* worker threads process requests to NMs by using {@link NMClientImpl}. The max
* size of the thread pool is configurable through
* {@link YarnConfiguration#NM_CLIENT_ASYNC_THREAD_POOL_MAX_SIZE}.
*
* It should be used in conjunction with a CallbackHandler. For example
*
* <pre>
* {@code
* class MyCallbackHandler implements NMClientAsync.CallbackHandler {
* public void onContainerStarted(ContainerId containerId,
* Map<String, ByteBuffer> allServiceResponse) {
* [post process after the container is started, process the response]
* }
*
* public void onContainerStatusReceived(ContainerId containerId,
* ContainerStatus containerStatus) {
* [make use of the status of the container]
* }
*
* public void onContainerStopped(ContainerId containerId) {
* [post process after the container is stopped]
* }
*
* public void onStartContainerError(
* ContainerId containerId, Throwable t) {
* [handle the raised exception]
* }
*
* public void onGetContainerStatusError(
* ContainerId containerId, Throwable t) {
* [handle the raised exception]
* }
*
* public void onStopContainerError(
* ContainerId containerId, Throwable t) {
* [handle the raised exception]
* }
* }
* }
* </pre>
*
* The client's life-cycle should be managed like the following:
*
* <pre>
* {@code
* NMClientAsync asyncClient = new NMClientAsync(new MyCallbackhandler());
* asyncClient.init(conf);
* asyncClient.start();
* asyncClient.startContainer(container, containerLaunchContext);
* [... wait for container being started]
* asyncClient.getContainerStatus(container.getId(), container.getNodeId(),
* container.getContainerToken());
* [... handle the status in the callback instance]
* asyncClient.stopContainer(container.getId(), container.getNodeId(),
* container.getContainerToken());
* [... wait for container being stopped]
* asyncClient.stop();
* }
* </pre>
*/
@Unstable
@Evolving
public class NMClientAsync extends AbstractService {
private static final Log LOG = LogFactory.getLog(NMClientAsync.class);
protected static final int INITIAL_THREAD_POOL_SIZE = 10;
protected ThreadPoolExecutor threadPool;
protected int maxThreadPoolSize;
protected Thread eventDispatcherThread;
protected AtomicBoolean stopped = new AtomicBoolean(false);
protected BlockingQueue<ContainerEvent> events =
new LinkedBlockingQueue<ContainerEvent>();
protected NMClient client;
protected CallbackHandler callbackHandler;
protected ConcurrentMap<ContainerId, StatefulContainer> containers =
new ConcurrentHashMap<ContainerId, StatefulContainer>();
public NMClientAsync(CallbackHandler callbackHandler) {
this (NMClientAsync.class.getName(), callbackHandler);
}
public NMClientAsync(String name, CallbackHandler callbackHandler) {
this (name, new NMClientImpl(), callbackHandler);
}
@Private
@VisibleForTesting
protected NMClientAsync(String name, NMClient client,
CallbackHandler callbackHandler) {
super(name);
this.client = client;
this.callbackHandler = callbackHandler;
}
@Override
public void init(Configuration conf) {
this.maxThreadPoolSize = conf.getInt(
YarnConfiguration.NM_CLIENT_ASYNC_THREAD_POOL_MAX_SIZE,
YarnConfiguration.DEFAULT_NM_CLIENT_ASYNC_THREAD_POOL_MAX_SIZE);
LOG.info("Upper bound of the thread pool size is " + maxThreadPoolSize);
client.init(conf);
super.init(conf);
}
@Override
public void start() {
client.start();
ThreadFactory tf = new ThreadFactoryBuilder().setNameFormat(
this.getClass().getName() + " #%d").setDaemon(true).build();
// Start with a default core-pool size and change it dynamically.
int initSize = Math.min(INITIAL_THREAD_POOL_SIZE, maxThreadPoolSize);
threadPool = new ThreadPoolExecutor(initSize, Integer.MAX_VALUE, 1,
TimeUnit.HOURS, new LinkedBlockingQueue<Runnable>(), tf);
eventDispatcherThread = new Thread() {
@Override
public void run() {
ContainerEvent event = null;
Set<String> allNodes = new HashSet<String>();
while (!stopped.get() && !Thread.currentThread().isInterrupted()) {
try {
event = events.take();
} catch (InterruptedException e) {
if (!stopped.get()) {
LOG.error("Returning, thread interrupted", e);
}
return;
}
allNodes.add(event.getNodeId().toString());
int threadPoolSize = threadPool.getCorePoolSize();
// We can increase the pool size only if haven't reached the maximum
// limit yet.
if (threadPoolSize != maxThreadPoolSize) {
// nodes where containers will run at *this* point of time. This is
// *not* the cluster size and doesn't need to be.
int nodeNum = allNodes.size();
int idealThreadPoolSize = Math.min(maxThreadPoolSize, nodeNum);
if (threadPoolSize < idealThreadPoolSize) {
// Bump up the pool size to idealThreadPoolSize +
// INITIAL_POOL_SIZE, the later is just a buffer so we are not
// always increasing the pool-size
int newThreadPoolSize = Math.min(maxThreadPoolSize,
idealThreadPoolSize + INITIAL_THREAD_POOL_SIZE);
LOG.info("Set NMClientAsync thread pool size to " +
newThreadPoolSize + " as the number of nodes to talk to is "
+ nodeNum);
threadPool.setCorePoolSize(newThreadPoolSize);
}
}
// the events from the queue are handled in parallel with a thread
// pool
threadPool.execute(getContainerEventProcessor(event));
// TODO: Group launching of multiple containers to a single
// NodeManager into a single connection
}
}
};
eventDispatcherThread.setName("Container Event Dispatcher");
eventDispatcherThread.setDaemon(false);
eventDispatcherThread.start();
super.start();
}
@Override
public void stop() {
if (stopped.getAndSet(true)) {
// return if already stopped
return;
}
eventDispatcherThread.interrupt();
try {
eventDispatcherThread.join();
} catch (InterruptedException e) {
LOG.error("The thread of " + eventDispatcherThread.getName() +
" didn't finish normally.", e);
}
threadPool.shutdownNow();
// If NMClientImpl doesn't stop running containers, the states doesn't
// need to be cleared.
if (!(client instanceof NMClientImpl) ||
((NMClientImpl) client).cleanupRunningContainers.get()) {
containers.clear();
}
client.stop();
super.stop();
}
public void startContainer(
Container container, ContainerLaunchContext containerLaunchContext) {
if (containers.putIfAbsent(container.getId(),
new StatefulContainer(this, container.getId())) != null) {
callbackHandler.onStartContainerError(container.getId(),
RPCUtil.getRemoteException("Container " + container.getId() +
" is already started or scheduled to start"));
}
try {
events.put(new StartContainerEvent(container, containerLaunchContext));
} catch (InterruptedException e) {
LOG.warn("Exception when scheduling the event of starting Container " +
container.getId());
callbackHandler.onStartContainerError(container.getId(), e);
}
}
public void stopContainer(ContainerId containerId, NodeId nodeId,
ContainerToken containerToken) {
if (containers.get(containerId) == null) {
callbackHandler.onStopContainerError(containerId,
RPCUtil.getRemoteException("Container " + containerId +
" is neither started nor scheduled to start"));
}
try {
events.put(new ContainerEvent(containerId, nodeId, containerToken,
ContainerEventType.STOP_CONTAINER));
} catch (InterruptedException e) {
LOG.warn("Exception when scheduling the event of stopping Container " +
containerId);
callbackHandler.onStopContainerError(containerId, e);
}
}
public void getContainerStatus(ContainerId containerId, NodeId nodeId,
ContainerToken containerToken) {
try {
events.put(new ContainerEvent(containerId, nodeId, containerToken,
ContainerEventType.QUERY_CONTAINER));
} catch (InterruptedException e) {
LOG.warn("Exception when scheduling the event of querying the status" +
" of Container " + containerId);
callbackHandler.onGetContainerStatusError(containerId, e);
}
}
protected static enum ContainerState {
PREP, FAILED, RUNNING, DONE,
}
protected boolean isCompletelyDone(StatefulContainer container) {
return container.getState() == ContainerState.DONE ||
container.getState() == ContainerState.FAILED;
}
protected ContainerEventProcessor getContainerEventProcessor(
ContainerEvent event) {
return new ContainerEventProcessor(event);
}
/**
* The type of the event of interacting with a container
*/
protected static enum ContainerEventType {
START_CONTAINER,
STOP_CONTAINER,
QUERY_CONTAINER
}
protected static class ContainerEvent
extends AbstractEvent<ContainerEventType>{
private ContainerId containerId;
private NodeId nodeId;
private ContainerToken containerToken;
public ContainerEvent(ContainerId containerId, NodeId nodeId,
ContainerToken containerToken, ContainerEventType type) {
super(type);
this.containerId = containerId;
this.nodeId = nodeId;
this.containerToken = containerToken;
}
public ContainerId getContainerId() {
return containerId;
}
public NodeId getNodeId() {
return nodeId;
}
public ContainerToken getContainerToken() {
return containerToken;
}
}
protected static class StartContainerEvent extends ContainerEvent {
private Container container;
private ContainerLaunchContext containerLaunchContext;
public StartContainerEvent(Container container,
ContainerLaunchContext containerLaunchContext) {
super(container.getId(), container.getNodeId(),
container.getContainerToken(), ContainerEventType.START_CONTAINER);
this.container = container;
this.containerLaunchContext = containerLaunchContext;
}
public Container getContainer() {
return container;
}
public ContainerLaunchContext getContainerLaunchContext() {
return containerLaunchContext;
}
}
protected static class StatefulContainer implements
EventHandler<ContainerEvent> {
protected final static StateMachineFactory<StatefulContainer,
ContainerState, ContainerEventType, ContainerEvent> stateMachineFactory
= new StateMachineFactory<StatefulContainer, ContainerState,
ContainerEventType, ContainerEvent>(ContainerState.PREP)
// Transitions from PREP state
.addTransition(ContainerState.PREP,
EnumSet.of(ContainerState.RUNNING, ContainerState.FAILED),
ContainerEventType.START_CONTAINER,
new StartContainerTransition())
.addTransition(ContainerState.PREP, ContainerState.DONE,
ContainerEventType.STOP_CONTAINER, new OutOfOrderTransition())
// Transitions from RUNNING state
// RUNNING -> RUNNING should be the invalid transition
.addTransition(ContainerState.RUNNING,
EnumSet.of(ContainerState.DONE, ContainerState.FAILED),
ContainerEventType.STOP_CONTAINER,
new StopContainerTransition())
// Transition from DONE state
.addTransition(ContainerState.DONE, ContainerState.DONE,
EnumSet.of(ContainerEventType.START_CONTAINER,
ContainerEventType.STOP_CONTAINER))
// Transition from FAILED state
.addTransition(ContainerState.FAILED, ContainerState.FAILED,
EnumSet.of(ContainerEventType.START_CONTAINER,
ContainerEventType.STOP_CONTAINER));
protected static class StartContainerTransition implements
MultipleArcTransition<StatefulContainer, ContainerEvent,
ContainerState> {
@Override
public ContainerState transition(
StatefulContainer container, ContainerEvent event) {
ContainerId containerId = event.getContainerId();
try {
StartContainerEvent scEvent = null;
if (event instanceof StartContainerEvent) {
scEvent = (StartContainerEvent) event;
}
assert scEvent != null;
Map<String, ByteBuffer> allServiceResponse =
container.nmClientAsync.client.startContainer(
scEvent.getContainer(), scEvent.getContainerLaunchContext());
try {
container.nmClientAsync.callbackHandler.onContainerStarted(
containerId, allServiceResponse);
} catch (Throwable thr) {
// Don't process user created unchecked exception
LOG.info("Unchecked exception is thrown from onContainerStarted for "
+ "Container " + containerId, thr);
}
return ContainerState.RUNNING;
} catch (YarnRemoteException e) {
return onExceptionRaised(container, event, e);
} catch (IOException e) {
return onExceptionRaised(container, event, e);
} catch (Throwable t) {
return onExceptionRaised(container, event, t);
}
}
private ContainerState onExceptionRaised(StatefulContainer container,
ContainerEvent event, Throwable t) {
try {
container.nmClientAsync.callbackHandler.onStartContainerError(
event.getContainerId(), t);
} catch (Throwable thr) {
// Don't process user created unchecked exception
LOG.info(
"Unchecked exception is thrown from onStartContainerError for " +
"Container " + event.getContainerId(), thr);
}
return ContainerState.FAILED;
}
}
protected static class StopContainerTransition implements
MultipleArcTransition<StatefulContainer, ContainerEvent,
ContainerState> {
@Override
public ContainerState transition(
StatefulContainer container, ContainerEvent event) {
ContainerId containerId = event.getContainerId();
try {
container.nmClientAsync.client.stopContainer(
containerId, event.getNodeId(), event.getContainerToken());
try {
container.nmClientAsync.callbackHandler.onContainerStopped(
event.getContainerId());
} catch (Throwable thr) {
// Don't process user created unchecked exception
LOG.info("Unchecked exception is thrown from onContainerStopped for "
+ "Container " + event.getContainerId(), thr);
}
return ContainerState.DONE;
} catch (YarnRemoteException e) {
return onExceptionRaised(container, event, e);
} catch (IOException e) {
return onExceptionRaised(container, event, e);
} catch (Throwable t) {
return onExceptionRaised(container, event, t);
}
}
private ContainerState onExceptionRaised(StatefulContainer container,
ContainerEvent event, Throwable t) {
try {
container.nmClientAsync.callbackHandler.onStopContainerError(
event.getContainerId(), t);
} catch (Throwable thr) {
// Don't process user created unchecked exception
LOG.info("Unchecked exception is thrown from onStopContainerError for "
+ "Container " + event.getContainerId(), thr);
}
return ContainerState.FAILED;
}
}
protected static class OutOfOrderTransition implements
SingleArcTransition<StatefulContainer, ContainerEvent> {
protected static final String STOP_BEFORE_START_ERROR_MSG =
"Container was killed before it was launched";
@Override
public void transition(StatefulContainer container, ContainerEvent event) {
try {
container.nmClientAsync.callbackHandler.onStartContainerError(
event.getContainerId(),
RPCUtil.getRemoteException(STOP_BEFORE_START_ERROR_MSG));
} catch (Throwable thr) {
// Don't process user created unchecked exception
LOG.info(
"Unchecked exception is thrown from onStartContainerError for " +
"Container " + event.getContainerId(), thr);
}
}
}
private final NMClientAsync nmClientAsync;
private final ContainerId containerId;
private final StateMachine<ContainerState, ContainerEventType,
ContainerEvent> stateMachine;
private final ReadLock readLock;
private final WriteLock writeLock;
public StatefulContainer(NMClientAsync client, ContainerId containerId) {
this.nmClientAsync = client;
this.containerId = containerId;
stateMachine = stateMachineFactory.make(this);
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
readLock = lock.readLock();
writeLock = lock.writeLock();
}
@Override
public void handle(ContainerEvent event) {
writeLock.lock();
try {
try {
this.stateMachine.doTransition(event.getType(), event);
} catch (InvalidStateTransitonException e) {
LOG.error("Can't handle this event at current state", e);
}
} finally {
writeLock.unlock();
}
}
public ContainerId getContainerId() {
return containerId;
}
public ContainerState getState() {
readLock.lock();
try {
return stateMachine.getCurrentState();
} finally {
readLock.unlock();
}
}
}
protected class ContainerEventProcessor implements Runnable {
protected ContainerEvent event;
public ContainerEventProcessor(ContainerEvent event) {
this.event = event;
}
@Override
public void run() {
ContainerId containerId = event.getContainerId();
LOG.info("Processing Event " + event + " for Container " + containerId);
if (event.getType() == ContainerEventType.QUERY_CONTAINER) {
try {
ContainerStatus containerStatus = client.getContainerStatus(
containerId, event.getNodeId(), event.getContainerToken());
try {
callbackHandler.onContainerStatusReceived(
containerId, containerStatus);
} catch (Throwable thr) {
// Don't process user created unchecked exception
LOG.info(
"Unchecked exception is thrown from onContainerStatusReceived" +
" for Container " + event.getContainerId(), thr);
}
} catch (YarnRemoteException e) {
onExceptionRaised(containerId, e);
} catch (IOException e) {
onExceptionRaised(containerId, e);
} catch (Throwable t) {
onExceptionRaised(containerId, t);
}
} else {
StatefulContainer container = containers.get(containerId);
if (container == null) {
LOG.info("Container " + containerId + " is already stopped or failed");
} else {
container.handle(event);
}
}
}
private void onExceptionRaised(ContainerId containerId, Throwable t) {
try {
callbackHandler.onGetContainerStatusError(containerId, t);
} catch (Throwable thr) {
// Don't process user created unchecked exception
LOG.info("Unchecked exception is thrown from onGetContainerStatusError" +
" for Container " + containerId, thr);
}
}
}
/**
* <p>
* The callback interface needs to be implemented by {@link NMClientAsync}
* users. The APIs are called when responses from <code>NodeManager</code> are
* available.
* </p>
*
* <p>
* Once a callback happens, the users can chose to act on it in blocking or
* non-blocking manner. If the action on callback is done in a blocking
* manner, some of the threads performing requests on NodeManagers may get
* blocked depending on how many threads in the pool are busy.
* </p>
*
* <p>
* The implementation of the callback function should not throw the
* unexpected exception. Otherwise, {@link NMClientAsync} will just
* catch, log and then ignore it.
* </p>
*/
public static interface CallbackHandler {
/**
* The API is called when <code>NodeManager</code> responds to indicate its
* acceptance of the starting container request
* @param containerId the Id of the container
* @param allServiceResponse a Map between the auxiliary service names and
* their outputs
*/
void onContainerStarted(ContainerId containerId,
Map<String, ByteBuffer> allServiceResponse);
/**
* The API is called when <code>NodeManager</code> responds with the status
* of the container
* @param containerId the Id of the container
* @param containerStatus the status of the container
*/
void onContainerStatusReceived(ContainerId containerId,
ContainerStatus containerStatus);
/**
* The API is called when <code>NodeManager</code> responds to indicate the
* container is stopped.
* @param containerId the Id of the container
*/
void onContainerStopped(ContainerId containerId);
/**
* The API is called when an exception is raised in the process of
* starting a container
*
* @param containerId the Id of the container
* @param t the raised exception
*/
void onStartContainerError(ContainerId containerId, Throwable t);
/**
* The API is called when an exception is raised in the process of
* querying the status of a container
*
* @param containerId the Id of the container
* @param t the raised exception
*/
void onGetContainerStatusError(ContainerId containerId, Throwable t);
/**
* The API is called when an exception is raised in the process of
* stopping a container
*
* @param containerId the Id of the container
* @param t the raised exception
*/
void onStopContainerError(ContainerId containerId, Throwable t);
}
}

View File

@ -0,0 +1,388 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.client;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.security.PrivilegedAction;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.ContainerManager;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.ContainerToken;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.ipc.RPCUtil;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.service.AbstractService;
import org.apache.hadoop.yarn.util.ProtoUtils;
import org.apache.hadoop.yarn.util.Records;
/**
* <p>
* This class implements {@link NMClient}. All the APIs are blocking.
* </p>
*
* <p>
* By default, this client stops all the running containers that are started by
* it when it stops. It can be disabled via
* {@link #cleanupRunningContainersOnStop}, in which case containers will
* continue to run even after this client is stopped and till the application
* runs at which point ResourceManager will forcefully kill them.
* </p>
*/
public class NMClientImpl extends AbstractService implements NMClient {
private static final Log LOG = LogFactory.getLog(NMClientImpl.class);
// The logically coherent operations on startedContainers is synchronized to
// ensure they are atomic
protected ConcurrentMap<ContainerId, StartedContainer> startedContainers =
new ConcurrentHashMap<ContainerId, StartedContainer>();
//enabled by default
protected AtomicBoolean cleanupRunningContainers = new AtomicBoolean(true);
public NMClientImpl() {
super(NMClientImpl.class.getName());
}
public NMClientImpl(String name) {
super(name);
}
@Override
public void stop() {
// Usually, started-containers are stopped when this client stops. Unless
// the flag cleanupRunningContainers is set to false.
if (cleanupRunningContainers.get()) {
cleanupRunningContainers();
}
super.stop();
}
protected synchronized void cleanupRunningContainers() {
for (StartedContainer startedContainer : startedContainers.values()) {
try {
stopContainer(startedContainer.getContainerId(),
startedContainer.getNodeId(),
startedContainer.getContainerToken());
} catch (YarnRemoteException e) {
LOG.error("Failed to stop Container " +
startedContainer.getContainerId() +
"when stopping NMClientImpl");
} catch (IOException e) {
LOG.error("Failed to stop Container " +
startedContainer.getContainerId() +
"when stopping NMClientImpl");
}
}
}
@Override
public void cleanupRunningContainersOnStop(boolean enabled) {
cleanupRunningContainers.set(enabled);
}
protected static class StartedContainer {
private ContainerId containerId;
private NodeId nodeId;
private ContainerToken containerToken;
private boolean stopped;
public StartedContainer(ContainerId containerId, NodeId nodeId,
ContainerToken containerToken) {
this.containerId = containerId;
this.nodeId = nodeId;
this.containerToken = containerToken;
stopped = false;
}
public ContainerId getContainerId() {
return containerId;
}
public NodeId getNodeId() {
return nodeId;
}
public ContainerToken getContainerToken() {
return containerToken;
}
}
protected static final class NMCommunicator extends AbstractService {
private ContainerId containerId;
private NodeId nodeId;
private ContainerToken containerToken;
private ContainerManager containerManager;
public NMCommunicator(ContainerId containerId, NodeId nodeId,
ContainerToken containerToken) {
super(NMCommunicator.class.getName());
this.containerId = containerId;
this.nodeId = nodeId;
this.containerToken = containerToken;
}
@Override
public synchronized void start() {
final YarnRPC rpc = YarnRPC.create(getConfig());
final InetSocketAddress containerAddress =
NetUtils.createSocketAddr(nodeId.toString());
// the user in createRemoteUser in this context has to be ContainerId
UserGroupInformation currentUser =
UserGroupInformation.createRemoteUser(containerId.toString());
Token<ContainerTokenIdentifier> token =
ProtoUtils.convertFromProtoFormat(containerToken, containerAddress);
currentUser.addToken(token);
containerManager = currentUser
.doAs(new PrivilegedAction<ContainerManager>() {
@Override
public ContainerManager run() {
return (ContainerManager) rpc.getProxy(ContainerManager.class,
containerAddress, getConfig());
}
});
LOG.debug("Connecting to ContainerManager at " + containerAddress);
}
@Override
public synchronized void stop() {
if (this.containerManager != null) {
RPC.stopProxy(this.containerManager);
if (LOG.isDebugEnabled()) {
InetSocketAddress containerAddress =
NetUtils.createSocketAddr(nodeId.toString());
LOG.debug("Disconnecting from ContainerManager at " +
containerAddress);
}
}
}
public synchronized Map<String, ByteBuffer> startContainer(
Container container, ContainerLaunchContext containerLaunchContext)
throws YarnRemoteException, IOException {
if (!container.getId().equals(containerId)) {
throw new IllegalArgumentException(
"NMCommunicator's containerId mismatches the given Container's");
}
StartContainerResponse startResponse = null;
try {
StartContainerRequest startRequest =
Records.newRecord(StartContainerRequest.class);
startRequest.setContainer(container);
startRequest.setContainerLaunchContext(containerLaunchContext);
startResponse = containerManager.startContainer(startRequest);
if (LOG.isDebugEnabled()) {
LOG.debug("Started Container " + containerId);
}
} catch (YarnRemoteException e) {
LOG.warn("Container " + containerId + " failed to start", e);
throw e;
} catch (IOException e) {
LOG.warn("Container " + containerId + " failed to start", e);
throw e;
}
return startResponse.getAllServiceResponse();
}
public synchronized void stopContainer() throws YarnRemoteException,
IOException {
try {
StopContainerRequest stopRequest =
Records.newRecord(StopContainerRequest.class);
stopRequest.setContainerId(containerId);
containerManager.stopContainer(stopRequest);
if (LOG.isDebugEnabled()) {
LOG.debug("Stopped Container " + containerId);
}
} catch (YarnRemoteException e) {
LOG.warn("Container " + containerId + " failed to stop", e);
throw e;
} catch (IOException e) {
LOG.warn("Container " + containerId + " failed to stop", e);
throw e;
}
}
public synchronized ContainerStatus getContainerStatus()
throws YarnRemoteException, IOException {
GetContainerStatusResponse statusResponse = null;
try {
GetContainerStatusRequest statusRequest =
Records.newRecord(GetContainerStatusRequest.class);
statusRequest.setContainerId(containerId);
statusResponse = containerManager.getContainerStatus(statusRequest);
if (LOG.isDebugEnabled()) {
LOG.debug("Got the status of Container " + containerId);
}
} catch (YarnRemoteException e) {
LOG.warn(
"Unable to get the status of Container " + containerId, e);
throw e;
} catch (IOException e) {
LOG.warn(
"Unable to get the status of Container " + containerId, e);
throw e;
}
return statusResponse.getStatus();
}
}
@Override
public Map<String, ByteBuffer> startContainer(
Container container, ContainerLaunchContext containerLaunchContext)
throws YarnRemoteException, IOException {
// Do synchronization on StartedContainer to prevent race condition
// between startContainer and stopContainer
synchronized (addStartedContainer(container)) {
Map<String, ByteBuffer> allServiceResponse;
NMCommunicator nmCommunicator = null;
try {
nmCommunicator = new NMCommunicator(container.getId(),
container.getNodeId(), container.getContainerToken());
nmCommunicator.init(getConfig());
nmCommunicator.start();
allServiceResponse =
nmCommunicator.startContainer(container, containerLaunchContext);
} catch (YarnRemoteException e) {
// Remove the started container if it failed to start
removeStartedContainer(container.getId());
throw e;
} catch (IOException e) {
removeStartedContainer(container.getId());
throw e;
} catch (Throwable t) {
removeStartedContainer(container.getId());
throw RPCUtil.getRemoteException(t);
} finally {
if (nmCommunicator != null) {
nmCommunicator.stop();
}
}
return allServiceResponse;
}
// Three choices:
// 1. starting and releasing the proxy before and after each interaction
// 2. starting the proxy when starting the container and releasing it when
// stopping the container
// 3. starting the proxy when starting the container and releasing it when
// stopping the client
// Adopt 1 currently
}
@Override
public void stopContainer(ContainerId containerId, NodeId nodeId,
ContainerToken containerToken) throws YarnRemoteException, IOException {
StartedContainer startedContainer = getStartedContainer(containerId);
if (startedContainer == null) {
throw RPCUtil.getRemoteException("Container " + containerId +
" is either not started yet or already stopped");
}
// Only allow one request of stopping the container to move forward
// When entering the block, check whether the precursor has already stopped
// the container
synchronized (startedContainer) {
if (startedContainer.stopped) {
return;
}
NMCommunicator nmCommunicator = null;
try {
nmCommunicator =
new NMCommunicator(containerId, nodeId, containerToken);
nmCommunicator.init(getConfig());
nmCommunicator.start();
nmCommunicator.stopContainer();
} finally {
if (nmCommunicator != null) {
nmCommunicator.stop();
}
startedContainer.stopped = true;
removeStartedContainer(containerId);
}
}
}
@Override
public ContainerStatus getContainerStatus(ContainerId containerId,
NodeId nodeId, ContainerToken containerToken)
throws YarnRemoteException, IOException {
NMCommunicator nmCommunicator = null;
try {
nmCommunicator = new NMCommunicator(containerId, nodeId, containerToken);
nmCommunicator.init(getConfig());
nmCommunicator.start();
ContainerStatus containerStatus = nmCommunicator.getContainerStatus();
return containerStatus;
} finally {
if (nmCommunicator != null) {
nmCommunicator.stop();
}
}
}
protected synchronized StartedContainer addStartedContainer(
Container container) throws YarnRemoteException, IOException {
if (startedContainers.containsKey(container.getId())) {
throw RPCUtil.getRemoteException("Container " + container.getId() +
" is already started");
}
StartedContainer startedContainer = new StartedContainer(container.getId(),
container.getNodeId(), container.getContainerToken());
startedContainers.put(startedContainer.getContainerId(), startedContainer);
return startedContainer;
}
protected synchronized void removeStartedContainer(ContainerId containerId) {
startedContainers.remove(containerId);
}
protected synchronized StartedContainer getStartedContainer(
ContainerId containerId) {
return startedContainers.get(containerId);
}
}

View File

@ -0,0 +1,316 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.client;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.server.MiniYARNCluster;
import org.apache.hadoop.yarn.service.Service.STATE;
import org.apache.hadoop.yarn.util.Records;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
public class TestNMClient {
Configuration conf = null;
MiniYARNCluster yarnCluster = null;
YarnClientImpl yarnClient = null;
AMRMClientImpl rmClient = null;
NMClientImpl nmClient = null;
List<NodeReport> nodeReports = null;
ApplicationAttemptId attemptId = null;
int nodeCount = 3;
@Before
public void setup() throws YarnRemoteException, IOException {
// start minicluster
conf = new YarnConfiguration();
yarnCluster =
new MiniYARNCluster(TestAMRMClient.class.getName(), nodeCount, 1, 1);
yarnCluster.init(conf);
yarnCluster.start();
assertNotNull(yarnCluster);
assertEquals(STATE.STARTED, yarnCluster.getServiceState());
// start rm client
yarnClient = new YarnClientImpl();
yarnClient.init(conf);
yarnClient.start();
assertNotNull(yarnClient);
assertEquals(STATE.STARTED, yarnClient.getServiceState());
// get node info
nodeReports = yarnClient.getNodeReports();
// submit new app
GetNewApplicationResponse newApp = yarnClient.getNewApplication();
ApplicationId appId = newApp.getApplicationId();
ApplicationSubmissionContext appContext = Records
.newRecord(ApplicationSubmissionContext.class);
// set the application id
appContext.setApplicationId(appId);
// set the application name
appContext.setApplicationName("Test");
// Set the priority for the application master
Priority pri = Priority.newInstance(0);
appContext.setPriority(pri);
// Set the queue to which this application is to be submitted in the RM
appContext.setQueue("default");
// Set up the container launch context for the application master
ContainerLaunchContext amContainer = Records
.newRecord(ContainerLaunchContext.class);
appContext.setAMContainerSpec(amContainer);
// unmanaged AM
appContext.setUnmanagedAM(true);
// Create the request to send to the applications manager
SubmitApplicationRequest appRequest = Records
.newRecord(SubmitApplicationRequest.class);
appRequest.setApplicationSubmissionContext(appContext);
// Submit the application to the applications manager
yarnClient.submitApplication(appContext);
// wait for app to start
int iterationsLeft = 30;
while (iterationsLeft > 0) {
ApplicationReport appReport = yarnClient.getApplicationReport(appId);
if (appReport.getYarnApplicationState() ==
YarnApplicationState.ACCEPTED) {
attemptId = appReport.getCurrentApplicationAttemptId();
break;
}
sleep(1000);
--iterationsLeft;
}
if (iterationsLeft == 0) {
fail("Application hasn't bee started");
}
// start am rm client
rmClient = new AMRMClientImpl(attemptId);
rmClient.init(conf);
rmClient.start();
assertNotNull(rmClient);
assertEquals(STATE.STARTED, rmClient.getServiceState());
// start am nm client
nmClient = new NMClientImpl();
nmClient.init(conf);
nmClient.start();
assertNotNull(nmClient);
assertEquals(STATE.STARTED, nmClient.getServiceState());
}
@After
public void tearDown() {
rmClient.stop();
// leave one unclosed
assertEquals(1, nmClient.startedContainers.size());
// default true
assertTrue(nmClient.cleanupRunningContainers.get());
// don't stop the running containers
nmClient.cleanupRunningContainersOnStop(false);
assertFalse(nmClient.cleanupRunningContainers.get());
nmClient.stop();
assertTrue(nmClient.startedContainers.size() > 0);
// stop the running containers
nmClient.cleanupRunningContainersOnStop(true);
assertTrue(nmClient.cleanupRunningContainers.get());
nmClient.stop();
assertEquals(0, nmClient.startedContainers.size());
yarnClient.stop();
yarnCluster.stop();
}
@Test (timeout = 60000)
public void testNMClient()
throws YarnRemoteException, IOException {
rmClient.registerApplicationMaster("Host", 10000, "");
testContainerManagement(nmClient, allocateContainers(rmClient, 5));
rmClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED,
null, null);
}
private Set<Container> allocateContainers(AMRMClientImpl rmClient, int num)
throws YarnRemoteException, IOException {
// setup container request
Resource capability = Resource.newInstance(1024, 0);
Priority priority = Priority.newInstance(0);
String node = nodeReports.get(0).getNodeId().getHost();
String rack = nodeReports.get(0).getRackName();
String[] nodes = new String[] {node};
String[] racks = new String[] {rack};
for (int i = 0; i < num; ++i) {
rmClient.addContainerRequest(new ContainerRequest(capability, nodes,
racks, priority, 1));
}
int containersRequestedAny = rmClient.remoteRequestsTable.get(priority)
.get(ResourceRequest.ANY).get(capability).getNumContainers();
// RM should allocate container within 2 calls to allocate()
int allocatedContainerCount = 0;
int iterationsLeft = 2;
Set<Container> containers = new TreeSet<Container>();
while (allocatedContainerCount < containersRequestedAny
&& iterationsLeft > 0) {
AllocateResponse allocResponse = rmClient.allocate(0.1f);
allocatedContainerCount += allocResponse.getAllocatedContainers().size();
for(Container container : allocResponse.getAllocatedContainers()) {
containers.add(container);
}
if(allocatedContainerCount < containersRequestedAny) {
// sleep to let NM's heartbeat to RM and trigger allocations
sleep(1000);
}
--iterationsLeft;
}
return containers;
}
private void testContainerManagement(NMClientImpl nmClient,
Set<Container> containers) throws IOException {
int size = containers.size();
int i = 0;
for (Container container : containers) {
// getContainerStatus shouldn't be called before startContainer,
// otherwise, NodeManager cannot find the container
try {
nmClient.getContainerStatus(container.getId(), container.getNodeId(),
container.getContainerToken());
fail("Exception is expected");
} catch (YarnRemoteException e) {
assertTrue("The thrown exception is not expected",
e.getMessage().contains("is not handled by this NodeManager"));
}
// stopContainer shouldn't be called before startContainer,
// otherwise, an exception will be thrown
try {
nmClient.stopContainer(container.getId(), container.getNodeId(),
container.getContainerToken());
fail("Exception is expected");
} catch (YarnRemoteException e) {
assertTrue("The thrown exception is not expected",
e.getMessage().contains(
"is either not started yet or already stopped"));
}
Credentials ts = new Credentials();
DataOutputBuffer dob = new DataOutputBuffer();
ts.writeTokenStorageToStream(dob);
ByteBuffer securityTokens =
ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
ContainerLaunchContext clc =
Records.newRecord(ContainerLaunchContext.class);
clc.setTokens(securityTokens);
try {
nmClient.startContainer(container, clc);
} catch (YarnRemoteException e) {
fail("Exception is not expected");
}
// leave one container unclosed
if (++i < size) {
try {
ContainerStatus status = nmClient.getContainerStatus(container.getId(),
container.getNodeId(), container.getContainerToken());
// verify the container is started and in good shape
assertEquals(container.getId(), status.getContainerId());
assertEquals(ContainerState.RUNNING, status.getState());
assertEquals("", status.getDiagnostics());
assertEquals(-1000, status.getExitStatus());
} catch (YarnRemoteException e) {
fail("Exception is not expected");
}
try {
nmClient.stopContainer(container.getId(), container.getNodeId(),
container.getContainerToken());
} catch (YarnRemoteException e) {
fail("Exception is not expected");
}
// getContainerStatus can be called after stopContainer
try {
ContainerStatus status = nmClient.getContainerStatus(
container.getId(), container.getNodeId(),
container.getContainerToken());
assertEquals(container.getId(), status.getContainerId());
assertEquals(ContainerState.RUNNING, status.getState());
assertTrue("" + i, status.getDiagnostics().contains(
"Container killed by the ApplicationMaster."));
assertEquals(-1000, status.getExitStatus());
} catch (YarnRemoteException e) {
fail("Exception is not expected");
}
}
}
}
private void sleep(int sleepTime) {
try {
Thread.sleep(sleepTime);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

View File

@ -0,0 +1,540 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.client;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerArray;
import junit.framework.Assert;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.ContainerToken;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.RPCUtil;
import org.apache.hadoop.yarn.util.BuilderUtils;
import org.junit.Test;
public class TestNMClientAsync {
private final RecordFactory recordFactory =
RecordFactoryProvider.getRecordFactory(null);
private NMClientAsync asyncClient;
private NodeId nodeId;
private ContainerToken containerToken;
@Test (timeout = 30000)
public void testNMClientAsync() throws Exception {
Configuration conf = new Configuration();
conf.setInt(YarnConfiguration.NM_CLIENT_ASYNC_THREAD_POOL_MAX_SIZE, 10);
// Threads to run are more than the max size of the thread pool
int expectedSuccess = 40;
int expectedFailure = 40;
asyncClient = new MockNMClientAsync1(expectedSuccess, expectedFailure);
asyncClient.init(conf);
Assert.assertEquals("The max thread pool size is not correctly set",
10, asyncClient.maxThreadPoolSize);
asyncClient.start();
for (int i = 0; i < expectedSuccess + expectedFailure; ++i) {
if (i == expectedSuccess) {
while (!((TestCallbackHandler1) asyncClient.callbackHandler)
.isAllSuccessCallsExecuted()) {
Thread.sleep(10);
}
asyncClient.client = mockNMClient(1);
}
Container container = mockContainer(i);
ContainerLaunchContext clc =
recordFactory.newRecordInstance(ContainerLaunchContext.class);
asyncClient.startContainer(container, clc);
}
while (!((TestCallbackHandler1) asyncClient.callbackHandler)
.isStartAndQueryFailureCallsExecuted()) {
Thread.sleep(10);
}
asyncClient.client = mockNMClient(2);
((TestCallbackHandler1) asyncClient.callbackHandler).path = false;
for (int i = 0; i < expectedFailure; ++i) {
Container container = mockContainer(
expectedSuccess + expectedFailure + i);
ContainerLaunchContext clc =
recordFactory.newRecordInstance(ContainerLaunchContext.class);
asyncClient.startContainer(container, clc);
}
while (!((TestCallbackHandler1) asyncClient.callbackHandler)
.isStopFailureCallsExecuted()) {
Thread.sleep(10);
}
for (String errorMsg :
((TestCallbackHandler1) asyncClient.callbackHandler).errorMsgs) {
System.out.println(errorMsg);
}
Assert.assertEquals("Error occurs in CallbackHandler", 0,
((TestCallbackHandler1) asyncClient.callbackHandler).errorMsgs.size());
for (String errorMsg : ((MockNMClientAsync1) asyncClient).errorMsgs) {
System.out.println(errorMsg);
}
Assert.assertEquals("Error occurs in ContainerEventProcessor", 0,
((MockNMClientAsync1) asyncClient).errorMsgs.size());
asyncClient.stop();
Assert.assertFalse(
"The thread of Container Management Event Dispatcher is still alive",
asyncClient.eventDispatcherThread.isAlive());
Assert.assertTrue("The thread pool is not shut down",
asyncClient.threadPool.isShutdown());
}
private class MockNMClientAsync1 extends NMClientAsync {
private Set<String> errorMsgs =
Collections.synchronizedSet(new HashSet<String>());
protected MockNMClientAsync1(int expectedSuccess, int expectedFailure)
throws YarnRemoteException, IOException {
super(MockNMClientAsync1.class.getName(), mockNMClient(0),
new TestCallbackHandler1(expectedSuccess, expectedFailure));
}
private class MockContainerEventProcessor extends ContainerEventProcessor {
public MockContainerEventProcessor(ContainerEvent event) {
super(event);
}
@Override
public void run() {
try {
super.run();
} catch (RuntimeException e) {
// If the unexpected throwable comes from error callback functions, it
// will break ContainerEventProcessor.run(). Therefore, monitor
// the exception here
errorMsgs.add("Unexpected throwable from callback functions should" +
" be ignored by Container " + event.getContainerId());
}
}
}
@Override
protected ContainerEventProcessor getContainerEventProcessor(
ContainerEvent event) {
return new MockContainerEventProcessor(event);
}
}
private class TestCallbackHandler1
implements NMClientAsync.CallbackHandler {
private boolean path = true;
private int expectedSuccess;
private int expectedFailure;
private AtomicInteger actualStartSuccess = new AtomicInteger(0);
private AtomicInteger actualStartFailure = new AtomicInteger(0);
private AtomicInteger actualQuerySuccess = new AtomicInteger(0);
private AtomicInteger actualQueryFailure = new AtomicInteger(0);
private AtomicInteger actualStopSuccess = new AtomicInteger(0);
private AtomicInteger actualStopFailure = new AtomicInteger(0);
private AtomicIntegerArray actualStartSuccessArray;
private AtomicIntegerArray actualStartFailureArray;
private AtomicIntegerArray actualQuerySuccessArray;
private AtomicIntegerArray actualQueryFailureArray;
private AtomicIntegerArray actualStopSuccessArray;
private AtomicIntegerArray actualStopFailureArray;
private Set<String> errorMsgs =
Collections.synchronizedSet(new HashSet<String>());
public TestCallbackHandler1(int expectedSuccess, int expectedFailure) {
this.expectedSuccess = expectedSuccess;
this.expectedFailure = expectedFailure;
actualStartSuccessArray = new AtomicIntegerArray(expectedSuccess);
actualStartFailureArray = new AtomicIntegerArray(expectedFailure);
actualQuerySuccessArray = new AtomicIntegerArray(expectedSuccess);
actualQueryFailureArray = new AtomicIntegerArray(expectedFailure);
actualStopSuccessArray = new AtomicIntegerArray(expectedSuccess);
actualStopFailureArray = new AtomicIntegerArray(expectedFailure);
}
@Override
public void onContainerStarted(ContainerId containerId,
Map<String, ByteBuffer> allServiceResponse) {
if (path) {
if (containerId.getId() >= expectedSuccess) {
errorMsgs.add("Container " + containerId +
" should throw the exception onContainerStarted");
return;
}
actualStartSuccess.addAndGet(1);
actualStartSuccessArray.set(containerId.getId(), 1);
// move on to the following success tests
asyncClient.getContainerStatus(containerId, nodeId, containerToken);
} else {
// move on to the following failure tests
asyncClient.stopContainer(containerId, nodeId, containerToken);
}
// Shouldn't crash the test thread
throw new RuntimeException("Ignorable Exception");
}
@Override
public void onContainerStatusReceived(ContainerId containerId,
ContainerStatus containerStatus) {
if (containerId.getId() >= expectedSuccess) {
errorMsgs.add("Container " + containerId +
" should throw the exception onContainerStatusReceived");
return;
}
actualQuerySuccess.addAndGet(1);
actualQuerySuccessArray.set(containerId.getId(), 1);
// move on to the following success tests
asyncClient.stopContainer(containerId, nodeId, containerToken);
// Shouldn't crash the test thread
throw new RuntimeException("Ignorable Exception");
}
@Override
public void onContainerStopped(ContainerId containerId) {
if (containerId.getId() >= expectedSuccess) {
errorMsgs.add("Container " + containerId +
" should throw the exception onContainerStopped");
return;
}
actualStopSuccess.addAndGet(1);
actualStopSuccessArray.set(containerId.getId(), 1);
// Shouldn't crash the test thread
throw new RuntimeException("Ignorable Exception");
}
@Override
public void onStartContainerError(ContainerId containerId, Throwable t) {
// If the unexpected throwable comes from success callback functions, it
// will be handled by the error callback functions. Therefore, monitor
// the exception here
if (t instanceof RuntimeException) {
errorMsgs.add("Unexpected throwable from callback functions should be" +
" ignored by Container " + containerId);
}
if (containerId.getId() < expectedSuccess) {
errorMsgs.add("Container " + containerId +
" shouldn't throw the exception onStartContainerError");
return;
}
actualStartFailure.addAndGet(1);
actualStartFailureArray.set(containerId.getId() - expectedSuccess, 1);
// move on to the following failure tests
asyncClient.getContainerStatus(containerId, nodeId, containerToken);
// Shouldn't crash the test thread
throw new RuntimeException("Ignorable Exception");
}
@Override
public void onStopContainerError(ContainerId containerId, Throwable t) {
if (t instanceof RuntimeException) {
errorMsgs.add("Unexpected throwable from callback functions should be" +
" ignored by Container " + containerId);
}
if (containerId.getId() < expectedSuccess + expectedFailure) {
errorMsgs.add("Container " + containerId +
" shouldn't throw the exception onStopContainerError");
return;
}
actualStopFailure.addAndGet(1);
actualStopFailureArray.set(
containerId.getId() - expectedSuccess - expectedFailure, 1);
// Shouldn't crash the test thread
throw new RuntimeException("Ignorable Exception");
}
@Override
public void onGetContainerStatusError(ContainerId containerId,
Throwable t) {
if (t instanceof RuntimeException) {
errorMsgs.add("Unexpected throwable from callback functions should be"
+ " ignored by Container " + containerId);
}
if (containerId.getId() < expectedSuccess) {
errorMsgs.add("Container " + containerId +
" shouldn't throw the exception onGetContainerStatusError");
return;
}
actualQueryFailure.addAndGet(1);
actualQueryFailureArray.set(containerId.getId() - expectedSuccess, 1);
// Shouldn't crash the test thread
throw new RuntimeException("Ignorable Exception");
}
public boolean isAllSuccessCallsExecuted() {
boolean isAllSuccessCallsExecuted =
actualStartSuccess.get() == expectedSuccess &&
actualQuerySuccess.get() == expectedSuccess &&
actualStopSuccess.get() == expectedSuccess;
if (isAllSuccessCallsExecuted) {
assertAtomicIntegerArray(actualStartSuccessArray);
assertAtomicIntegerArray(actualQuerySuccessArray);
assertAtomicIntegerArray(actualStopSuccessArray);
}
return isAllSuccessCallsExecuted;
}
public boolean isStartAndQueryFailureCallsExecuted() {
boolean isStartAndQueryFailureCallsExecuted =
actualStartFailure.get() == expectedFailure &&
actualQueryFailure.get() == expectedFailure;
if (isStartAndQueryFailureCallsExecuted) {
assertAtomicIntegerArray(actualStartFailureArray);
assertAtomicIntegerArray(actualQueryFailureArray);
}
return isStartAndQueryFailureCallsExecuted;
}
public boolean isStopFailureCallsExecuted() {
boolean isStopFailureCallsExecuted =
actualStopFailure.get() == expectedFailure;
if (isStopFailureCallsExecuted) {
assertAtomicIntegerArray(actualStopFailureArray);
}
return isStopFailureCallsExecuted;
}
private void assertAtomicIntegerArray(AtomicIntegerArray array) {
for (int i = 0; i < array.length(); ++i) {
Assert.assertEquals(1, array.get(i));
}
}
}
private NMClient mockNMClient(int mode)
throws YarnRemoteException, IOException {
NMClient client = mock(NMClient.class);
switch (mode) {
case 0:
when(client.startContainer(any(Container.class),
any(ContainerLaunchContext.class))).thenReturn(
Collections.<String, ByteBuffer>emptyMap());
when(client.getContainerStatus(any(ContainerId.class), any(NodeId.class),
any(ContainerToken.class))).thenReturn(
recordFactory.newRecordInstance(ContainerStatus.class));
doNothing().when(client).stopContainer(any(ContainerId.class),
any(NodeId.class), any(ContainerToken.class));
break;
case 1:
doThrow(RPCUtil.getRemoteException("Start Exception")).when(client)
.startContainer(any(Container.class),
any(ContainerLaunchContext.class));
doThrow(RPCUtil.getRemoteException("Query Exception")).when(client)
.getContainerStatus(any(ContainerId.class), any(NodeId.class),
any(ContainerToken.class));
doThrow(RPCUtil.getRemoteException("Stop Exception")).when(client)
.stopContainer(any(ContainerId.class), any(NodeId.class),
any(ContainerToken.class));
break;
case 2:
when(client.startContainer(any(Container.class),
any(ContainerLaunchContext.class))).thenReturn(
Collections.<String, ByteBuffer>emptyMap());
when(client.getContainerStatus(any(ContainerId.class), any(NodeId.class),
any(ContainerToken.class))).thenReturn(
recordFactory.newRecordInstance(ContainerStatus.class));
doThrow(RPCUtil.getRemoteException("Stop Exception")).when(client)
.stopContainer(any(ContainerId.class), any(NodeId.class),
any(ContainerToken.class));
}
return client;
}
@Test (timeout = 10000)
public void testOutOfOrder() throws Exception {
CyclicBarrier barrierA = new CyclicBarrier(2);
CyclicBarrier barrierB = new CyclicBarrier(2);
CyclicBarrier barrierC = new CyclicBarrier(2);
asyncClient = new MockNMClientAsync2(barrierA, barrierB, barrierC);
asyncClient.init(new Configuration());
asyncClient.start();
final Container container = mockContainer(1);
final ContainerLaunchContext clc =
recordFactory.newRecordInstance(ContainerLaunchContext.class);
// start container from another thread
Thread t = new Thread() {
@Override
public void run() {
asyncClient.startContainer(container, clc);
}
};
t.start();
barrierA.await();
asyncClient.stopContainer(container.getId(), container.getNodeId(),
container.getContainerToken());
barrierC.await();
Assert.assertFalse("Starting and stopping should be out of order",
((TestCallbackHandler2) asyncClient.callbackHandler)
.exceptionOccurred.get());
}
private class MockNMClientAsync2 extends NMClientAsync {
private CyclicBarrier barrierA;
private CyclicBarrier barrierB;
protected MockNMClientAsync2(CyclicBarrier barrierA, CyclicBarrier barrierB,
CyclicBarrier barrierC) throws YarnRemoteException, IOException {
super(MockNMClientAsync2.class.getName(), mockNMClient(0),
new TestCallbackHandler2(barrierC));
this.barrierA = barrierA;
this.barrierB = barrierB;
}
private class MockContainerEventProcessor extends ContainerEventProcessor {
public MockContainerEventProcessor(ContainerEvent event) {
super(event);
}
@Override
public void run() {
try {
if (event.getType() == ContainerEventType.START_CONTAINER) {
barrierA.await();
barrierB.await();
}
super.run();
if (event.getType() == ContainerEventType.STOP_CONTAINER) {
barrierB.await();
}
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
}
@Override
protected ContainerEventProcessor getContainerEventProcessor(
ContainerEvent event) {
return new MockContainerEventProcessor(event);
}
}
private class TestCallbackHandler2
implements NMClientAsync.CallbackHandler {
private CyclicBarrier barrierC;
private AtomicBoolean exceptionOccurred = new AtomicBoolean(false);
public TestCallbackHandler2(CyclicBarrier barrierC) {
this.barrierC = barrierC;
}
@Override
public void onContainerStarted(ContainerId containerId,
Map<String, ByteBuffer> allServiceResponse) {
}
@Override
public void onContainerStatusReceived(ContainerId containerId,
ContainerStatus containerStatus) {
}
@Override
public void onContainerStopped(ContainerId containerId) {
}
@Override
public void onStartContainerError(ContainerId containerId, Throwable t) {
if (!t.getMessage().equals(NMClientAsync.StatefulContainer
.OutOfOrderTransition.STOP_BEFORE_START_ERROR_MSG)) {
exceptionOccurred.set(true);
return;
}
try {
barrierC.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
@Override
public void onGetContainerStatusError(ContainerId containerId,
Throwable t) {
}
@Override
public void onStopContainerError(ContainerId containerId, Throwable t) {
}
}
private Container mockContainer(int i) {
ApplicationId appId =
BuilderUtils.newApplicationId(System.currentTimeMillis(), 1);
ApplicationAttemptId attemptId =
ApplicationAttemptId.newInstance(appId, 1);
ContainerId containerId = ContainerId.newInstance(attemptId, i);
nodeId = NodeId.newInstance("localhost", 0);
// Create an empty record
containerToken = recordFactory.newRecordInstance(ContainerToken.class);
return BuilderUtils.newContainer(
containerId, nodeId, null, null, null, containerToken, 0);
}
}

View File

@ -661,6 +661,13 @@
<value>30</value> <value>30</value>
</property> </property>
<property>
<description>Max number of threads in NMClientAsync to process container
management events</description>
<name>yarn.client.nodemanager-client-async.thread-pool-max-size</name>
<value>500</value>
</property>
<!--Map Reduce configuration--> <!--Map Reduce configuration-->
<property> <property>
<name>yarn.nodemanager.aux-services.mapreduce.shuffle.class</name> <name>yarn.nodemanager.aux-services.mapreduce.shuffle.class</name>