YARN-422. Add a NM Client library to help application-writers. Contributed by Zhijie Shen.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1487184 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
0edae7a96a
commit
edc6d7d3ab
|
@ -87,6 +87,9 @@ Release 2.0.5-beta - UNRELEASED
|
|||
Azure environments. (See breakdown of tasks below for subtasks and
|
||||
contributors)
|
||||
|
||||
YARN-422. Add a NM Client library to help application-writers. (Zhijie Shen
|
||||
via vinodkv)
|
||||
|
||||
IMPROVEMENTS
|
||||
|
||||
YARN-365. Change NM heartbeat handling to not generate a scheduler event
|
||||
|
|
|
@ -708,6 +708,14 @@ public class YarnConfiguration extends Configuration {
|
|||
public static final long DEFAULT_YARN_CLIENT_APP_SUBMISSION_POLL_INTERVAL_MS =
|
||||
1000;
|
||||
|
||||
/**
|
||||
* Max number of threads in NMClientAsync to process container management
|
||||
* events
|
||||
*/
|
||||
public static final String NM_CLIENT_ASYNC_THREAD_POOL_MAX_SIZE =
|
||||
YARN_PREFIX + "client.nodemanager-client-async.thread-pool-max-size";
|
||||
public static final int DEFAULT_NM_CLIENT_ASYNC_THREAD_POOL_MAX_SIZE = 500;
|
||||
|
||||
public YarnConfiguration() {
|
||||
super();
|
||||
}
|
||||
|
|
|
@ -0,0 +1,97 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.client;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.yarn.api.records.Container;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerToken;
|
||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
|
||||
import org.apache.hadoop.yarn.service.Service;
|
||||
|
||||
@InterfaceAudience.Public
|
||||
@InterfaceStability.Unstable
|
||||
public interface NMClient extends Service {
|
||||
|
||||
/**
|
||||
* <p>Start an allocated container.</p>
|
||||
*
|
||||
* <p>The <code>ApplicationMaster</code> or other applications that use the
|
||||
* client must provide the details of the allocated container, including the
|
||||
* Id, the assigned node's Id and the token via {@link Container}. In
|
||||
* addition, the AM needs to provide the {@link ContainerLaunchContext} as
|
||||
* well.</p>
|
||||
*
|
||||
* @param container the allocated container
|
||||
* @param containerLaunchContext the context information needed by the
|
||||
* <code>NodeManager</code> to launch the
|
||||
* container
|
||||
* @return a map between the auxiliary service names and their outputs
|
||||
* @throws YarnRemoteException
|
||||
* @throws IOException
|
||||
*/
|
||||
Map<String, ByteBuffer> startContainer(Container container,
|
||||
ContainerLaunchContext containerLaunchContext)
|
||||
throws YarnRemoteException, IOException;
|
||||
|
||||
/**
|
||||
* <p>Stop an started container.</p>
|
||||
*
|
||||
* @param containerId the Id of the started container
|
||||
* @param nodeId the Id of the <code>NodeManager</code>
|
||||
* @param containerToken the security token to verify authenticity of the
|
||||
* started container
|
||||
* @throws YarnRemoteException
|
||||
* @throws IOException
|
||||
*/
|
||||
void stopContainer(ContainerId containerId, NodeId nodeId,
|
||||
ContainerToken containerToken) throws YarnRemoteException, IOException;
|
||||
|
||||
/**
|
||||
* <p>Query the status of a container.</p>
|
||||
*
|
||||
* @param containerId the Id of the started container
|
||||
* @param nodeId the Id of the <code>NodeManager</code>
|
||||
* @param containerToken the security token to verify authenticity of the
|
||||
* started container
|
||||
* @return the status of a container
|
||||
* @throws YarnRemoteException
|
||||
* @throws IOException
|
||||
*/
|
||||
ContainerStatus getContainerStatus(ContainerId containerId, NodeId nodeId,
|
||||
ContainerToken containerToken) throws YarnRemoteException, IOException;
|
||||
|
||||
/**
|
||||
* <p>Set whether the containers that are started by this client, and are
|
||||
* still running should be stopped when the client stops. By default, the
|
||||
* feature should be enabled.</p>
|
||||
*
|
||||
* @param enabled whether the feature is enabled or not
|
||||
*/
|
||||
void cleanupRunningContainersOnStop(boolean enabled);
|
||||
|
||||
}
|
|
@ -0,0 +1,709 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.client;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.EnumSet;
|
||||
import java.util.HashSet;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Evolving;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.yarn.api.records.Container;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerToken;
|
||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.event.AbstractEvent;
|
||||
import org.apache.hadoop.yarn.event.EventHandler;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
|
||||
import org.apache.hadoop.yarn.ipc.RPCUtil;
|
||||
import org.apache.hadoop.yarn.service.AbstractService;
|
||||
import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
|
||||
import org.apache.hadoop.yarn.state.MultipleArcTransition;
|
||||
import org.apache.hadoop.yarn.state.SingleArcTransition;
|
||||
import org.apache.hadoop.yarn.state.StateMachine;
|
||||
import org.apache.hadoop.yarn.state.StateMachineFactory;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||
|
||||
/**
|
||||
* <code>NMClientAsync</code> handles communication with all the NodeManagers
|
||||
* and provides asynchronous updates on getting responses from them. It
|
||||
* maintains a thread pool to communicate with individual NMs where a number of
|
||||
* worker threads process requests to NMs by using {@link NMClientImpl}. The max
|
||||
* size of the thread pool is configurable through
|
||||
* {@link YarnConfiguration#NM_CLIENT_ASYNC_THREAD_POOL_MAX_SIZE}.
|
||||
*
|
||||
* It should be used in conjunction with a CallbackHandler. For example
|
||||
*
|
||||
* <pre>
|
||||
* {@code
|
||||
* class MyCallbackHandler implements NMClientAsync.CallbackHandler {
|
||||
* public void onContainerStarted(ContainerId containerId,
|
||||
* Map<String, ByteBuffer> allServiceResponse) {
|
||||
* [post process after the container is started, process the response]
|
||||
* }
|
||||
*
|
||||
* public void onContainerStatusReceived(ContainerId containerId,
|
||||
* ContainerStatus containerStatus) {
|
||||
* [make use of the status of the container]
|
||||
* }
|
||||
*
|
||||
* public void onContainerStopped(ContainerId containerId) {
|
||||
* [post process after the container is stopped]
|
||||
* }
|
||||
*
|
||||
* public void onStartContainerError(
|
||||
* ContainerId containerId, Throwable t) {
|
||||
* [handle the raised exception]
|
||||
* }
|
||||
*
|
||||
* public void onGetContainerStatusError(
|
||||
* ContainerId containerId, Throwable t) {
|
||||
* [handle the raised exception]
|
||||
* }
|
||||
*
|
||||
* public void onStopContainerError(
|
||||
* ContainerId containerId, Throwable t) {
|
||||
* [handle the raised exception]
|
||||
* }
|
||||
* }
|
||||
* }
|
||||
* </pre>
|
||||
*
|
||||
* The client's life-cycle should be managed like the following:
|
||||
*
|
||||
* <pre>
|
||||
* {@code
|
||||
* NMClientAsync asyncClient = new NMClientAsync(new MyCallbackhandler());
|
||||
* asyncClient.init(conf);
|
||||
* asyncClient.start();
|
||||
* asyncClient.startContainer(container, containerLaunchContext);
|
||||
* [... wait for container being started]
|
||||
* asyncClient.getContainerStatus(container.getId(), container.getNodeId(),
|
||||
* container.getContainerToken());
|
||||
* [... handle the status in the callback instance]
|
||||
* asyncClient.stopContainer(container.getId(), container.getNodeId(),
|
||||
* container.getContainerToken());
|
||||
* [... wait for container being stopped]
|
||||
* asyncClient.stop();
|
||||
* }
|
||||
* </pre>
|
||||
*/
|
||||
@Unstable
|
||||
@Evolving
|
||||
public class NMClientAsync extends AbstractService {
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(NMClientAsync.class);
|
||||
|
||||
protected static final int INITIAL_THREAD_POOL_SIZE = 10;
|
||||
|
||||
protected ThreadPoolExecutor threadPool;
|
||||
protected int maxThreadPoolSize;
|
||||
protected Thread eventDispatcherThread;
|
||||
protected AtomicBoolean stopped = new AtomicBoolean(false);
|
||||
protected BlockingQueue<ContainerEvent> events =
|
||||
new LinkedBlockingQueue<ContainerEvent>();
|
||||
|
||||
protected NMClient client;
|
||||
protected CallbackHandler callbackHandler;
|
||||
|
||||
protected ConcurrentMap<ContainerId, StatefulContainer> containers =
|
||||
new ConcurrentHashMap<ContainerId, StatefulContainer>();
|
||||
|
||||
public NMClientAsync(CallbackHandler callbackHandler) {
|
||||
this (NMClientAsync.class.getName(), callbackHandler);
|
||||
}
|
||||
|
||||
public NMClientAsync(String name, CallbackHandler callbackHandler) {
|
||||
this (name, new NMClientImpl(), callbackHandler);
|
||||
}
|
||||
|
||||
@Private
|
||||
@VisibleForTesting
|
||||
protected NMClientAsync(String name, NMClient client,
|
||||
CallbackHandler callbackHandler) {
|
||||
super(name);
|
||||
this.client = client;
|
||||
this.callbackHandler = callbackHandler;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void init(Configuration conf) {
|
||||
this.maxThreadPoolSize = conf.getInt(
|
||||
YarnConfiguration.NM_CLIENT_ASYNC_THREAD_POOL_MAX_SIZE,
|
||||
YarnConfiguration.DEFAULT_NM_CLIENT_ASYNC_THREAD_POOL_MAX_SIZE);
|
||||
LOG.info("Upper bound of the thread pool size is " + maxThreadPoolSize);
|
||||
|
||||
client.init(conf);
|
||||
super.init(conf);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void start() {
|
||||
client.start();
|
||||
|
||||
ThreadFactory tf = new ThreadFactoryBuilder().setNameFormat(
|
||||
this.getClass().getName() + " #%d").setDaemon(true).build();
|
||||
|
||||
// Start with a default core-pool size and change it dynamically.
|
||||
int initSize = Math.min(INITIAL_THREAD_POOL_SIZE, maxThreadPoolSize);
|
||||
threadPool = new ThreadPoolExecutor(initSize, Integer.MAX_VALUE, 1,
|
||||
TimeUnit.HOURS, new LinkedBlockingQueue<Runnable>(), tf);
|
||||
|
||||
eventDispatcherThread = new Thread() {
|
||||
@Override
|
||||
public void run() {
|
||||
ContainerEvent event = null;
|
||||
Set<String> allNodes = new HashSet<String>();
|
||||
|
||||
while (!stopped.get() && !Thread.currentThread().isInterrupted()) {
|
||||
try {
|
||||
event = events.take();
|
||||
} catch (InterruptedException e) {
|
||||
if (!stopped.get()) {
|
||||
LOG.error("Returning, thread interrupted", e);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
allNodes.add(event.getNodeId().toString());
|
||||
|
||||
int threadPoolSize = threadPool.getCorePoolSize();
|
||||
|
||||
// We can increase the pool size only if haven't reached the maximum
|
||||
// limit yet.
|
||||
if (threadPoolSize != maxThreadPoolSize) {
|
||||
|
||||
// nodes where containers will run at *this* point of time. This is
|
||||
// *not* the cluster size and doesn't need to be.
|
||||
int nodeNum = allNodes.size();
|
||||
int idealThreadPoolSize = Math.min(maxThreadPoolSize, nodeNum);
|
||||
|
||||
if (threadPoolSize < idealThreadPoolSize) {
|
||||
// Bump up the pool size to idealThreadPoolSize +
|
||||
// INITIAL_POOL_SIZE, the later is just a buffer so we are not
|
||||
// always increasing the pool-size
|
||||
int newThreadPoolSize = Math.min(maxThreadPoolSize,
|
||||
idealThreadPoolSize + INITIAL_THREAD_POOL_SIZE);
|
||||
LOG.info("Set NMClientAsync thread pool size to " +
|
||||
newThreadPoolSize + " as the number of nodes to talk to is "
|
||||
+ nodeNum);
|
||||
threadPool.setCorePoolSize(newThreadPoolSize);
|
||||
}
|
||||
}
|
||||
|
||||
// the events from the queue are handled in parallel with a thread
|
||||
// pool
|
||||
threadPool.execute(getContainerEventProcessor(event));
|
||||
|
||||
// TODO: Group launching of multiple containers to a single
|
||||
// NodeManager into a single connection
|
||||
}
|
||||
}
|
||||
};
|
||||
eventDispatcherThread.setName("Container Event Dispatcher");
|
||||
eventDispatcherThread.setDaemon(false);
|
||||
eventDispatcherThread.start();
|
||||
|
||||
super.start();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stop() {
|
||||
if (stopped.getAndSet(true)) {
|
||||
// return if already stopped
|
||||
return;
|
||||
}
|
||||
eventDispatcherThread.interrupt();
|
||||
try {
|
||||
eventDispatcherThread.join();
|
||||
} catch (InterruptedException e) {
|
||||
LOG.error("The thread of " + eventDispatcherThread.getName() +
|
||||
" didn't finish normally.", e);
|
||||
}
|
||||
threadPool.shutdownNow();
|
||||
// If NMClientImpl doesn't stop running containers, the states doesn't
|
||||
// need to be cleared.
|
||||
if (!(client instanceof NMClientImpl) ||
|
||||
((NMClientImpl) client).cleanupRunningContainers.get()) {
|
||||
containers.clear();
|
||||
}
|
||||
client.stop();
|
||||
super.stop();
|
||||
}
|
||||
|
||||
public void startContainer(
|
||||
Container container, ContainerLaunchContext containerLaunchContext) {
|
||||
if (containers.putIfAbsent(container.getId(),
|
||||
new StatefulContainer(this, container.getId())) != null) {
|
||||
callbackHandler.onStartContainerError(container.getId(),
|
||||
RPCUtil.getRemoteException("Container " + container.getId() +
|
||||
" is already started or scheduled to start"));
|
||||
}
|
||||
try {
|
||||
events.put(new StartContainerEvent(container, containerLaunchContext));
|
||||
} catch (InterruptedException e) {
|
||||
LOG.warn("Exception when scheduling the event of starting Container " +
|
||||
container.getId());
|
||||
callbackHandler.onStartContainerError(container.getId(), e);
|
||||
}
|
||||
}
|
||||
|
||||
public void stopContainer(ContainerId containerId, NodeId nodeId,
|
||||
ContainerToken containerToken) {
|
||||
if (containers.get(containerId) == null) {
|
||||
callbackHandler.onStopContainerError(containerId,
|
||||
RPCUtil.getRemoteException("Container " + containerId +
|
||||
" is neither started nor scheduled to start"));
|
||||
}
|
||||
try {
|
||||
events.put(new ContainerEvent(containerId, nodeId, containerToken,
|
||||
ContainerEventType.STOP_CONTAINER));
|
||||
} catch (InterruptedException e) {
|
||||
LOG.warn("Exception when scheduling the event of stopping Container " +
|
||||
containerId);
|
||||
callbackHandler.onStopContainerError(containerId, e);
|
||||
}
|
||||
}
|
||||
|
||||
public void getContainerStatus(ContainerId containerId, NodeId nodeId,
|
||||
ContainerToken containerToken) {
|
||||
try {
|
||||
events.put(new ContainerEvent(containerId, nodeId, containerToken,
|
||||
ContainerEventType.QUERY_CONTAINER));
|
||||
} catch (InterruptedException e) {
|
||||
LOG.warn("Exception when scheduling the event of querying the status" +
|
||||
" of Container " + containerId);
|
||||
callbackHandler.onGetContainerStatusError(containerId, e);
|
||||
}
|
||||
}
|
||||
|
||||
protected static enum ContainerState {
|
||||
PREP, FAILED, RUNNING, DONE,
|
||||
}
|
||||
|
||||
protected boolean isCompletelyDone(StatefulContainer container) {
|
||||
return container.getState() == ContainerState.DONE ||
|
||||
container.getState() == ContainerState.FAILED;
|
||||
}
|
||||
|
||||
protected ContainerEventProcessor getContainerEventProcessor(
|
||||
ContainerEvent event) {
|
||||
return new ContainerEventProcessor(event);
|
||||
}
|
||||
|
||||
/**
|
||||
* The type of the event of interacting with a container
|
||||
*/
|
||||
protected static enum ContainerEventType {
|
||||
START_CONTAINER,
|
||||
STOP_CONTAINER,
|
||||
QUERY_CONTAINER
|
||||
}
|
||||
|
||||
protected static class ContainerEvent
|
||||
extends AbstractEvent<ContainerEventType>{
|
||||
private ContainerId containerId;
|
||||
private NodeId nodeId;
|
||||
private ContainerToken containerToken;
|
||||
|
||||
public ContainerEvent(ContainerId containerId, NodeId nodeId,
|
||||
ContainerToken containerToken, ContainerEventType type) {
|
||||
super(type);
|
||||
this.containerId = containerId;
|
||||
this.nodeId = nodeId;
|
||||
this.containerToken = containerToken;
|
||||
}
|
||||
|
||||
public ContainerId getContainerId() {
|
||||
return containerId;
|
||||
}
|
||||
|
||||
public NodeId getNodeId() {
|
||||
return nodeId;
|
||||
}
|
||||
|
||||
public ContainerToken getContainerToken() {
|
||||
return containerToken;
|
||||
}
|
||||
}
|
||||
|
||||
protected static class StartContainerEvent extends ContainerEvent {
|
||||
private Container container;
|
||||
private ContainerLaunchContext containerLaunchContext;
|
||||
|
||||
public StartContainerEvent(Container container,
|
||||
ContainerLaunchContext containerLaunchContext) {
|
||||
super(container.getId(), container.getNodeId(),
|
||||
container.getContainerToken(), ContainerEventType.START_CONTAINER);
|
||||
this.container = container;
|
||||
this.containerLaunchContext = containerLaunchContext;
|
||||
}
|
||||
|
||||
public Container getContainer() {
|
||||
return container;
|
||||
}
|
||||
|
||||
public ContainerLaunchContext getContainerLaunchContext() {
|
||||
return containerLaunchContext;
|
||||
}
|
||||
}
|
||||
|
||||
protected static class StatefulContainer implements
|
||||
EventHandler<ContainerEvent> {
|
||||
|
||||
protected final static StateMachineFactory<StatefulContainer,
|
||||
ContainerState, ContainerEventType, ContainerEvent> stateMachineFactory
|
||||
= new StateMachineFactory<StatefulContainer, ContainerState,
|
||||
ContainerEventType, ContainerEvent>(ContainerState.PREP)
|
||||
|
||||
// Transitions from PREP state
|
||||
.addTransition(ContainerState.PREP,
|
||||
EnumSet.of(ContainerState.RUNNING, ContainerState.FAILED),
|
||||
ContainerEventType.START_CONTAINER,
|
||||
new StartContainerTransition())
|
||||
.addTransition(ContainerState.PREP, ContainerState.DONE,
|
||||
ContainerEventType.STOP_CONTAINER, new OutOfOrderTransition())
|
||||
|
||||
// Transitions from RUNNING state
|
||||
// RUNNING -> RUNNING should be the invalid transition
|
||||
.addTransition(ContainerState.RUNNING,
|
||||
EnumSet.of(ContainerState.DONE, ContainerState.FAILED),
|
||||
ContainerEventType.STOP_CONTAINER,
|
||||
new StopContainerTransition())
|
||||
|
||||
// Transition from DONE state
|
||||
.addTransition(ContainerState.DONE, ContainerState.DONE,
|
||||
EnumSet.of(ContainerEventType.START_CONTAINER,
|
||||
ContainerEventType.STOP_CONTAINER))
|
||||
|
||||
// Transition from FAILED state
|
||||
.addTransition(ContainerState.FAILED, ContainerState.FAILED,
|
||||
EnumSet.of(ContainerEventType.START_CONTAINER,
|
||||
ContainerEventType.STOP_CONTAINER));
|
||||
|
||||
protected static class StartContainerTransition implements
|
||||
MultipleArcTransition<StatefulContainer, ContainerEvent,
|
||||
ContainerState> {
|
||||
|
||||
@Override
|
||||
public ContainerState transition(
|
||||
StatefulContainer container, ContainerEvent event) {
|
||||
ContainerId containerId = event.getContainerId();
|
||||
try {
|
||||
StartContainerEvent scEvent = null;
|
||||
if (event instanceof StartContainerEvent) {
|
||||
scEvent = (StartContainerEvent) event;
|
||||
}
|
||||
assert scEvent != null;
|
||||
Map<String, ByteBuffer> allServiceResponse =
|
||||
container.nmClientAsync.client.startContainer(
|
||||
scEvent.getContainer(), scEvent.getContainerLaunchContext());
|
||||
try {
|
||||
container.nmClientAsync.callbackHandler.onContainerStarted(
|
||||
containerId, allServiceResponse);
|
||||
} catch (Throwable thr) {
|
||||
// Don't process user created unchecked exception
|
||||
LOG.info("Unchecked exception is thrown from onContainerStarted for "
|
||||
+ "Container " + containerId, thr);
|
||||
}
|
||||
return ContainerState.RUNNING;
|
||||
} catch (YarnRemoteException e) {
|
||||
return onExceptionRaised(container, event, e);
|
||||
} catch (IOException e) {
|
||||
return onExceptionRaised(container, event, e);
|
||||
} catch (Throwable t) {
|
||||
return onExceptionRaised(container, event, t);
|
||||
}
|
||||
}
|
||||
|
||||
private ContainerState onExceptionRaised(StatefulContainer container,
|
||||
ContainerEvent event, Throwable t) {
|
||||
try {
|
||||
container.nmClientAsync.callbackHandler.onStartContainerError(
|
||||
event.getContainerId(), t);
|
||||
} catch (Throwable thr) {
|
||||
// Don't process user created unchecked exception
|
||||
LOG.info(
|
||||
"Unchecked exception is thrown from onStartContainerError for " +
|
||||
"Container " + event.getContainerId(), thr);
|
||||
}
|
||||
return ContainerState.FAILED;
|
||||
}
|
||||
}
|
||||
|
||||
protected static class StopContainerTransition implements
|
||||
MultipleArcTransition<StatefulContainer, ContainerEvent,
|
||||
ContainerState> {
|
||||
|
||||
@Override
|
||||
public ContainerState transition(
|
||||
StatefulContainer container, ContainerEvent event) {
|
||||
ContainerId containerId = event.getContainerId();
|
||||
try {
|
||||
container.nmClientAsync.client.stopContainer(
|
||||
containerId, event.getNodeId(), event.getContainerToken());
|
||||
try {
|
||||
container.nmClientAsync.callbackHandler.onContainerStopped(
|
||||
event.getContainerId());
|
||||
} catch (Throwable thr) {
|
||||
// Don't process user created unchecked exception
|
||||
LOG.info("Unchecked exception is thrown from onContainerStopped for "
|
||||
+ "Container " + event.getContainerId(), thr);
|
||||
}
|
||||
return ContainerState.DONE;
|
||||
} catch (YarnRemoteException e) {
|
||||
return onExceptionRaised(container, event, e);
|
||||
} catch (IOException e) {
|
||||
return onExceptionRaised(container, event, e);
|
||||
} catch (Throwable t) {
|
||||
return onExceptionRaised(container, event, t);
|
||||
}
|
||||
}
|
||||
|
||||
private ContainerState onExceptionRaised(StatefulContainer container,
|
||||
ContainerEvent event, Throwable t) {
|
||||
try {
|
||||
container.nmClientAsync.callbackHandler.onStopContainerError(
|
||||
event.getContainerId(), t);
|
||||
} catch (Throwable thr) {
|
||||
// Don't process user created unchecked exception
|
||||
LOG.info("Unchecked exception is thrown from onStopContainerError for "
|
||||
+ "Container " + event.getContainerId(), thr);
|
||||
}
|
||||
return ContainerState.FAILED;
|
||||
}
|
||||
}
|
||||
|
||||
protected static class OutOfOrderTransition implements
|
||||
SingleArcTransition<StatefulContainer, ContainerEvent> {
|
||||
|
||||
protected static final String STOP_BEFORE_START_ERROR_MSG =
|
||||
"Container was killed before it was launched";
|
||||
|
||||
@Override
|
||||
public void transition(StatefulContainer container, ContainerEvent event) {
|
||||
try {
|
||||
container.nmClientAsync.callbackHandler.onStartContainerError(
|
||||
event.getContainerId(),
|
||||
RPCUtil.getRemoteException(STOP_BEFORE_START_ERROR_MSG));
|
||||
} catch (Throwable thr) {
|
||||
// Don't process user created unchecked exception
|
||||
LOG.info(
|
||||
"Unchecked exception is thrown from onStartContainerError for " +
|
||||
"Container " + event.getContainerId(), thr);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private final NMClientAsync nmClientAsync;
|
||||
private final ContainerId containerId;
|
||||
private final StateMachine<ContainerState, ContainerEventType,
|
||||
ContainerEvent> stateMachine;
|
||||
private final ReadLock readLock;
|
||||
private final WriteLock writeLock;
|
||||
|
||||
public StatefulContainer(NMClientAsync client, ContainerId containerId) {
|
||||
this.nmClientAsync = client;
|
||||
this.containerId = containerId;
|
||||
stateMachine = stateMachineFactory.make(this);
|
||||
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
|
||||
readLock = lock.readLock();
|
||||
writeLock = lock.writeLock();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handle(ContainerEvent event) {
|
||||
writeLock.lock();
|
||||
try {
|
||||
try {
|
||||
this.stateMachine.doTransition(event.getType(), event);
|
||||
} catch (InvalidStateTransitonException e) {
|
||||
LOG.error("Can't handle this event at current state", e);
|
||||
}
|
||||
} finally {
|
||||
writeLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
public ContainerId getContainerId() {
|
||||
return containerId;
|
||||
}
|
||||
|
||||
public ContainerState getState() {
|
||||
readLock.lock();
|
||||
try {
|
||||
return stateMachine.getCurrentState();
|
||||
} finally {
|
||||
readLock.unlock();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
protected class ContainerEventProcessor implements Runnable {
|
||||
protected ContainerEvent event;
|
||||
|
||||
public ContainerEventProcessor(ContainerEvent event) {
|
||||
this.event = event;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
ContainerId containerId = event.getContainerId();
|
||||
LOG.info("Processing Event " + event + " for Container " + containerId);
|
||||
if (event.getType() == ContainerEventType.QUERY_CONTAINER) {
|
||||
try {
|
||||
ContainerStatus containerStatus = client.getContainerStatus(
|
||||
containerId, event.getNodeId(), event.getContainerToken());
|
||||
try {
|
||||
callbackHandler.onContainerStatusReceived(
|
||||
containerId, containerStatus);
|
||||
} catch (Throwable thr) {
|
||||
// Don't process user created unchecked exception
|
||||
LOG.info(
|
||||
"Unchecked exception is thrown from onContainerStatusReceived" +
|
||||
" for Container " + event.getContainerId(), thr);
|
||||
}
|
||||
} catch (YarnRemoteException e) {
|
||||
onExceptionRaised(containerId, e);
|
||||
} catch (IOException e) {
|
||||
onExceptionRaised(containerId, e);
|
||||
} catch (Throwable t) {
|
||||
onExceptionRaised(containerId, t);
|
||||
}
|
||||
} else {
|
||||
StatefulContainer container = containers.get(containerId);
|
||||
if (container == null) {
|
||||
LOG.info("Container " + containerId + " is already stopped or failed");
|
||||
} else {
|
||||
container.handle(event);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void onExceptionRaised(ContainerId containerId, Throwable t) {
|
||||
try {
|
||||
callbackHandler.onGetContainerStatusError(containerId, t);
|
||||
} catch (Throwable thr) {
|
||||
// Don't process user created unchecked exception
|
||||
LOG.info("Unchecked exception is thrown from onGetContainerStatusError" +
|
||||
" for Container " + containerId, thr);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* <p>
|
||||
* The callback interface needs to be implemented by {@link NMClientAsync}
|
||||
* users. The APIs are called when responses from <code>NodeManager</code> are
|
||||
* available.
|
||||
* </p>
|
||||
*
|
||||
* <p>
|
||||
* Once a callback happens, the users can chose to act on it in blocking or
|
||||
* non-blocking manner. If the action on callback is done in a blocking
|
||||
* manner, some of the threads performing requests on NodeManagers may get
|
||||
* blocked depending on how many threads in the pool are busy.
|
||||
* </p>
|
||||
*
|
||||
* <p>
|
||||
* The implementation of the callback function should not throw the
|
||||
* unexpected exception. Otherwise, {@link NMClientAsync} will just
|
||||
* catch, log and then ignore it.
|
||||
* </p>
|
||||
*/
|
||||
public static interface CallbackHandler {
|
||||
/**
|
||||
* The API is called when <code>NodeManager</code> responds to indicate its
|
||||
* acceptance of the starting container request
|
||||
* @param containerId the Id of the container
|
||||
* @param allServiceResponse a Map between the auxiliary service names and
|
||||
* their outputs
|
||||
*/
|
||||
void onContainerStarted(ContainerId containerId,
|
||||
Map<String, ByteBuffer> allServiceResponse);
|
||||
|
||||
/**
|
||||
* The API is called when <code>NodeManager</code> responds with the status
|
||||
* of the container
|
||||
* @param containerId the Id of the container
|
||||
* @param containerStatus the status of the container
|
||||
*/
|
||||
void onContainerStatusReceived(ContainerId containerId,
|
||||
ContainerStatus containerStatus);
|
||||
|
||||
/**
|
||||
* The API is called when <code>NodeManager</code> responds to indicate the
|
||||
* container is stopped.
|
||||
* @param containerId the Id of the container
|
||||
*/
|
||||
void onContainerStopped(ContainerId containerId);
|
||||
|
||||
/**
|
||||
* The API is called when an exception is raised in the process of
|
||||
* starting a container
|
||||
*
|
||||
* @param containerId the Id of the container
|
||||
* @param t the raised exception
|
||||
*/
|
||||
void onStartContainerError(ContainerId containerId, Throwable t);
|
||||
|
||||
/**
|
||||
* The API is called when an exception is raised in the process of
|
||||
* querying the status of a container
|
||||
*
|
||||
* @param containerId the Id of the container
|
||||
* @param t the raised exception
|
||||
*/
|
||||
void onGetContainerStatusError(ContainerId containerId, Throwable t);
|
||||
|
||||
/**
|
||||
* The API is called when an exception is raised in the process of
|
||||
* stopping a container
|
||||
*
|
||||
* @param containerId the Id of the container
|
||||
* @param t the raised exception
|
||||
*/
|
||||
void onStopContainerError(ContainerId containerId, Throwable t);
|
||||
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,388 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.client;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.security.PrivilegedAction;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.ipc.RPC;
|
||||
import org.apache.hadoop.net.NetUtils;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.yarn.api.ContainerManager;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest;
|
||||
import org.apache.hadoop.yarn.api.records.Container;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerToken;
|
||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
|
||||
import org.apache.hadoop.yarn.ipc.RPCUtil;
|
||||
import org.apache.hadoop.yarn.ipc.YarnRPC;
|
||||
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
|
||||
import org.apache.hadoop.yarn.service.AbstractService;
|
||||
import org.apache.hadoop.yarn.util.ProtoUtils;
|
||||
import org.apache.hadoop.yarn.util.Records;
|
||||
|
||||
/**
|
||||
* <p>
|
||||
* This class implements {@link NMClient}. All the APIs are blocking.
|
||||
* </p>
|
||||
*
|
||||
* <p>
|
||||
* By default, this client stops all the running containers that are started by
|
||||
* it when it stops. It can be disabled via
|
||||
* {@link #cleanupRunningContainersOnStop}, in which case containers will
|
||||
* continue to run even after this client is stopped and till the application
|
||||
* runs at which point ResourceManager will forcefully kill them.
|
||||
* </p>
|
||||
*/
|
||||
public class NMClientImpl extends AbstractService implements NMClient {
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(NMClientImpl.class);
|
||||
|
||||
// The logically coherent operations on startedContainers is synchronized to
|
||||
// ensure they are atomic
|
||||
protected ConcurrentMap<ContainerId, StartedContainer> startedContainers =
|
||||
new ConcurrentHashMap<ContainerId, StartedContainer>();
|
||||
|
||||
//enabled by default
|
||||
protected AtomicBoolean cleanupRunningContainers = new AtomicBoolean(true);
|
||||
|
||||
public NMClientImpl() {
|
||||
super(NMClientImpl.class.getName());
|
||||
}
|
||||
|
||||
public NMClientImpl(String name) {
|
||||
super(name);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stop() {
|
||||
// Usually, started-containers are stopped when this client stops. Unless
|
||||
// the flag cleanupRunningContainers is set to false.
|
||||
if (cleanupRunningContainers.get()) {
|
||||
cleanupRunningContainers();
|
||||
}
|
||||
super.stop();
|
||||
}
|
||||
|
||||
protected synchronized void cleanupRunningContainers() {
|
||||
for (StartedContainer startedContainer : startedContainers.values()) {
|
||||
try {
|
||||
stopContainer(startedContainer.getContainerId(),
|
||||
startedContainer.getNodeId(),
|
||||
startedContainer.getContainerToken());
|
||||
} catch (YarnRemoteException e) {
|
||||
LOG.error("Failed to stop Container " +
|
||||
startedContainer.getContainerId() +
|
||||
"when stopping NMClientImpl");
|
||||
} catch (IOException e) {
|
||||
LOG.error("Failed to stop Container " +
|
||||
startedContainer.getContainerId() +
|
||||
"when stopping NMClientImpl");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void cleanupRunningContainersOnStop(boolean enabled) {
|
||||
cleanupRunningContainers.set(enabled);
|
||||
}
|
||||
|
||||
protected static class StartedContainer {
|
||||
private ContainerId containerId;
|
||||
private NodeId nodeId;
|
||||
private ContainerToken containerToken;
|
||||
private boolean stopped;
|
||||
|
||||
public StartedContainer(ContainerId containerId, NodeId nodeId,
|
||||
ContainerToken containerToken) {
|
||||
this.containerId = containerId;
|
||||
this.nodeId = nodeId;
|
||||
this.containerToken = containerToken;
|
||||
stopped = false;
|
||||
}
|
||||
|
||||
public ContainerId getContainerId() {
|
||||
return containerId;
|
||||
}
|
||||
|
||||
public NodeId getNodeId() {
|
||||
return nodeId;
|
||||
}
|
||||
|
||||
public ContainerToken getContainerToken() {
|
||||
return containerToken;
|
||||
}
|
||||
}
|
||||
|
||||
protected static final class NMCommunicator extends AbstractService {
|
||||
private ContainerId containerId;
|
||||
private NodeId nodeId;
|
||||
private ContainerToken containerToken;
|
||||
private ContainerManager containerManager;
|
||||
|
||||
public NMCommunicator(ContainerId containerId, NodeId nodeId,
|
||||
ContainerToken containerToken) {
|
||||
super(NMCommunicator.class.getName());
|
||||
this.containerId = containerId;
|
||||
this.nodeId = nodeId;
|
||||
this.containerToken = containerToken;
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void start() {
|
||||
final YarnRPC rpc = YarnRPC.create(getConfig());
|
||||
|
||||
final InetSocketAddress containerAddress =
|
||||
NetUtils.createSocketAddr(nodeId.toString());
|
||||
|
||||
// the user in createRemoteUser in this context has to be ContainerId
|
||||
UserGroupInformation currentUser =
|
||||
UserGroupInformation.createRemoteUser(containerId.toString());
|
||||
|
||||
Token<ContainerTokenIdentifier> token =
|
||||
ProtoUtils.convertFromProtoFormat(containerToken, containerAddress);
|
||||
currentUser.addToken(token);
|
||||
|
||||
containerManager = currentUser
|
||||
.doAs(new PrivilegedAction<ContainerManager>() {
|
||||
@Override
|
||||
public ContainerManager run() {
|
||||
return (ContainerManager) rpc.getProxy(ContainerManager.class,
|
||||
containerAddress, getConfig());
|
||||
}
|
||||
});
|
||||
|
||||
LOG.debug("Connecting to ContainerManager at " + containerAddress);
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void stop() {
|
||||
if (this.containerManager != null) {
|
||||
RPC.stopProxy(this.containerManager);
|
||||
|
||||
if (LOG.isDebugEnabled()) {
|
||||
InetSocketAddress containerAddress =
|
||||
NetUtils.createSocketAddr(nodeId.toString());
|
||||
LOG.debug("Disconnecting from ContainerManager at " +
|
||||
containerAddress);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public synchronized Map<String, ByteBuffer> startContainer(
|
||||
Container container, ContainerLaunchContext containerLaunchContext)
|
||||
throws YarnRemoteException, IOException {
|
||||
if (!container.getId().equals(containerId)) {
|
||||
throw new IllegalArgumentException(
|
||||
"NMCommunicator's containerId mismatches the given Container's");
|
||||
}
|
||||
StartContainerResponse startResponse = null;
|
||||
try {
|
||||
StartContainerRequest startRequest =
|
||||
Records.newRecord(StartContainerRequest.class);
|
||||
startRequest.setContainer(container);
|
||||
startRequest.setContainerLaunchContext(containerLaunchContext);
|
||||
startResponse = containerManager.startContainer(startRequest);
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Started Container " + containerId);
|
||||
}
|
||||
} catch (YarnRemoteException e) {
|
||||
LOG.warn("Container " + containerId + " failed to start", e);
|
||||
throw e;
|
||||
} catch (IOException e) {
|
||||
LOG.warn("Container " + containerId + " failed to start", e);
|
||||
throw e;
|
||||
}
|
||||
return startResponse.getAllServiceResponse();
|
||||
}
|
||||
|
||||
public synchronized void stopContainer() throws YarnRemoteException,
|
||||
IOException {
|
||||
try {
|
||||
StopContainerRequest stopRequest =
|
||||
Records.newRecord(StopContainerRequest.class);
|
||||
stopRequest.setContainerId(containerId);
|
||||
containerManager.stopContainer(stopRequest);
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Stopped Container " + containerId);
|
||||
}
|
||||
} catch (YarnRemoteException e) {
|
||||
LOG.warn("Container " + containerId + " failed to stop", e);
|
||||
throw e;
|
||||
} catch (IOException e) {
|
||||
LOG.warn("Container " + containerId + " failed to stop", e);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
public synchronized ContainerStatus getContainerStatus()
|
||||
throws YarnRemoteException, IOException {
|
||||
GetContainerStatusResponse statusResponse = null;
|
||||
try {
|
||||
GetContainerStatusRequest statusRequest =
|
||||
Records.newRecord(GetContainerStatusRequest.class);
|
||||
statusRequest.setContainerId(containerId);
|
||||
statusResponse = containerManager.getContainerStatus(statusRequest);
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Got the status of Container " + containerId);
|
||||
}
|
||||
} catch (YarnRemoteException e) {
|
||||
LOG.warn(
|
||||
"Unable to get the status of Container " + containerId, e);
|
||||
throw e;
|
||||
} catch (IOException e) {
|
||||
LOG.warn(
|
||||
"Unable to get the status of Container " + containerId, e);
|
||||
throw e;
|
||||
}
|
||||
return statusResponse.getStatus();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, ByteBuffer> startContainer(
|
||||
Container container, ContainerLaunchContext containerLaunchContext)
|
||||
throws YarnRemoteException, IOException {
|
||||
// Do synchronization on StartedContainer to prevent race condition
|
||||
// between startContainer and stopContainer
|
||||
synchronized (addStartedContainer(container)) {
|
||||
Map<String, ByteBuffer> allServiceResponse;
|
||||
NMCommunicator nmCommunicator = null;
|
||||
try {
|
||||
nmCommunicator = new NMCommunicator(container.getId(),
|
||||
container.getNodeId(), container.getContainerToken());
|
||||
nmCommunicator.init(getConfig());
|
||||
nmCommunicator.start();
|
||||
allServiceResponse =
|
||||
nmCommunicator.startContainer(container, containerLaunchContext);
|
||||
} catch (YarnRemoteException e) {
|
||||
// Remove the started container if it failed to start
|
||||
removeStartedContainer(container.getId());
|
||||
throw e;
|
||||
} catch (IOException e) {
|
||||
removeStartedContainer(container.getId());
|
||||
throw e;
|
||||
} catch (Throwable t) {
|
||||
removeStartedContainer(container.getId());
|
||||
throw RPCUtil.getRemoteException(t);
|
||||
} finally {
|
||||
if (nmCommunicator != null) {
|
||||
nmCommunicator.stop();
|
||||
}
|
||||
}
|
||||
return allServiceResponse;
|
||||
}
|
||||
|
||||
// Three choices:
|
||||
// 1. starting and releasing the proxy before and after each interaction
|
||||
// 2. starting the proxy when starting the container and releasing it when
|
||||
// stopping the container
|
||||
// 3. starting the proxy when starting the container and releasing it when
|
||||
// stopping the client
|
||||
// Adopt 1 currently
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stopContainer(ContainerId containerId, NodeId nodeId,
|
||||
ContainerToken containerToken) throws YarnRemoteException, IOException {
|
||||
StartedContainer startedContainer = getStartedContainer(containerId);
|
||||
if (startedContainer == null) {
|
||||
throw RPCUtil.getRemoteException("Container " + containerId +
|
||||
" is either not started yet or already stopped");
|
||||
}
|
||||
// Only allow one request of stopping the container to move forward
|
||||
// When entering the block, check whether the precursor has already stopped
|
||||
// the container
|
||||
synchronized (startedContainer) {
|
||||
if (startedContainer.stopped) {
|
||||
return;
|
||||
}
|
||||
NMCommunicator nmCommunicator = null;
|
||||
try {
|
||||
nmCommunicator =
|
||||
new NMCommunicator(containerId, nodeId, containerToken);
|
||||
nmCommunicator.init(getConfig());
|
||||
nmCommunicator.start();
|
||||
nmCommunicator.stopContainer();
|
||||
} finally {
|
||||
if (nmCommunicator != null) {
|
||||
nmCommunicator.stop();
|
||||
}
|
||||
startedContainer.stopped = true;
|
||||
removeStartedContainer(containerId);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public ContainerStatus getContainerStatus(ContainerId containerId,
|
||||
NodeId nodeId, ContainerToken containerToken)
|
||||
throws YarnRemoteException, IOException {
|
||||
NMCommunicator nmCommunicator = null;
|
||||
try {
|
||||
nmCommunicator = new NMCommunicator(containerId, nodeId, containerToken);
|
||||
nmCommunicator.init(getConfig());
|
||||
nmCommunicator.start();
|
||||
ContainerStatus containerStatus = nmCommunicator.getContainerStatus();
|
||||
return containerStatus;
|
||||
} finally {
|
||||
if (nmCommunicator != null) {
|
||||
nmCommunicator.stop();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
protected synchronized StartedContainer addStartedContainer(
|
||||
Container container) throws YarnRemoteException, IOException {
|
||||
if (startedContainers.containsKey(container.getId())) {
|
||||
throw RPCUtil.getRemoteException("Container " + container.getId() +
|
||||
" is already started");
|
||||
}
|
||||
StartedContainer startedContainer = new StartedContainer(container.getId(),
|
||||
container.getNodeId(), container.getContainerToken());
|
||||
startedContainers.put(startedContainer.getContainerId(), startedContainer);
|
||||
return startedContainer;
|
||||
}
|
||||
|
||||
protected synchronized void removeStartedContainer(ContainerId containerId) {
|
||||
startedContainers.remove(containerId);
|
||||
}
|
||||
|
||||
protected synchronized StartedContainer getStartedContainer(
|
||||
ContainerId containerId) {
|
||||
return startedContainers.get(containerId);
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,316 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.client;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.TreeSet;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.io.DataOutputBuffer;
|
||||
import org.apache.hadoop.security.Credentials;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationReport;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
||||
import org.apache.hadoop.yarn.api.records.Container;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerState;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
||||
import org.apache.hadoop.yarn.api.records.NodeReport;
|
||||
import org.apache.hadoop.yarn.api.records.Priority;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
||||
import org.apache.hadoop.yarn.client.AMRMClient.ContainerRequest;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
|
||||
import org.apache.hadoop.yarn.server.MiniYARNCluster;
|
||||
import org.apache.hadoop.yarn.service.Service.STATE;
|
||||
import org.apache.hadoop.yarn.util.Records;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
public class TestNMClient {
|
||||
Configuration conf = null;
|
||||
MiniYARNCluster yarnCluster = null;
|
||||
YarnClientImpl yarnClient = null;
|
||||
AMRMClientImpl rmClient = null;
|
||||
NMClientImpl nmClient = null;
|
||||
List<NodeReport> nodeReports = null;
|
||||
ApplicationAttemptId attemptId = null;
|
||||
int nodeCount = 3;
|
||||
|
||||
@Before
|
||||
public void setup() throws YarnRemoteException, IOException {
|
||||
// start minicluster
|
||||
conf = new YarnConfiguration();
|
||||
yarnCluster =
|
||||
new MiniYARNCluster(TestAMRMClient.class.getName(), nodeCount, 1, 1);
|
||||
yarnCluster.init(conf);
|
||||
yarnCluster.start();
|
||||
assertNotNull(yarnCluster);
|
||||
assertEquals(STATE.STARTED, yarnCluster.getServiceState());
|
||||
|
||||
// start rm client
|
||||
yarnClient = new YarnClientImpl();
|
||||
yarnClient.init(conf);
|
||||
yarnClient.start();
|
||||
assertNotNull(yarnClient);
|
||||
assertEquals(STATE.STARTED, yarnClient.getServiceState());
|
||||
|
||||
// get node info
|
||||
nodeReports = yarnClient.getNodeReports();
|
||||
|
||||
// submit new app
|
||||
GetNewApplicationResponse newApp = yarnClient.getNewApplication();
|
||||
ApplicationId appId = newApp.getApplicationId();
|
||||
|
||||
ApplicationSubmissionContext appContext = Records
|
||||
.newRecord(ApplicationSubmissionContext.class);
|
||||
// set the application id
|
||||
appContext.setApplicationId(appId);
|
||||
// set the application name
|
||||
appContext.setApplicationName("Test");
|
||||
// Set the priority for the application master
|
||||
Priority pri = Priority.newInstance(0);
|
||||
appContext.setPriority(pri);
|
||||
// Set the queue to which this application is to be submitted in the RM
|
||||
appContext.setQueue("default");
|
||||
// Set up the container launch context for the application master
|
||||
ContainerLaunchContext amContainer = Records
|
||||
.newRecord(ContainerLaunchContext.class);
|
||||
appContext.setAMContainerSpec(amContainer);
|
||||
// unmanaged AM
|
||||
appContext.setUnmanagedAM(true);
|
||||
// Create the request to send to the applications manager
|
||||
SubmitApplicationRequest appRequest = Records
|
||||
.newRecord(SubmitApplicationRequest.class);
|
||||
appRequest.setApplicationSubmissionContext(appContext);
|
||||
// Submit the application to the applications manager
|
||||
yarnClient.submitApplication(appContext);
|
||||
|
||||
// wait for app to start
|
||||
int iterationsLeft = 30;
|
||||
while (iterationsLeft > 0) {
|
||||
ApplicationReport appReport = yarnClient.getApplicationReport(appId);
|
||||
if (appReport.getYarnApplicationState() ==
|
||||
YarnApplicationState.ACCEPTED) {
|
||||
attemptId = appReport.getCurrentApplicationAttemptId();
|
||||
break;
|
||||
}
|
||||
sleep(1000);
|
||||
--iterationsLeft;
|
||||
}
|
||||
if (iterationsLeft == 0) {
|
||||
fail("Application hasn't bee started");
|
||||
}
|
||||
|
||||
// start am rm client
|
||||
rmClient = new AMRMClientImpl(attemptId);
|
||||
rmClient.init(conf);
|
||||
rmClient.start();
|
||||
assertNotNull(rmClient);
|
||||
assertEquals(STATE.STARTED, rmClient.getServiceState());
|
||||
|
||||
// start am nm client
|
||||
nmClient = new NMClientImpl();
|
||||
nmClient.init(conf);
|
||||
nmClient.start();
|
||||
assertNotNull(nmClient);
|
||||
assertEquals(STATE.STARTED, nmClient.getServiceState());
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() {
|
||||
rmClient.stop();
|
||||
|
||||
// leave one unclosed
|
||||
assertEquals(1, nmClient.startedContainers.size());
|
||||
// default true
|
||||
assertTrue(nmClient.cleanupRunningContainers.get());
|
||||
// don't stop the running containers
|
||||
nmClient.cleanupRunningContainersOnStop(false);
|
||||
assertFalse(nmClient.cleanupRunningContainers.get());
|
||||
nmClient.stop();
|
||||
assertTrue(nmClient.startedContainers.size() > 0);
|
||||
// stop the running containers
|
||||
nmClient.cleanupRunningContainersOnStop(true);
|
||||
assertTrue(nmClient.cleanupRunningContainers.get());
|
||||
nmClient.stop();
|
||||
assertEquals(0, nmClient.startedContainers.size());
|
||||
|
||||
yarnClient.stop();
|
||||
yarnCluster.stop();
|
||||
}
|
||||
|
||||
@Test (timeout = 60000)
|
||||
public void testNMClient()
|
||||
throws YarnRemoteException, IOException {
|
||||
|
||||
rmClient.registerApplicationMaster("Host", 10000, "");
|
||||
|
||||
testContainerManagement(nmClient, allocateContainers(rmClient, 5));
|
||||
|
||||
rmClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED,
|
||||
null, null);
|
||||
}
|
||||
|
||||
private Set<Container> allocateContainers(AMRMClientImpl rmClient, int num)
|
||||
throws YarnRemoteException, IOException {
|
||||
// setup container request
|
||||
Resource capability = Resource.newInstance(1024, 0);
|
||||
Priority priority = Priority.newInstance(0);
|
||||
String node = nodeReports.get(0).getNodeId().getHost();
|
||||
String rack = nodeReports.get(0).getRackName();
|
||||
String[] nodes = new String[] {node};
|
||||
String[] racks = new String[] {rack};
|
||||
|
||||
for (int i = 0; i < num; ++i) {
|
||||
rmClient.addContainerRequest(new ContainerRequest(capability, nodes,
|
||||
racks, priority, 1));
|
||||
}
|
||||
|
||||
int containersRequestedAny = rmClient.remoteRequestsTable.get(priority)
|
||||
.get(ResourceRequest.ANY).get(capability).getNumContainers();
|
||||
|
||||
// RM should allocate container within 2 calls to allocate()
|
||||
int allocatedContainerCount = 0;
|
||||
int iterationsLeft = 2;
|
||||
Set<Container> containers = new TreeSet<Container>();
|
||||
while (allocatedContainerCount < containersRequestedAny
|
||||
&& iterationsLeft > 0) {
|
||||
AllocateResponse allocResponse = rmClient.allocate(0.1f);
|
||||
|
||||
allocatedContainerCount += allocResponse.getAllocatedContainers().size();
|
||||
for(Container container : allocResponse.getAllocatedContainers()) {
|
||||
containers.add(container);
|
||||
}
|
||||
if(allocatedContainerCount < containersRequestedAny) {
|
||||
// sleep to let NM's heartbeat to RM and trigger allocations
|
||||
sleep(1000);
|
||||
}
|
||||
|
||||
--iterationsLeft;
|
||||
}
|
||||
return containers;
|
||||
}
|
||||
|
||||
private void testContainerManagement(NMClientImpl nmClient,
|
||||
Set<Container> containers) throws IOException {
|
||||
int size = containers.size();
|
||||
int i = 0;
|
||||
for (Container container : containers) {
|
||||
// getContainerStatus shouldn't be called before startContainer,
|
||||
// otherwise, NodeManager cannot find the container
|
||||
try {
|
||||
nmClient.getContainerStatus(container.getId(), container.getNodeId(),
|
||||
container.getContainerToken());
|
||||
fail("Exception is expected");
|
||||
} catch (YarnRemoteException e) {
|
||||
assertTrue("The thrown exception is not expected",
|
||||
e.getMessage().contains("is not handled by this NodeManager"));
|
||||
}
|
||||
|
||||
// stopContainer shouldn't be called before startContainer,
|
||||
// otherwise, an exception will be thrown
|
||||
try {
|
||||
nmClient.stopContainer(container.getId(), container.getNodeId(),
|
||||
container.getContainerToken());
|
||||
fail("Exception is expected");
|
||||
} catch (YarnRemoteException e) {
|
||||
assertTrue("The thrown exception is not expected",
|
||||
e.getMessage().contains(
|
||||
"is either not started yet or already stopped"));
|
||||
}
|
||||
|
||||
Credentials ts = new Credentials();
|
||||
DataOutputBuffer dob = new DataOutputBuffer();
|
||||
ts.writeTokenStorageToStream(dob);
|
||||
ByteBuffer securityTokens =
|
||||
ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
|
||||
ContainerLaunchContext clc =
|
||||
Records.newRecord(ContainerLaunchContext.class);
|
||||
clc.setTokens(securityTokens);
|
||||
try {
|
||||
nmClient.startContainer(container, clc);
|
||||
} catch (YarnRemoteException e) {
|
||||
fail("Exception is not expected");
|
||||
}
|
||||
|
||||
// leave one container unclosed
|
||||
if (++i < size) {
|
||||
try {
|
||||
ContainerStatus status = nmClient.getContainerStatus(container.getId(),
|
||||
container.getNodeId(), container.getContainerToken());
|
||||
// verify the container is started and in good shape
|
||||
assertEquals(container.getId(), status.getContainerId());
|
||||
assertEquals(ContainerState.RUNNING, status.getState());
|
||||
assertEquals("", status.getDiagnostics());
|
||||
assertEquals(-1000, status.getExitStatus());
|
||||
} catch (YarnRemoteException e) {
|
||||
fail("Exception is not expected");
|
||||
}
|
||||
|
||||
try {
|
||||
nmClient.stopContainer(container.getId(), container.getNodeId(),
|
||||
container.getContainerToken());
|
||||
} catch (YarnRemoteException e) {
|
||||
fail("Exception is not expected");
|
||||
}
|
||||
|
||||
// getContainerStatus can be called after stopContainer
|
||||
try {
|
||||
ContainerStatus status = nmClient.getContainerStatus(
|
||||
container.getId(), container.getNodeId(),
|
||||
container.getContainerToken());
|
||||
assertEquals(container.getId(), status.getContainerId());
|
||||
assertEquals(ContainerState.RUNNING, status.getState());
|
||||
assertTrue("" + i, status.getDiagnostics().contains(
|
||||
"Container killed by the ApplicationMaster."));
|
||||
assertEquals(-1000, status.getExitStatus());
|
||||
} catch (YarnRemoteException e) {
|
||||
fail("Exception is not expected");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void sleep(int sleepTime) {
|
||||
try {
|
||||
Thread.sleep(sleepTime);
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,540 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.client;
|
||||
|
||||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Mockito.doNothing;
|
||||
import static org.mockito.Mockito.doThrow;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.Collections;
|
||||
import java.util.HashSet;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.BrokenBarrierException;
|
||||
import java.util.concurrent.CyclicBarrier;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicIntegerArray;
|
||||
|
||||
import junit.framework.Assert;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.Container;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerToken;
|
||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
|
||||
import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||
import org.apache.hadoop.yarn.ipc.RPCUtil;
|
||||
import org.apache.hadoop.yarn.util.BuilderUtils;
|
||||
import org.junit.Test;
|
||||
|
||||
|
||||
public class TestNMClientAsync {
|
||||
|
||||
private final RecordFactory recordFactory =
|
||||
RecordFactoryProvider.getRecordFactory(null);
|
||||
|
||||
private NMClientAsync asyncClient;
|
||||
private NodeId nodeId;
|
||||
private ContainerToken containerToken;
|
||||
|
||||
@Test (timeout = 30000)
|
||||
public void testNMClientAsync() throws Exception {
|
||||
Configuration conf = new Configuration();
|
||||
conf.setInt(YarnConfiguration.NM_CLIENT_ASYNC_THREAD_POOL_MAX_SIZE, 10);
|
||||
|
||||
// Threads to run are more than the max size of the thread pool
|
||||
int expectedSuccess = 40;
|
||||
int expectedFailure = 40;
|
||||
|
||||
asyncClient = new MockNMClientAsync1(expectedSuccess, expectedFailure);
|
||||
asyncClient.init(conf);
|
||||
Assert.assertEquals("The max thread pool size is not correctly set",
|
||||
10, asyncClient.maxThreadPoolSize);
|
||||
asyncClient.start();
|
||||
|
||||
|
||||
for (int i = 0; i < expectedSuccess + expectedFailure; ++i) {
|
||||
if (i == expectedSuccess) {
|
||||
while (!((TestCallbackHandler1) asyncClient.callbackHandler)
|
||||
.isAllSuccessCallsExecuted()) {
|
||||
Thread.sleep(10);
|
||||
}
|
||||
asyncClient.client = mockNMClient(1);
|
||||
}
|
||||
Container container = mockContainer(i);
|
||||
ContainerLaunchContext clc =
|
||||
recordFactory.newRecordInstance(ContainerLaunchContext.class);
|
||||
asyncClient.startContainer(container, clc);
|
||||
}
|
||||
while (!((TestCallbackHandler1) asyncClient.callbackHandler)
|
||||
.isStartAndQueryFailureCallsExecuted()) {
|
||||
Thread.sleep(10);
|
||||
}
|
||||
asyncClient.client = mockNMClient(2);
|
||||
((TestCallbackHandler1) asyncClient.callbackHandler).path = false;
|
||||
for (int i = 0; i < expectedFailure; ++i) {
|
||||
Container container = mockContainer(
|
||||
expectedSuccess + expectedFailure + i);
|
||||
ContainerLaunchContext clc =
|
||||
recordFactory.newRecordInstance(ContainerLaunchContext.class);
|
||||
asyncClient.startContainer(container, clc);
|
||||
}
|
||||
while (!((TestCallbackHandler1) asyncClient.callbackHandler)
|
||||
.isStopFailureCallsExecuted()) {
|
||||
Thread.sleep(10);
|
||||
}
|
||||
for (String errorMsg :
|
||||
((TestCallbackHandler1) asyncClient.callbackHandler).errorMsgs) {
|
||||
System.out.println(errorMsg);
|
||||
}
|
||||
Assert.assertEquals("Error occurs in CallbackHandler", 0,
|
||||
((TestCallbackHandler1) asyncClient.callbackHandler).errorMsgs.size());
|
||||
for (String errorMsg : ((MockNMClientAsync1) asyncClient).errorMsgs) {
|
||||
System.out.println(errorMsg);
|
||||
}
|
||||
Assert.assertEquals("Error occurs in ContainerEventProcessor", 0,
|
||||
((MockNMClientAsync1) asyncClient).errorMsgs.size());
|
||||
asyncClient.stop();
|
||||
Assert.assertFalse(
|
||||
"The thread of Container Management Event Dispatcher is still alive",
|
||||
asyncClient.eventDispatcherThread.isAlive());
|
||||
Assert.assertTrue("The thread pool is not shut down",
|
||||
asyncClient.threadPool.isShutdown());
|
||||
}
|
||||
|
||||
private class MockNMClientAsync1 extends NMClientAsync {
|
||||
private Set<String> errorMsgs =
|
||||
Collections.synchronizedSet(new HashSet<String>());
|
||||
|
||||
protected MockNMClientAsync1(int expectedSuccess, int expectedFailure)
|
||||
throws YarnRemoteException, IOException {
|
||||
super(MockNMClientAsync1.class.getName(), mockNMClient(0),
|
||||
new TestCallbackHandler1(expectedSuccess, expectedFailure));
|
||||
}
|
||||
|
||||
private class MockContainerEventProcessor extends ContainerEventProcessor {
|
||||
public MockContainerEventProcessor(ContainerEvent event) {
|
||||
super(event);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
super.run();
|
||||
} catch (RuntimeException e) {
|
||||
// If the unexpected throwable comes from error callback functions, it
|
||||
// will break ContainerEventProcessor.run(). Therefore, monitor
|
||||
// the exception here
|
||||
errorMsgs.add("Unexpected throwable from callback functions should" +
|
||||
" be ignored by Container " + event.getContainerId());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ContainerEventProcessor getContainerEventProcessor(
|
||||
ContainerEvent event) {
|
||||
return new MockContainerEventProcessor(event);
|
||||
}
|
||||
}
|
||||
|
||||
private class TestCallbackHandler1
|
||||
implements NMClientAsync.CallbackHandler {
|
||||
|
||||
private boolean path = true;
|
||||
|
||||
private int expectedSuccess;
|
||||
private int expectedFailure;
|
||||
|
||||
private AtomicInteger actualStartSuccess = new AtomicInteger(0);
|
||||
private AtomicInteger actualStartFailure = new AtomicInteger(0);
|
||||
private AtomicInteger actualQuerySuccess = new AtomicInteger(0);
|
||||
private AtomicInteger actualQueryFailure = new AtomicInteger(0);
|
||||
private AtomicInteger actualStopSuccess = new AtomicInteger(0);
|
||||
private AtomicInteger actualStopFailure = new AtomicInteger(0);
|
||||
|
||||
private AtomicIntegerArray actualStartSuccessArray;
|
||||
private AtomicIntegerArray actualStartFailureArray;
|
||||
private AtomicIntegerArray actualQuerySuccessArray;
|
||||
private AtomicIntegerArray actualQueryFailureArray;
|
||||
private AtomicIntegerArray actualStopSuccessArray;
|
||||
private AtomicIntegerArray actualStopFailureArray;
|
||||
|
||||
private Set<String> errorMsgs =
|
||||
Collections.synchronizedSet(new HashSet<String>());
|
||||
|
||||
public TestCallbackHandler1(int expectedSuccess, int expectedFailure) {
|
||||
this.expectedSuccess = expectedSuccess;
|
||||
this.expectedFailure = expectedFailure;
|
||||
|
||||
actualStartSuccessArray = new AtomicIntegerArray(expectedSuccess);
|
||||
actualStartFailureArray = new AtomicIntegerArray(expectedFailure);
|
||||
actualQuerySuccessArray = new AtomicIntegerArray(expectedSuccess);
|
||||
actualQueryFailureArray = new AtomicIntegerArray(expectedFailure);
|
||||
actualStopSuccessArray = new AtomicIntegerArray(expectedSuccess);
|
||||
actualStopFailureArray = new AtomicIntegerArray(expectedFailure);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onContainerStarted(ContainerId containerId,
|
||||
Map<String, ByteBuffer> allServiceResponse) {
|
||||
if (path) {
|
||||
if (containerId.getId() >= expectedSuccess) {
|
||||
errorMsgs.add("Container " + containerId +
|
||||
" should throw the exception onContainerStarted");
|
||||
return;
|
||||
}
|
||||
actualStartSuccess.addAndGet(1);
|
||||
actualStartSuccessArray.set(containerId.getId(), 1);
|
||||
|
||||
// move on to the following success tests
|
||||
asyncClient.getContainerStatus(containerId, nodeId, containerToken);
|
||||
} else {
|
||||
// move on to the following failure tests
|
||||
asyncClient.stopContainer(containerId, nodeId, containerToken);
|
||||
}
|
||||
|
||||
// Shouldn't crash the test thread
|
||||
throw new RuntimeException("Ignorable Exception");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onContainerStatusReceived(ContainerId containerId,
|
||||
ContainerStatus containerStatus) {
|
||||
if (containerId.getId() >= expectedSuccess) {
|
||||
errorMsgs.add("Container " + containerId +
|
||||
" should throw the exception onContainerStatusReceived");
|
||||
return;
|
||||
}
|
||||
actualQuerySuccess.addAndGet(1);
|
||||
actualQuerySuccessArray.set(containerId.getId(), 1);
|
||||
// move on to the following success tests
|
||||
asyncClient.stopContainer(containerId, nodeId, containerToken);
|
||||
|
||||
// Shouldn't crash the test thread
|
||||
throw new RuntimeException("Ignorable Exception");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onContainerStopped(ContainerId containerId) {
|
||||
if (containerId.getId() >= expectedSuccess) {
|
||||
errorMsgs.add("Container " + containerId +
|
||||
" should throw the exception onContainerStopped");
|
||||
return;
|
||||
}
|
||||
actualStopSuccess.addAndGet(1);
|
||||
actualStopSuccessArray.set(containerId.getId(), 1);
|
||||
|
||||
// Shouldn't crash the test thread
|
||||
throw new RuntimeException("Ignorable Exception");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onStartContainerError(ContainerId containerId, Throwable t) {
|
||||
// If the unexpected throwable comes from success callback functions, it
|
||||
// will be handled by the error callback functions. Therefore, monitor
|
||||
// the exception here
|
||||
if (t instanceof RuntimeException) {
|
||||
errorMsgs.add("Unexpected throwable from callback functions should be" +
|
||||
" ignored by Container " + containerId);
|
||||
}
|
||||
if (containerId.getId() < expectedSuccess) {
|
||||
errorMsgs.add("Container " + containerId +
|
||||
" shouldn't throw the exception onStartContainerError");
|
||||
return;
|
||||
}
|
||||
actualStartFailure.addAndGet(1);
|
||||
actualStartFailureArray.set(containerId.getId() - expectedSuccess, 1);
|
||||
// move on to the following failure tests
|
||||
asyncClient.getContainerStatus(containerId, nodeId, containerToken);
|
||||
|
||||
// Shouldn't crash the test thread
|
||||
throw new RuntimeException("Ignorable Exception");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onStopContainerError(ContainerId containerId, Throwable t) {
|
||||
if (t instanceof RuntimeException) {
|
||||
errorMsgs.add("Unexpected throwable from callback functions should be" +
|
||||
" ignored by Container " + containerId);
|
||||
}
|
||||
if (containerId.getId() < expectedSuccess + expectedFailure) {
|
||||
errorMsgs.add("Container " + containerId +
|
||||
" shouldn't throw the exception onStopContainerError");
|
||||
return;
|
||||
}
|
||||
|
||||
actualStopFailure.addAndGet(1);
|
||||
actualStopFailureArray.set(
|
||||
containerId.getId() - expectedSuccess - expectedFailure, 1);
|
||||
|
||||
// Shouldn't crash the test thread
|
||||
throw new RuntimeException("Ignorable Exception");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onGetContainerStatusError(ContainerId containerId,
|
||||
Throwable t) {
|
||||
if (t instanceof RuntimeException) {
|
||||
errorMsgs.add("Unexpected throwable from callback functions should be"
|
||||
+ " ignored by Container " + containerId);
|
||||
}
|
||||
if (containerId.getId() < expectedSuccess) {
|
||||
errorMsgs.add("Container " + containerId +
|
||||
" shouldn't throw the exception onGetContainerStatusError");
|
||||
return;
|
||||
}
|
||||
actualQueryFailure.addAndGet(1);
|
||||
actualQueryFailureArray.set(containerId.getId() - expectedSuccess, 1);
|
||||
|
||||
// Shouldn't crash the test thread
|
||||
throw new RuntimeException("Ignorable Exception");
|
||||
}
|
||||
|
||||
public boolean isAllSuccessCallsExecuted() {
|
||||
boolean isAllSuccessCallsExecuted =
|
||||
actualStartSuccess.get() == expectedSuccess &&
|
||||
actualQuerySuccess.get() == expectedSuccess &&
|
||||
actualStopSuccess.get() == expectedSuccess;
|
||||
if (isAllSuccessCallsExecuted) {
|
||||
assertAtomicIntegerArray(actualStartSuccessArray);
|
||||
assertAtomicIntegerArray(actualQuerySuccessArray);
|
||||
assertAtomicIntegerArray(actualStopSuccessArray);
|
||||
}
|
||||
return isAllSuccessCallsExecuted;
|
||||
}
|
||||
|
||||
public boolean isStartAndQueryFailureCallsExecuted() {
|
||||
boolean isStartAndQueryFailureCallsExecuted =
|
||||
actualStartFailure.get() == expectedFailure &&
|
||||
actualQueryFailure.get() == expectedFailure;
|
||||
if (isStartAndQueryFailureCallsExecuted) {
|
||||
assertAtomicIntegerArray(actualStartFailureArray);
|
||||
assertAtomicIntegerArray(actualQueryFailureArray);
|
||||
}
|
||||
return isStartAndQueryFailureCallsExecuted;
|
||||
}
|
||||
|
||||
public boolean isStopFailureCallsExecuted() {
|
||||
boolean isStopFailureCallsExecuted =
|
||||
actualStopFailure.get() == expectedFailure;
|
||||
if (isStopFailureCallsExecuted) {
|
||||
assertAtomicIntegerArray(actualStopFailureArray);
|
||||
}
|
||||
return isStopFailureCallsExecuted;
|
||||
}
|
||||
|
||||
private void assertAtomicIntegerArray(AtomicIntegerArray array) {
|
||||
for (int i = 0; i < array.length(); ++i) {
|
||||
Assert.assertEquals(1, array.get(i));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private NMClient mockNMClient(int mode)
|
||||
throws YarnRemoteException, IOException {
|
||||
NMClient client = mock(NMClient.class);
|
||||
switch (mode) {
|
||||
case 0:
|
||||
when(client.startContainer(any(Container.class),
|
||||
any(ContainerLaunchContext.class))).thenReturn(
|
||||
Collections.<String, ByteBuffer>emptyMap());
|
||||
when(client.getContainerStatus(any(ContainerId.class), any(NodeId.class),
|
||||
any(ContainerToken.class))).thenReturn(
|
||||
recordFactory.newRecordInstance(ContainerStatus.class));
|
||||
doNothing().when(client).stopContainer(any(ContainerId.class),
|
||||
any(NodeId.class), any(ContainerToken.class));
|
||||
break;
|
||||
case 1:
|
||||
doThrow(RPCUtil.getRemoteException("Start Exception")).when(client)
|
||||
.startContainer(any(Container.class),
|
||||
any(ContainerLaunchContext.class));
|
||||
doThrow(RPCUtil.getRemoteException("Query Exception")).when(client)
|
||||
.getContainerStatus(any(ContainerId.class), any(NodeId.class),
|
||||
any(ContainerToken.class));
|
||||
doThrow(RPCUtil.getRemoteException("Stop Exception")).when(client)
|
||||
.stopContainer(any(ContainerId.class), any(NodeId.class),
|
||||
any(ContainerToken.class));
|
||||
break;
|
||||
case 2:
|
||||
when(client.startContainer(any(Container.class),
|
||||
any(ContainerLaunchContext.class))).thenReturn(
|
||||
Collections.<String, ByteBuffer>emptyMap());
|
||||
when(client.getContainerStatus(any(ContainerId.class), any(NodeId.class),
|
||||
any(ContainerToken.class))).thenReturn(
|
||||
recordFactory.newRecordInstance(ContainerStatus.class));
|
||||
doThrow(RPCUtil.getRemoteException("Stop Exception")).when(client)
|
||||
.stopContainer(any(ContainerId.class), any(NodeId.class),
|
||||
any(ContainerToken.class));
|
||||
}
|
||||
return client;
|
||||
}
|
||||
|
||||
@Test (timeout = 10000)
|
||||
public void testOutOfOrder() throws Exception {
|
||||
CyclicBarrier barrierA = new CyclicBarrier(2);
|
||||
CyclicBarrier barrierB = new CyclicBarrier(2);
|
||||
CyclicBarrier barrierC = new CyclicBarrier(2);
|
||||
asyncClient = new MockNMClientAsync2(barrierA, barrierB, barrierC);
|
||||
asyncClient.init(new Configuration());
|
||||
asyncClient.start();
|
||||
|
||||
final Container container = mockContainer(1);
|
||||
final ContainerLaunchContext clc =
|
||||
recordFactory.newRecordInstance(ContainerLaunchContext.class);
|
||||
|
||||
// start container from another thread
|
||||
Thread t = new Thread() {
|
||||
@Override
|
||||
public void run() {
|
||||
asyncClient.startContainer(container, clc);
|
||||
}
|
||||
};
|
||||
t.start();
|
||||
|
||||
barrierA.await();
|
||||
asyncClient.stopContainer(container.getId(), container.getNodeId(),
|
||||
container.getContainerToken());
|
||||
barrierC.await();
|
||||
|
||||
Assert.assertFalse("Starting and stopping should be out of order",
|
||||
((TestCallbackHandler2) asyncClient.callbackHandler)
|
||||
.exceptionOccurred.get());
|
||||
}
|
||||
|
||||
private class MockNMClientAsync2 extends NMClientAsync {
|
||||
private CyclicBarrier barrierA;
|
||||
private CyclicBarrier barrierB;
|
||||
|
||||
protected MockNMClientAsync2(CyclicBarrier barrierA, CyclicBarrier barrierB,
|
||||
CyclicBarrier barrierC) throws YarnRemoteException, IOException {
|
||||
super(MockNMClientAsync2.class.getName(), mockNMClient(0),
|
||||
new TestCallbackHandler2(barrierC));
|
||||
this.barrierA = barrierA;
|
||||
this.barrierB = barrierB;
|
||||
}
|
||||
|
||||
private class MockContainerEventProcessor extends ContainerEventProcessor {
|
||||
|
||||
public MockContainerEventProcessor(ContainerEvent event) {
|
||||
super(event);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
if (event.getType() == ContainerEventType.START_CONTAINER) {
|
||||
barrierA.await();
|
||||
barrierB.await();
|
||||
}
|
||||
super.run();
|
||||
if (event.getType() == ContainerEventType.STOP_CONTAINER) {
|
||||
barrierB.await();
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
} catch (BrokenBarrierException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ContainerEventProcessor getContainerEventProcessor(
|
||||
ContainerEvent event) {
|
||||
return new MockContainerEventProcessor(event);
|
||||
}
|
||||
}
|
||||
|
||||
private class TestCallbackHandler2
|
||||
implements NMClientAsync.CallbackHandler {
|
||||
private CyclicBarrier barrierC;
|
||||
private AtomicBoolean exceptionOccurred = new AtomicBoolean(false);
|
||||
|
||||
public TestCallbackHandler2(CyclicBarrier barrierC) {
|
||||
this.barrierC = barrierC;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onContainerStarted(ContainerId containerId,
|
||||
Map<String, ByteBuffer> allServiceResponse) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onContainerStatusReceived(ContainerId containerId,
|
||||
ContainerStatus containerStatus) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onContainerStopped(ContainerId containerId) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onStartContainerError(ContainerId containerId, Throwable t) {
|
||||
if (!t.getMessage().equals(NMClientAsync.StatefulContainer
|
||||
.OutOfOrderTransition.STOP_BEFORE_START_ERROR_MSG)) {
|
||||
exceptionOccurred.set(true);
|
||||
return;
|
||||
}
|
||||
try {
|
||||
barrierC.await();
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
} catch (BrokenBarrierException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onGetContainerStatusError(ContainerId containerId,
|
||||
Throwable t) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onStopContainerError(ContainerId containerId, Throwable t) {
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private Container mockContainer(int i) {
|
||||
ApplicationId appId =
|
||||
BuilderUtils.newApplicationId(System.currentTimeMillis(), 1);
|
||||
ApplicationAttemptId attemptId =
|
||||
ApplicationAttemptId.newInstance(appId, 1);
|
||||
ContainerId containerId = ContainerId.newInstance(attemptId, i);
|
||||
nodeId = NodeId.newInstance("localhost", 0);
|
||||
// Create an empty record
|
||||
containerToken = recordFactory.newRecordInstance(ContainerToken.class);
|
||||
return BuilderUtils.newContainer(
|
||||
containerId, nodeId, null, null, null, containerToken, 0);
|
||||
}
|
||||
|
||||
}
|
|
@ -661,6 +661,13 @@
|
|||
<value>30</value>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<description>Max number of threads in NMClientAsync to process container
|
||||
management events</description>
|
||||
<name>yarn.client.nodemanager-client-async.thread-pool-max-size</name>
|
||||
<value>500</value>
|
||||
</property>
|
||||
|
||||
<!--Map Reduce configuration-->
|
||||
<property>
|
||||
<name>yarn.nodemanager.aux-services.mapreduce.shuffle.class</name>
|
||||
|
|
Loading…
Reference in New Issue