YARN-834. Fixed annotations for yarn-client module, reorganized packages and clearly differentiated *Async apis. Contributed by Arun C Murthy and Zhijie Shen.

svn merge --ignore-ancestry -c 1494017 ../../trunk/


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1494018 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2013-06-18 04:03:43 +00:00
parent 5e257b8925
commit 8e9a719712
34 changed files with 945 additions and 382 deletions

View File

@ -20,9 +20,12 @@ package org.apache.hadoop.mapred;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
@ -40,34 +43,85 @@ import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
import org.apache.hadoop.yarn.client.YarnClientImpl;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.util.ProtoUtils;
public class ResourceMgrDelegate extends YarnClientImpl {
import com.google.common.annotations.VisibleForTesting;
public class ResourceMgrDelegate extends YarnClient {
private static final Log LOG = LogFactory.getLog(ResourceMgrDelegate.class);
private YarnConfiguration conf;
private GetNewApplicationResponse application;
private ApplicationId applicationId;
@Private
@VisibleForTesting
protected YarnClient client;
private InetSocketAddress rmAddress;
/**
* Delegate responsible for communicating with the Resource Manager's {@link ApplicationClientProtocol}.
* Delegate responsible for communicating with the Resource Manager's
* {@link ApplicationClientProtocol}.
* @param conf the configuration object.
*/
public ResourceMgrDelegate(YarnConfiguration conf) {
super();
this(conf, null);
}
/**
* Delegate responsible for communicating with the Resource Manager's
* {@link ApplicationClientProtocol}.
* @param conf the configuration object.
* @param rmAddress the address of the Resource Manager
*/
public ResourceMgrDelegate(YarnConfiguration conf,
InetSocketAddress rmAddress) {
super(ResourceMgrDelegate.class.getName());
this.conf = conf;
this.rmAddress = rmAddress;
if (rmAddress == null) {
client = YarnClient.createYarnClient();
} else {
client = YarnClient.createYarnClient(rmAddress);
}
init(conf);
start();
}
@Override
protected void serviceInit(Configuration conf) throws Exception {
if (rmAddress == null) {
this.rmAddress = conf.getSocketAddr(YarnConfiguration.RM_ADDRESS,
YarnConfiguration.DEFAULT_RM_ADDRESS,
YarnConfiguration.DEFAULT_RM_PORT);
}
client.init(conf);
super.serviceInit(conf);
}
@Override
protected void serviceStart() throws Exception {
client.start();
super.serviceStart();
}
@Override
protected void serviceStop() throws Exception {
client.stop();
super.serviceStop();
}
public TaskTrackerInfo[] getActiveTrackers() throws IOException,
InterruptedException {
try {
return TypeConverter.fromYarnNodes(super.getNodeReports());
return TypeConverter.fromYarnNodes(client.getNodeReports());
} catch (YarnException e) {
throw new IOException(e);
}
@ -75,7 +129,7 @@ public class ResourceMgrDelegate extends YarnClientImpl {
public JobStatus[] getAllJobs() throws IOException, InterruptedException {
try {
return TypeConverter.fromYarnApps(super.getApplicationList(), this.conf);
return TypeConverter.fromYarnApps(client.getApplicationList(), this.conf);
} catch (YarnException e) {
throw new IOException(e);
}
@ -91,7 +145,7 @@ public class ResourceMgrDelegate extends YarnClientImpl {
public ClusterMetrics getClusterMetrics() throws IOException,
InterruptedException {
try {
YarnClusterMetrics metrics = super.getYarnClusterMetrics();
YarnClusterMetrics metrics = client.getYarnClusterMetrics();
ClusterMetrics oldMetrics =
new ClusterMetrics(1, 1, 1, 1, 1, 1,
metrics.getNumNodeManagers() * 10,
@ -112,7 +166,7 @@ public class ResourceMgrDelegate extends YarnClientImpl {
InterruptedException {
try {
return ProtoUtils.convertFromProtoFormat(
super.getRMDelegationToken(renewer), rmAddress);
client.getRMDelegationToken(renewer), rmAddress);
} catch (YarnException e) {
throw new IOException(e);
}
@ -124,7 +178,7 @@ public class ResourceMgrDelegate extends YarnClientImpl {
public JobID getNewJobID() throws IOException, InterruptedException {
try {
this.application = super.getNewApplication();
this.application = client.getNewApplication();
this.applicationId = this.application.getApplicationId();
return TypeConverter.fromYarn(applicationId);
} catch (YarnException e) {
@ -136,7 +190,7 @@ public class ResourceMgrDelegate extends YarnClientImpl {
InterruptedException {
try {
org.apache.hadoop.yarn.api.records.QueueInfo queueInfo =
super.getQueueInfo(queueName);
client.getQueueInfo(queueName);
return (queueInfo == null) ? null : TypeConverter.fromYarn(queueInfo,
conf);
} catch (YarnException e) {
@ -147,7 +201,7 @@ public class ResourceMgrDelegate extends YarnClientImpl {
public QueueAclsInfo[] getQueueAclsForCurrentUser() throws IOException,
InterruptedException {
try {
return TypeConverter.fromYarnQueueUserAclsInfo(super
return TypeConverter.fromYarnQueueUserAclsInfo(client
.getQueueAclsInfo());
} catch (YarnException e) {
throw new IOException(e);
@ -156,7 +210,7 @@ public class ResourceMgrDelegate extends YarnClientImpl {
public QueueInfo[] getQueues() throws IOException, InterruptedException {
try {
return TypeConverter.fromYarnQueueInfo(super.getAllQueues(), this.conf);
return TypeConverter.fromYarnQueueInfo(client.getAllQueues(), this.conf);
} catch (YarnException e) {
throw new IOException(e);
}
@ -164,7 +218,7 @@ public class ResourceMgrDelegate extends YarnClientImpl {
public QueueInfo[] getRootQueues() throws IOException, InterruptedException {
try {
return TypeConverter.fromYarnQueueInfo(super.getRootQueueInfos(),
return TypeConverter.fromYarnQueueInfo(client.getRootQueueInfos(),
this.conf);
} catch (YarnException e) {
throw new IOException(e);
@ -174,7 +228,7 @@ public class ResourceMgrDelegate extends YarnClientImpl {
public QueueInfo[] getChildQueues(String parent) throws IOException,
InterruptedException {
try {
return TypeConverter.fromYarnQueueInfo(super.getChildQueueInfos(parent),
return TypeConverter.fromYarnQueueInfo(client.getChildQueueInfos(parent),
this.conf);
} catch (YarnException e) {
throw new IOException(e);
@ -216,4 +270,82 @@ public class ResourceMgrDelegate extends YarnClientImpl {
public ApplicationId getApplicationId() {
return applicationId;
}
@Override
public GetNewApplicationResponse getNewApplication() throws YarnException,
IOException {
return client.getNewApplication();
}
@Override
public ApplicationId
submitApplication(ApplicationSubmissionContext appContext)
throws YarnException, IOException {
return client.submitApplication(appContext);
}
@Override
public void killApplication(ApplicationId applicationId)
throws YarnException, IOException {
client.killApplication(applicationId);
}
@Override
public ApplicationReport getApplicationReport(ApplicationId appId)
throws YarnException, IOException {
return client.getApplicationReport(appId);
}
@Override
public List<ApplicationReport> getApplicationList() throws YarnException,
IOException {
return client.getApplicationList();
}
@Override
public YarnClusterMetrics getYarnClusterMetrics() throws YarnException,
IOException {
return client.getYarnClusterMetrics();
}
@Override
public List<NodeReport> getNodeReports() throws YarnException, IOException {
return client.getNodeReports();
}
@Override
public org.apache.hadoop.yarn.api.records.Token getRMDelegationToken(
Text renewer) throws YarnException, IOException {
return client.getRMDelegationToken(renewer);
}
@Override
public org.apache.hadoop.yarn.api.records.QueueInfo getQueueInfo(
String queueName) throws YarnException, IOException {
return client.getQueueInfo(queueName);
}
@Override
public List<org.apache.hadoop.yarn.api.records.QueueInfo> getAllQueues()
throws YarnException, IOException {
return client.getAllQueues();
}
@Override
public List<org.apache.hadoop.yarn.api.records.QueueInfo> getRootQueueInfos()
throws YarnException, IOException {
return client.getRootQueueInfos();
}
@Override
public List<org.apache.hadoop.yarn.api.records.QueueInfo> getChildQueueInfos(
String parent) throws YarnException, IOException {
return client.getChildQueueInfos(parent);
}
@Override
public List<QueueUserACLInfo> getQueueAclsInfo() throws YarnException,
IOException {
return client.getQueueAclsInfo();
}
}

View File

@ -37,6 +37,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.api.impl.YarnClientImpl;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.util.Records;
@ -67,8 +68,9 @@ public class TestResourceMgrDelegate {
ResourceMgrDelegate delegate = new ResourceMgrDelegate(
new YarnConfiguration()) {
@Override
protected void serviceStart() {
this.rmClient = applicationsManager;
protected void serviceStart() throws Exception {
Assert.assertTrue(this.client instanceof YarnClientImpl);
((YarnClientImpl) this.client).setRMClient(applicationsManager);
}
};
delegate.getRootQueues();
@ -110,8 +112,9 @@ public class TestResourceMgrDelegate {
ResourceMgrDelegate resourceMgrDelegate = new ResourceMgrDelegate(
new YarnConfiguration()) {
@Override
protected void serviceStart() {
this.rmClient = applicationsManager;
protected void serviceStart() throws Exception {
Assert.assertTrue(this.client instanceof YarnClientImpl);
((YarnClientImpl) this.client).setRMClient(applicationsManager);
}
};
JobStatus[] allJobs = resourceMgrDelegate.getAllJobs();

View File

@ -82,6 +82,7 @@ import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
import org.apache.hadoop.yarn.client.api.impl.YarnClientImpl;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
@ -200,8 +201,9 @@ public class TestYARNRunner extends TestCase {
final ApplicationClientProtocol clientRMProtocol = mock(ApplicationClientProtocol.class);
ResourceMgrDelegate delegate = new ResourceMgrDelegate(conf) {
@Override
protected void serviceStart() {
this.rmClient = clientRMProtocol;
protected void serviceStart() throws Exception {
assertTrue(this.client instanceof YarnClientImpl);
((YarnClientImpl) this.client).setRMClient(clientRMProtocol);
}
};
/* make sure kill calls finish application master */

View File

@ -37,6 +37,7 @@ import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse;
import org.apache.hadoop.yarn.client.api.impl.YarnClientImpl;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
@ -110,8 +111,9 @@ public class TestYarnClientProtocolProvider extends TestCase {
ResourceMgrDelegate rmgrDelegate = new ResourceMgrDelegate(
new YarnConfiguration(conf)) {
@Override
protected void serviceStart() {
this.rmClient = cRMProtocol;
protected void serviceStart() throws Exception {
assertTrue(this.client instanceof YarnClientImpl);
((YarnClientImpl) this.client).setRMClient(cRMProtocol);
}
};
yrunner.setResourceMgrDelegate(rmgrDelegate);

View File

@ -166,6 +166,10 @@ Release 2.1.0-beta - UNRELEASED
YARN-610. ClientToken is no longer set in the environment of the Containers.
(Omkar Vinit Joshi via vinodkv)
YARN-834. Fixed annotations for yarn-client module, reorganized packages and
clearly differentiated *Async apis. (Arun C Murthy and Zhijie Shen via
vinodkv)
NEW FEATURES
YARN-482. FS: Extend SchedulingMode to intermediate queues.

View File

@ -67,9 +67,9 @@ import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.client.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.client.AMRMClientAsync;
import org.apache.hadoop.yarn.client.NMClientAsync;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.util.ConverterUtils;
@ -436,17 +436,18 @@ public class ApplicationMaster {
* @throws YarnException
* @throws IOException
*/
@SuppressWarnings({ "rawtypes", "unchecked" })
@SuppressWarnings({ "unchecked" })
public boolean run() throws YarnException, IOException {
LOG.info("Starting ApplicationMaster");
AMRMClientAsync.CallbackHandler allocListener = new RMCallbackHandler();
resourceManager = new AMRMClientAsync(appAttemptID, 1000, allocListener);
resourceManager =
AMRMClientAsync.createAMRMClientAsync(appAttemptID, 1000, allocListener);
resourceManager.init(conf);
resourceManager.start();
containerListener = new NMCallbackHandler();
nmClientAsync = new NMClientAsync(containerListener);
nmClientAsync = NMClientAsync.createNMClientAsync(containerListener);
nmClientAsync.init(conf);
nmClientAsync.start();
@ -682,7 +683,7 @@ public class ApplicationMaster {
}
Container container = containers.get(containerId);
if (container != null) {
nmClientAsync.getContainerStatus(containerId, container.getNodeId(),
nmClientAsync.getContainerStatusAsync(containerId, container.getNodeId(),
container.getContainerToken());
}
}
@ -802,7 +803,7 @@ public class ApplicationMaster {
ctx.setCommands(commands);
containerListener.addContainer(container.getId(), container);
nmClientAsync.startContainer(container, ctx);
nmClientAsync.startContainerAsync(container, ctx);
}
}

View File

@ -60,7 +60,7 @@ import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
import org.apache.hadoop.yarn.client.YarnClient;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.util.ConverterUtils;

View File

@ -48,8 +48,7 @@ import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.YarnClient;
import org.apache.hadoop.yarn.client.YarnClientImpl;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.util.Records;

View File

@ -16,7 +16,7 @@
* limitations under the License.
*/
package org.apache.hadoop.yarn.client;
package org.apache.hadoop.yarn.client.api;
import java.io.IOException;
import java.util.Collection;
@ -36,12 +36,13 @@ import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl;
import org.apache.hadoop.yarn.exceptions.YarnException;
import com.google.common.collect.ImmutableList;
@InterfaceAudience.Public
@InterfaceStability.Unstable
@InterfaceStability.Stable
public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
AbstractService {
@ -53,7 +54,7 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
* AMRMClient.<T>createAMRMClientContainerRequest(appAttemptId)
* }</pre>
* @param appAttemptId the appAttemptId associated with the AMRMClient
* @return the newly created AMRMClient instance.
* @return the newly create AMRMClient instance.
*/
@Public
public static <T extends ContainerRequest> AMRMClient<T> createAMRMClient(

View File

@ -16,7 +16,7 @@
* limitations under the License.
*/
package org.apache.hadoop.yarn.client;
package org.apache.hadoop.yarn.client.api;
import java.io.IOException;
import java.nio.ByteBuffer;
@ -33,10 +33,11 @@ import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.client.api.impl.NMClientImpl;
import org.apache.hadoop.yarn.exceptions.YarnException;
@InterfaceAudience.Public
@InterfaceStability.Unstable
@InterfaceStability.Stable
public abstract class NMClient extends AbstractService {
/**

View File

@ -16,7 +16,7 @@
* limitations under the License.
*/
package org.apache.hadoop.yarn.client;
package org.apache.hadoop.yarn.client.api;
import java.io.IOException;
import java.net.InetSocketAddress;
@ -37,10 +37,11 @@ import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
import org.apache.hadoop.yarn.client.api.impl.YarnClientImpl;
import org.apache.hadoop.yarn.exceptions.YarnException;
@InterfaceAudience.Public
@InterfaceStability.Evolving
@InterfaceStability.Stable
public abstract class YarnClient extends AbstractService {
/**

View File

@ -0,0 +1,245 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.client.api.async;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Stable;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl;
import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl;
import org.apache.hadoop.yarn.exceptions.YarnException;
import com.google.common.annotations.VisibleForTesting;
/**
* <code>AMRMClientAsync</code> handles communication with the ResourceManager
* and provides asynchronous updates on events such as container allocations and
* completions. It contains a thread that sends periodic heartbeats to the
* ResourceManager.
*
* It should be used by implementing a CallbackHandler:
* <pre>
* {@code
* class MyCallbackHandler implements AMRMClientAsync.CallbackHandler {
* public void onContainersAllocated(List<Container> containers) {
* [run tasks on the containers]
* }
*
* public void onContainersCompleted(List<ContainerStatus> statuses) {
* [update progress, check whether app is done]
* }
*
* public void onNodesUpdated(List<NodeReport> updated) {}
*
* public void onReboot() {}
* }
* }
* </pre>
*
* The client's lifecycle should be managed similarly to the following:
*
* <pre>
* {@code
* AMRMClientAsync asyncClient =
* createAMRMClientAsync(appAttId, 1000, new MyCallbackhandler());
* asyncClient.init(conf);
* asyncClient.start();
* RegisterApplicationMasterResponse response = asyncClient
* .registerApplicationMaster(appMasterHostname, appMasterRpcPort,
* appMasterTrackingUrl);
* asyncClient.addContainerRequest(containerRequest);
* [... wait for application to complete]
* asyncClient.unregisterApplicationMaster(status, appMsg, trackingUrl);
* asyncClient.stop();
* }
* </pre>
*/
@Public
@Stable
public abstract class AMRMClientAsync<T extends ContainerRequest>
extends AbstractService {
protected final AMRMClient<T> client;
protected final CallbackHandler handler;
protected final AtomicInteger heartbeatIntervalMs = new AtomicInteger();
public static <T extends ContainerRequest> AMRMClientAsync<T>
createAMRMClientAsync(
ApplicationAttemptId id,
int intervalMs,
CallbackHandler callbackHandler) {
return new AMRMClientAsyncImpl<T>(id, intervalMs, callbackHandler);
}
public static <T extends ContainerRequest> AMRMClientAsync<T>
createAMRMClientAsync(
AMRMClient<T> client,
int intervalMs,
CallbackHandler callbackHandler) {
return new AMRMClientAsyncImpl<T>(client, intervalMs, callbackHandler);
}
protected AMRMClientAsync(ApplicationAttemptId id, int intervalMs,
CallbackHandler callbackHandler) {
this(new AMRMClientImpl<T>(id), intervalMs, callbackHandler);
}
@Private
@VisibleForTesting
protected AMRMClientAsync(AMRMClient<T> client, int intervalMs,
CallbackHandler callbackHandler) {
super(AMRMClientAsync.class.getName());
this.client = client;
this.heartbeatIntervalMs.set(intervalMs);
this.handler = callbackHandler;
}
public void setHeartbeatInterval(int interval) {
heartbeatIntervalMs.set(interval);
}
public abstract List<? extends Collection<T>> getMatchingRequests(
Priority priority,
String resourceName,
Resource capability);
/**
* Registers this application master with the resource manager. On successful
* registration, starts the heartbeating thread.
* @throws YarnException
* @throws IOException
*/
public abstract RegisterApplicationMasterResponse registerApplicationMaster(
String appHostName, int appHostPort, String appTrackingUrl)
throws YarnException, IOException;
/**
* Unregister the application master. This must be called in the end.
* @param appStatus Success/Failure status of the master
* @param appMessage Diagnostics message on failure
* @param appTrackingUrl New URL to get master info
* @throws YarnException
* @throws IOException
*/
public abstract void unregisterApplicationMaster(
FinalApplicationStatus appStatus, String appMessage, String appTrackingUrl)
throws YarnException, IOException;
/**
* Request containers for resources before calling <code>allocate</code>
* @param req Resource request
*/
public abstract void addContainerRequest(T req);
/**
* Remove previous container request. The previous container request may have
* already been sent to the ResourceManager. So even after the remove request
* the app must be prepared to receive an allocation for the previous request
* even after the remove request
* @param req Resource request
*/
public abstract void removeContainerRequest(T req);
/**
* Release containers assigned by the Resource Manager. If the app cannot use
* the container or wants to give up the container then it can release them.
* The app needs to make new requests for the released resource capability if
* it still needs it. eg. it released non-local resources
* @param containerId
*/
public abstract void releaseAssignedContainer(ContainerId containerId);
/**
* Get the currently available resources in the cluster.
* A valid value is available after a call to allocate has been made
* @return Currently available resources
*/
public abstract Resource getClusterAvailableResources();
/**
* Get the current number of nodes in the cluster.
* A valid values is available after a call to allocate has been made
* @return Current number of nodes in the cluster
*/
public abstract int getClusterNodeCount();
/**
* It returns the NMToken received on allocate call. It will not communicate
* with RM to get NMTokens. On allocate call whenever we receive new token
* along with new container AMRMClientAsync will cache this NMToken per node
* manager. This map returned should be shared with any application which is
* communicating with NodeManager (ex. NMClient / NMClientAsync) using
* NMTokens. If a new NMToken is received for the same node manager
* then it will be replaced.
*/
public abstract ConcurrentMap<String, Token> getNMTokens();
public interface CallbackHandler {
/**
* Called when the ResourceManager responds to a heartbeat with completed
* containers. If the response contains both completed containers and
* allocated containers, this will be called before containersAllocated.
*/
public void onContainersCompleted(List<ContainerStatus> statuses);
/**
* Called when the ResourceManager responds to a heartbeat with allocated
* containers. If the response containers both completed containers and
* allocated containers, this will be called after containersCompleted.
*/
public void onContainersAllocated(List<Container> containers);
/**
* Called when the ResourceManager wants the ApplicationMaster to shutdown
* for being out of sync etc. The ApplicationMaster should not unregister
* with the RM unless the ApplicationMaster wants to be the last attempt.
*/
public void onShutdownRequest();
/**
* Called when nodes tracked by the ResourceManager have changed in health,
* availability etc.
*/
public void onNodesUpdated(List<NodeReport> updatedNodes);
public float getProgress();
public void onError(Exception e);
}
}

View File

@ -0,0 +1,235 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.client.api.async;
import java.nio.ByteBuffer;
import java.util.Map;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Stable;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.client.api.NMClient;
import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl;
import org.apache.hadoop.yarn.client.api.impl.NMClientImpl;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import com.google.common.annotations.VisibleForTesting;
/**
* <code>NMClientAsync</code> handles communication with all the NodeManagers
* and provides asynchronous updates on getting responses from them. It
* maintains a thread pool to communicate with individual NMs where a number of
* worker threads process requests to NMs by using {@link NMClientImpl}. The max
* size of the thread pool is configurable through
* {@link YarnConfiguration#NM_CLIENT_ASYNC_THREAD_POOL_MAX_SIZE}.
*
* It should be used in conjunction with a CallbackHandler. For example
*
* <pre>
* {@code
* class MyCallbackHandler implements NMClientAsync.CallbackHandler {
* public void onContainerStarted(ContainerId containerId,
* Map<String, ByteBuffer> allServiceResponse) {
* [post process after the container is started, process the response]
* }
*
* public void onContainerStatusReceived(ContainerId containerId,
* ContainerStatus containerStatus) {
* [make use of the status of the container]
* }
*
* public void onContainerStopped(ContainerId containerId) {
* [post process after the container is stopped]
* }
*
* public void onStartContainerError(
* ContainerId containerId, Throwable t) {
* [handle the raised exception]
* }
*
* public void onGetContainerStatusError(
* ContainerId containerId, Throwable t) {
* [handle the raised exception]
* }
*
* public void onStopContainerError(
* ContainerId containerId, Throwable t) {
* [handle the raised exception]
* }
* }
* }
* </pre>
*
* The client's life-cycle should be managed like the following:
*
* <pre>
* {@code
* NMClientAsync asyncClient =
* NMClientAsync.createNMClientAsync(new MyCallbackhandler());
* asyncClient.init(conf);
* asyncClient.start();
* asyncClient.startContainer(container, containerLaunchContext);
* [... wait for container being started]
* asyncClient.getContainerStatus(container.getId(), container.getNodeId(),
* container.getContainerToken());
* [... handle the status in the callback instance]
* asyncClient.stopContainer(container.getId(), container.getNodeId(),
* container.getContainerToken());
* [... wait for container being stopped]
* asyncClient.stop();
* }
* </pre>
*/
@Public
@Stable
public abstract class NMClientAsync extends AbstractService {
protected NMClient client;
protected CallbackHandler callbackHandler;
public static NMClientAsync createNMClientAsync(CallbackHandler callbackHandler) {
return new NMClientAsyncImpl(callbackHandler);
}
protected NMClientAsync(CallbackHandler callbackHandler) {
this (NMClientAsync.class.getName(), callbackHandler);
}
protected NMClientAsync(String name, CallbackHandler callbackHandler) {
this (name, new NMClientImpl(), callbackHandler);
}
@Private
@VisibleForTesting
protected NMClientAsync(String name, NMClient client,
CallbackHandler callbackHandler) {
super(name);
this.setClient(client);
this.setCallbackHandler(callbackHandler);
}
public abstract void startContainerAsync(
Container container, ContainerLaunchContext containerLaunchContext);
public abstract void stopContainerAsync(
ContainerId containerId, NodeId nodeId, Token containerToken);
public abstract void getContainerStatusAsync(
ContainerId containerId, NodeId nodeId, Token containerToken);
public NMClient getClient() {
return client;
}
public void setClient(NMClient client) {
this.client = client;
}
public CallbackHandler getCallbackHandler() {
return callbackHandler;
}
public void setCallbackHandler(CallbackHandler callbackHandler) {
this.callbackHandler = callbackHandler;
}
/**
* <p>
* The callback interface needs to be implemented by {@link NMClientAsync}
* users. The APIs are called when responses from <code>NodeManager</code> are
* available.
* </p>
*
* <p>
* Once a callback happens, the users can chose to act on it in blocking or
* non-blocking manner. If the action on callback is done in a blocking
* manner, some of the threads performing requests on NodeManagers may get
* blocked depending on how many threads in the pool are busy.
* </p>
*
* <p>
* The implementation of the callback function should not throw the
* unexpected exception. Otherwise, {@link NMClientAsync} will just
* catch, log and then ignore it.
* </p>
*/
public static interface CallbackHandler {
/**
* The API is called when <code>NodeManager</code> responds to indicate its
* acceptance of the starting container request
* @param containerId the Id of the container
* @param allServiceResponse a Map between the auxiliary service names and
* their outputs
*/
void onContainerStarted(ContainerId containerId,
Map<String, ByteBuffer> allServiceResponse);
/**
* The API is called when <code>NodeManager</code> responds with the status
* of the container
* @param containerId the Id of the container
* @param containerStatus the status of the container
*/
void onContainerStatusReceived(ContainerId containerId,
ContainerStatus containerStatus);
/**
* The API is called when <code>NodeManager</code> responds to indicate the
* container is stopped.
* @param containerId the Id of the container
*/
void onContainerStopped(ContainerId containerId);
/**
* The API is called when an exception is raised in the process of
* starting a container
*
* @param containerId the Id of the container
* @param t the raised exception
*/
void onStartContainerError(ContainerId containerId, Throwable t);
/**
* The API is called when an exception is raised in the process of
* querying the status of a container
*
* @param containerId the Id of the container
* @param t the raised exception
*/
void onGetContainerStatusError(ContainerId containerId, Throwable t);
/**
* The API is called when an exception is raised in the process of
* stopping a container
*
* @param containerId the Id of the container
* @param t the raised exception
*/
void onStopContainerError(ContainerId containerId, Throwable t);
}
}

View File

@ -16,7 +16,7 @@
* limitations under the License.
*/
package org.apache.hadoop.yarn.client;
package org.apache.hadoop.yarn.client.api.async.impl;
import java.io.IOException;
import java.util.Collection;
@ -24,15 +24,12 @@ import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@ -44,65 +41,24 @@ import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.client.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import com.google.common.annotations.VisibleForTesting;
/**
* <code>AMRMClientAsync</code> handles communication with the ResourceManager
* and provides asynchronous updates on events such as container allocations and
* completions. It contains a thread that sends periodic heartbeats to the
* ResourceManager.
*
* It should be used by implementing a CallbackHandler:
* <pre>
* {@code
* class MyCallbackHandler implements AMRMClientAsync.CallbackHandler {
* public void onContainersAllocated(List<Container> containers) {
* [run tasks on the containers]
* }
*
* public void onContainersCompleted(List<ContainerStatus> statuses) {
* [update progress, check whether app is done]
* }
*
* public void onNodesUpdated(List<NodeReport> updated) {}
*
* public void onReboot() {}
* }
* }
* </pre>
*
* The client's lifecycle should be managed similarly to the following:
*
* <pre>
* {@code
* AMRMClientAsync asyncClient = new AMRMClientAsync(appAttId, 1000, new MyCallbackhandler());
* asyncClient.init(conf);
* asyncClient.start();
* RegisterApplicationMasterResponse response = asyncClient
* .registerApplicationMaster(appMasterHostname, appMasterRpcPort,
* appMasterTrackingUrl);
* asyncClient.addContainerRequest(containerRequest);
* [... wait for application to complete]
* asyncClient.unregisterApplicationMaster(status, appMsg, trackingUrl);
* asyncClient.stop();
* }
* </pre>
*/
@Private
@Unstable
@Evolving
public class AMRMClientAsync<T extends ContainerRequest> extends AbstractService {
public class AMRMClientAsyncImpl<T extends ContainerRequest>
extends AMRMClientAsync<T> {
private static final Log LOG = LogFactory.getLog(AMRMClientAsync.class);
private static final Log LOG = LogFactory.getLog(AMRMClientAsyncImpl.class);
private final AMRMClient<T> client;
private final AtomicInteger heartbeatIntervalMs = new AtomicInteger();
private final HeartbeatThread heartbeatThread;
private final CallbackHandlerThread handlerThread;
private final CallbackHandler handler;
private final BlockingQueue<AllocateResponse> responseQueue;
@ -113,19 +69,16 @@ public class AMRMClientAsync<T extends ContainerRequest> extends AbstractService
private volatile Exception savedException;
public AMRMClientAsync(ApplicationAttemptId id, int intervalMs,
public AMRMClientAsyncImpl(ApplicationAttemptId id, int intervalMs,
CallbackHandler callbackHandler) {
this(new AMRMClientImpl<T>(id), intervalMs, callbackHandler);
}
@Private
@VisibleForTesting
protected AMRMClientAsync(AMRMClient<T> client, int intervalMs,
public AMRMClientAsyncImpl(AMRMClient<T> client, int intervalMs,
CallbackHandler callbackHandler) {
super(AMRMClientAsync.class.getName());
this.client = client;
this.heartbeatIntervalMs.set(intervalMs);
handler = callbackHandler;
super(client, intervalMs, callbackHandler);
heartbeatThread = new HeartbeatThread();
handlerThread = new CallbackHandlerThread();
responseQueue = new LinkedBlockingQueue<AllocateResponse>();
@ -386,38 +339,4 @@ public class AMRMClientAsync<T extends ContainerRequest> extends AbstractService
}
}
}
public interface CallbackHandler {
/**
* Called when the ResourceManager responds to a heartbeat with completed
* containers. If the response contains both completed containers and
* allocated containers, this will be called before containersAllocated.
*/
public void onContainersCompleted(List<ContainerStatus> statuses);
/**
* Called when the ResourceManager responds to a heartbeat with allocated
* containers. If the response containers both completed containers and
* allocated containers, this will be called after containersCompleted.
*/
public void onContainersAllocated(List<Container> containers);
/**
* Called when the ResourceManager wants the ApplicationMaster to shutdown
* for being out of sync etc. The ApplicationMaster should not unregister
* with the RM unless the ApplicationMaster wants to be the last attempt.
*/
public void onShutdownRequest();
/**
* Called when nodes tracked by the ResourceManager have changed in health,
* availability etc.
*/
public void onNodesUpdated(List<NodeReport> updatedNodes);
public float getProgress();
public void onError(Exception e);
}
}

View File

@ -16,7 +16,7 @@
* limitations under the License.
*/
package org.apache.hadoop.yarn.client;
package org.apache.hadoop.yarn.client.api.async.impl;
import java.io.IOException;
import java.nio.ByteBuffer;
@ -39,16 +39,17 @@ import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.client.api.NMClient;
import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
import org.apache.hadoop.yarn.client.api.impl.NMClientImpl;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AbstractEvent;
import org.apache.hadoop.yarn.event.EventHandler;
@ -63,75 +64,11 @@ import org.apache.hadoop.yarn.state.StateMachineFactory;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
* <code>NMClientAsync</code> handles communication with all the NodeManagers
* and provides asynchronous updates on getting responses from them. It
* maintains a thread pool to communicate with individual NMs where a number of
* worker threads process requests to NMs by using {@link NMClientImpl}. The max
* size of the thread pool is configurable through
* {@link YarnConfiguration#NM_CLIENT_ASYNC_THREAD_POOL_MAX_SIZE}.
*
* It should be used in conjunction with a CallbackHandler. For example
*
* <pre>
* {@code
* class MyCallbackHandler implements NMClientAsync.CallbackHandler {
* public void onContainerStarted(ContainerId containerId,
* Map<String, ByteBuffer> allServiceResponse) {
* [post process after the container is started, process the response]
* }
*
* public void onContainerStatusReceived(ContainerId containerId,
* ContainerStatus containerStatus) {
* [make use of the status of the container]
* }
*
* public void onContainerStopped(ContainerId containerId) {
* [post process after the container is stopped]
* }
*
* public void onStartContainerError(
* ContainerId containerId, Throwable t) {
* [handle the raised exception]
* }
*
* public void onGetContainerStatusError(
* ContainerId containerId, Throwable t) {
* [handle the raised exception]
* }
*
* public void onStopContainerError(
* ContainerId containerId, Throwable t) {
* [handle the raised exception]
* }
* }
* }
* </pre>
*
* The client's life-cycle should be managed like the following:
*
* <pre>
* {@code
* NMClientAsync asyncClient = new NMClientAsync(new MyCallbackhandler());
* asyncClient.init(conf);
* asyncClient.start();
* asyncClient.startContainer(container, containerLaunchContext);
* [... wait for container being started]
* asyncClient.getContainerStatus(container.getId(), container.getNodeId(),
* container.getContainerToken());
* [... handle the status in the callback instance]
* asyncClient.stopContainer(container.getId(), container.getNodeId(),
* container.getContainerToken());
* [... wait for container being stopped]
* asyncClient.stop();
* }
* </pre>
*/
@Private
@Unstable
@Evolving
public class NMClientAsync extends AbstractService {
public class NMClientAsyncImpl extends NMClientAsync {
private static final Log LOG = LogFactory.getLog(NMClientAsync.class);
private static final Log LOG = LogFactory.getLog(NMClientAsyncImpl.class);
protected static final int INITIAL_THREAD_POOL_SIZE = 10;
@ -142,25 +79,22 @@ public class NMClientAsync extends AbstractService {
protected BlockingQueue<ContainerEvent> events =
new LinkedBlockingQueue<ContainerEvent>();
protected NMClient client;
protected CallbackHandler callbackHandler;
protected ConcurrentMap<ContainerId, StatefulContainer> containers =
new ConcurrentHashMap<ContainerId, StatefulContainer>();
public NMClientAsync(CallbackHandler callbackHandler) {
this (NMClientAsync.class.getName(), callbackHandler);
public NMClientAsyncImpl(CallbackHandler callbackHandler) {
this (NMClientAsyncImpl.class.getName(), callbackHandler);
}
public NMClientAsync(String name, CallbackHandler callbackHandler) {
public NMClientAsyncImpl(String name, CallbackHandler callbackHandler) {
this (name, new NMClientImpl(), callbackHandler);
}
@Private
@VisibleForTesting
protected NMClientAsync(String name, NMClient client,
protected NMClientAsyncImpl(String name, NMClient client,
CallbackHandler callbackHandler) {
super(name);
super(name, client, callbackHandler);
this.client = client;
this.callbackHandler = callbackHandler;
}
@ -268,7 +202,7 @@ public class NMClientAsync extends AbstractService {
// If NMClientImpl doesn't stop running containers, the states doesn't
// need to be cleared.
if (!(client instanceof NMClientImpl) ||
((NMClientImpl) client).cleanupRunningContainers.get()) {
((NMClientImpl) client).getCleanupRunningContainers().get()) {
if (containers != null) {
containers.clear();
}
@ -278,7 +212,7 @@ public class NMClientAsync extends AbstractService {
super.serviceStop();
}
public void startContainer(
public void startContainerAsync(
Container container, ContainerLaunchContext containerLaunchContext) {
if (containers.putIfAbsent(container.getId(),
new StatefulContainer(this, container.getId())) != null) {
@ -295,7 +229,7 @@ public class NMClientAsync extends AbstractService {
}
}
public void stopContainer(ContainerId containerId, NodeId nodeId,
public void stopContainerAsync(ContainerId containerId, NodeId nodeId,
Token containerToken) {
if (containers.get(containerId) == null) {
callbackHandler.onStopContainerError(containerId,
@ -312,7 +246,7 @@ public class NMClientAsync extends AbstractService {
}
}
public void getContainerStatus(ContainerId containerId, NodeId nodeId,
public void getContainerStatusAsync(ContainerId containerId, NodeId nodeId,
Token containerToken) {
try {
events.put(new ContainerEvent(containerId, nodeId, containerToken,
@ -443,10 +377,10 @@ public class NMClientAsync extends AbstractService {
}
assert scEvent != null;
Map<String, ByteBuffer> allServiceResponse =
container.nmClientAsync.client.startContainer(
container.nmClientAsync.getClient().startContainer(
scEvent.getContainer(), scEvent.getContainerLaunchContext());
try {
container.nmClientAsync.callbackHandler.onContainerStarted(
container.nmClientAsync.getCallbackHandler().onContainerStarted(
containerId, allServiceResponse);
} catch (Throwable thr) {
// Don't process user created unchecked exception
@ -466,7 +400,7 @@ public class NMClientAsync extends AbstractService {
private ContainerState onExceptionRaised(StatefulContainer container,
ContainerEvent event, Throwable t) {
try {
container.nmClientAsync.callbackHandler.onStartContainerError(
container.nmClientAsync.getCallbackHandler().onStartContainerError(
event.getContainerId(), t);
} catch (Throwable thr) {
// Don't process user created unchecked exception
@ -487,10 +421,10 @@ public class NMClientAsync extends AbstractService {
StatefulContainer container, ContainerEvent event) {
ContainerId containerId = event.getContainerId();
try {
container.nmClientAsync.client.stopContainer(
container.nmClientAsync.getClient().stopContainer(
containerId, event.getNodeId(), event.getContainerToken());
try {
container.nmClientAsync.callbackHandler.onContainerStopped(
container.nmClientAsync.getCallbackHandler().onContainerStopped(
event.getContainerId());
} catch (Throwable thr) {
// Don't process user created unchecked exception
@ -510,7 +444,7 @@ public class NMClientAsync extends AbstractService {
private ContainerState onExceptionRaised(StatefulContainer container,
ContainerEvent event, Throwable t) {
try {
container.nmClientAsync.callbackHandler.onStopContainerError(
container.nmClientAsync.getCallbackHandler().onStopContainerError(
event.getContainerId(), t);
} catch (Throwable thr) {
// Don't process user created unchecked exception
@ -530,7 +464,7 @@ public class NMClientAsync extends AbstractService {
@Override
public void transition(StatefulContainer container, ContainerEvent event) {
try {
container.nmClientAsync.callbackHandler.onStartContainerError(
container.nmClientAsync.getCallbackHandler().onStartContainerError(
event.getContainerId(),
RPCUtil.getRemoteException(STOP_BEFORE_START_ERROR_MSG));
} catch (Throwable thr) {
@ -641,80 +575,4 @@ public class NMClientAsync extends AbstractService {
}
}
/**
* <p>
* The callback interface needs to be implemented by {@link NMClientAsync}
* users. The APIs are called when responses from <code>NodeManager</code> are
* available.
* </p>
*
* <p>
* Once a callback happens, the users can chose to act on it in blocking or
* non-blocking manner. If the action on callback is done in a blocking
* manner, some of the threads performing requests on NodeManagers may get
* blocked depending on how many threads in the pool are busy.
* </p>
*
* <p>
* The implementation of the callback function should not throw the
* unexpected exception. Otherwise, {@link NMClientAsync} will just
* catch, log and then ignore it.
* </p>
*/
public static interface CallbackHandler {
/**
* The API is called when <code>NodeManager</code> responds to indicate its
* acceptance of the starting container request
* @param containerId the Id of the container
* @param allServiceResponse a Map between the auxiliary service names and
* their outputs
*/
void onContainerStarted(ContainerId containerId,
Map<String, ByteBuffer> allServiceResponse);
/**
* The API is called when <code>NodeManager</code> responds with the status
* of the container
* @param containerId the Id of the container
* @param containerStatus the status of the container
*/
void onContainerStatusReceived(ContainerId containerId,
ContainerStatus containerStatus);
/**
* The API is called when <code>NodeManager</code> responds to indicate the
* container is stopped.
* @param containerId the Id of the container
*/
void onContainerStopped(ContainerId containerId);
/**
* The API is called when an exception is raised in the process of
* starting a container
*
* @param containerId the Id of the container
* @param t the raised exception
*/
void onStartContainerError(ContainerId containerId, Throwable t);
/**
* The API is called when an exception is raised in the process of
* querying the status of a container
*
* @param containerId the Id of the container
* @param t the raised exception
*/
void onGetContainerStatusError(ContainerId containerId, Throwable t);
/**
* The API is called when an exception is raised in the process of
* stopping a container
*
* @param containerId the Id of the container
* @param t the raised exception
*/
void onStopContainerError(ContainerId containerId, Throwable t);
}
}

View File

@ -0,0 +1,21 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
@InterfaceAudience.Public
package org.apache.hadoop.yarn.client.api.async.impl;
import org.apache.hadoop.classification.InterfaceAudience;

View File

@ -0,0 +1,21 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
@InterfaceAudience.Public
package org.apache.hadoop.yarn.client.api.async;
import org.apache.hadoop.classification.InterfaceAudience;

View File

@ -16,7 +16,7 @@
* limitations under the License.
*/
package org.apache.hadoop.yarn.client;
package org.apache.hadoop.yarn.client.api.impl;
import java.io.IOException;
import java.net.InetSocketAddress;
@ -57,7 +57,8 @@ import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.client.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
@ -71,6 +72,7 @@ import com.google.common.base.Joiner;
// TODO check inputs for null etc. YARN-654
@Private
@Unstable
public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
@ -312,64 +314,64 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
@Override
public synchronized void addContainerRequest(T req) {
Set<String> allRacks = new HashSet<String>();
if (req.racks != null) {
allRacks.addAll(req.racks);
if(req.racks.size() != allRacks.size()) {
if (req.getRacks() != null) {
allRacks.addAll(req.getRacks());
if(req.getRacks().size() != allRacks.size()) {
Joiner joiner = Joiner.on(',');
LOG.warn("ContainerRequest has duplicate racks: "
+ joiner.join(req.racks));
+ joiner.join(req.getRacks()));
}
}
allRacks.addAll(resolveRacks(req.nodes));
allRacks.addAll(resolveRacks(req.getNodes()));
if (req.nodes != null) {
HashSet<String> dedupedNodes = new HashSet<String>(req.nodes);
if(dedupedNodes.size() != req.nodes.size()) {
if (req.getNodes() != null) {
HashSet<String> dedupedNodes = new HashSet<String>(req.getNodes());
if(dedupedNodes.size() != req.getNodes().size()) {
Joiner joiner = Joiner.on(',');
LOG.warn("ContainerRequest has duplicate nodes: "
+ joiner.join(req.nodes));
+ joiner.join(req.getNodes()));
}
for (String node : dedupedNodes) {
// Ensure node requests are accompanied by requests for
// corresponding rack
addResourceRequest(req.priority, node, req.capability,
req.containerCount, req);
addResourceRequest(req.getPriority(), node, req.getCapability(),
req.getContainerCount(), req);
}
}
for (String rack : allRacks) {
addResourceRequest(req.priority, rack, req.capability,
req.containerCount, req);
addResourceRequest(req.getPriority(), rack, req.getCapability(),
req.getContainerCount(), req);
}
// Off-switch
addResourceRequest(req.priority, ResourceRequest.ANY, req.capability,
req.containerCount, req);
addResourceRequest(req.getPriority(), ResourceRequest.ANY, req.getCapability(),
req.getContainerCount(), req);
}
@Override
public synchronized void removeContainerRequest(T req) {
Set<String> allRacks = new HashSet<String>();
if (req.racks != null) {
allRacks.addAll(req.racks);
if (req.getRacks() != null) {
allRacks.addAll(req.getRacks());
}
allRacks.addAll(resolveRacks(req.nodes));
allRacks.addAll(resolveRacks(req.getNodes()));
// Update resource requests
if (req.nodes != null) {
for (String node : new HashSet<String>(req.nodes)) {
decResourceRequest(req.priority, node, req.capability,
req.containerCount, req);
if (req.getNodes() != null) {
for (String node : new HashSet<String>(req.getNodes())) {
decResourceRequest(req.getPriority(), node, req.getCapability(),
req.getContainerCount(), req);
}
}
for (String rack : allRacks) {
decResourceRequest(req.priority, rack, req.capability,
req.containerCount, req);
decResourceRequest(req.getPriority(), rack, req.getCapability(),
req.getContainerCount(), req);
}
decResourceRequest(req.priority, ResourceRequest.ANY, req.capability,
req.containerCount, req);
decResourceRequest(req.getPriority(), ResourceRequest.ANY, req.getCapability(),
req.getContainerCount(), req);
}
@Override

View File

@ -16,7 +16,7 @@
* limitations under the License.
*/
package org.apache.hadoop.yarn.client;
package org.apache.hadoop.yarn.client.api.impl;
import java.io.IOException;
import java.net.InetSocketAddress;
@ -29,6 +29,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation;
@ -45,6 +47,7 @@ import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.client.api.NMClient;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.ipc.RPCUtil;
import org.apache.hadoop.yarn.ipc.YarnRPC;
@ -76,6 +79,8 @@ import org.apache.hadoop.yarn.util.Records;
* {@link #stopContainer}.
* </p>
*/
@Private
@Unstable
public class NMClientImpl extends NMClient {
private static final Log LOG = LogFactory.getLog(NMClientImpl.class);
@ -86,7 +91,7 @@ public class NMClientImpl extends NMClient {
new ConcurrentHashMap<ContainerId, StartedContainer>();
//enabled by default
protected final AtomicBoolean cleanupRunningContainers = new AtomicBoolean(true);
private final AtomicBoolean cleanupRunningContainers = new AtomicBoolean(true);
public NMClientImpl() {
super(NMClientImpl.class.getName());
@ -100,7 +105,7 @@ public class NMClientImpl extends NMClient {
protected void serviceStop() throws Exception {
// Usually, started-containers are stopped when this client stops. Unless
// the flag cleanupRunningContainers is set to false.
if (cleanupRunningContainers.get()) {
if (getCleanupRunningContainers().get()) {
cleanupRunningContainers();
}
super.serviceStop();
@ -126,7 +131,7 @@ public class NMClientImpl extends NMClient {
@Override
public void cleanupRunningContainersOnStop(boolean enabled) {
cleanupRunningContainers.set(enabled);
getCleanupRunningContainers().set(enabled);
}
protected static class StartedContainer {
@ -171,7 +176,7 @@ public class NMClientImpl extends NMClient {
}
@Override
protected void serviceStart() throws Exception {
protected synchronized void serviceStart() throws Exception {
final YarnRPC rpc = YarnRPC.create(getConfig());
final InetSocketAddress containerAddress =
@ -199,7 +204,7 @@ public class NMClientImpl extends NMClient {
}
@Override
protected void serviceStop() throws Exception {
protected synchronized void serviceStop() throws Exception {
if (this.containerManager != null) {
RPC.stopProxy(this.containerManager);
@ -397,4 +402,8 @@ public class NMClientImpl extends NMClient {
return startedContainers.get(containerId);
}
public AtomicBoolean getCleanupRunningContainers() {
return cleanupRunningContainers;
}
}

View File

@ -16,7 +16,7 @@
* limitations under the License.
*/
package org.apache.hadoop.yarn.client;
package org.apache.hadoop.yarn.client.api.impl;
import java.io.IOException;
import java.net.InetSocketAddress;
@ -25,8 +25,8 @@ import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.RPC;
@ -56,13 +56,16 @@ import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.util.Records;
@InterfaceAudience.Public
@InterfaceStability.Evolving
import com.google.common.annotations.VisibleForTesting;
@Private
@Unstable
public class YarnClientImpl extends YarnClient {
private static final Log LOG = LogFactory.getLog(YarnClientImpl.class);
@ -304,4 +307,10 @@ public class YarnClientImpl extends YarnClient {
}
}
}
@Private
@VisibleForTesting
public void setRMClient(ApplicationClientProtocol rmClient) {
this.rmClient = rmClient;
}
}

View File

@ -0,0 +1,21 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
@InterfaceAudience.Public
package org.apache.hadoop.yarn.client.api.impl;
import org.apache.hadoop.classification.InterfaceAudience;

View File

@ -0,0 +1,21 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
@InterfaceAudience.Public
package org.apache.hadoop.yarn.client.api;
import org.apache.hadoop.classification.InterfaceAudience;

View File

@ -27,12 +27,16 @@ import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Options;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.util.ConverterUtils;
@Private
@Unstable
public class ApplicationCLI extends YarnCLI {
private static final String APPLICATIONS_PATTERN =
"%30s\t%20s\t%20s\t%10s\t%10s\t%18s\t%18s\t%15s\t%35s" +

View File

@ -28,12 +28,16 @@ import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Options;
import org.apache.commons.lang.time.DateFormatUtils;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.util.ConverterUtils;
@Private
@Unstable
public class NodeCLI extends YarnCLI {
private static final String NODES_PATTERN = "%16s\t%10s\t%17s\t%18s" +
System.getProperty("line.separator");

View File

@ -23,6 +23,8 @@ import java.net.InetSocketAddress;
import java.security.PrivilegedAction;
import java.util.Arrays;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.ipc.RemoteException;
@ -42,6 +44,8 @@ import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.YarnRPC;
@Private
@Unstable
public class RMAdminCLI extends Configured implements Tool {
private final RecordFactory recordFactory =

View File

@ -19,12 +19,15 @@ package org.apache.hadoop.yarn.client.cli;
import java.io.PrintStream;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.yarn.client.YarnClient;
import org.apache.hadoop.yarn.client.YarnClientImpl;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
@Private
@Unstable
public abstract class YarnCLI extends Configured implements Tool {
public static final String STATUS_CMD = "status";

View File

@ -0,0 +1,21 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
@InterfaceAudience.Public
package org.apache.hadoop.yarn.client.cli;
import org.apache.hadoop.classification.InterfaceAudience;

View File

@ -16,7 +16,7 @@
* limitations under the License.
*/
package org.apache.hadoop.yarn.client;
package org.apache.hadoop.yarn.client.api.async.impl;
import static org.mockito.Matchers.anyFloat;
import static org.mockito.Matchers.anyInt;
@ -46,7 +46,10 @@ import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
@ -105,7 +108,7 @@ public class TestAMRMClientAsync {
});
AMRMClientAsync<ContainerRequest> asyncClient =
new AMRMClientAsync<ContainerRequest>(client, 20, callbackHandler);
AMRMClientAsync.createAMRMClientAsync(client, 20, callbackHandler);
asyncClient.init(conf);
asyncClient.start();
asyncClient.registerApplicationMaster("localhost", 1234, null);
@ -160,7 +163,7 @@ public class TestAMRMClientAsync {
when(client.allocate(anyFloat())).thenThrow(mockException);
AMRMClientAsync<ContainerRequest> asyncClient =
new AMRMClientAsync<ContainerRequest>(client, 20, callbackHandler);
AMRMClientAsync.createAMRMClientAsync(client, 20, callbackHandler);
asyncClient.init(conf);
asyncClient.start();
@ -195,7 +198,7 @@ public class TestAMRMClientAsync {
when(client.allocate(anyFloat())).thenReturn(rebootResponse);
AMRMClientAsync<ContainerRequest> asyncClient =
new AMRMClientAsync<ContainerRequest>(client, 20, callbackHandler);
AMRMClientAsync.createAMRMClientAsync(client, 20, callbackHandler);
asyncClient.init(conf);
asyncClient.start();

View File

@ -16,7 +16,7 @@
* limitations under the License.
*/
package org.apache.hadoop.yarn.client;
package org.apache.hadoop.yarn.client.api.async.impl;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doNothing;
@ -48,6 +48,9 @@ import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.client.api.NMClient;
import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.factories.RecordFactory;
@ -62,7 +65,7 @@ public class TestNMClientAsync {
private final RecordFactory recordFactory =
RecordFactoryProvider.getRecordFactory(null);
private NMClientAsync asyncClient;
private NMClientAsyncImpl asyncClient;
private NodeId nodeId;
private Token containerToken;
@ -71,7 +74,7 @@ public class TestNMClientAsync {
ServiceOperations.stop(asyncClient);
}
@Test (timeout = 30000)
@Test (timeout = 10000)
public void testNMClientAsync() throws Exception {
Configuration conf = new Configuration();
conf.setInt(YarnConfiguration.NM_CLIENT_ASYNC_THREAD_POOL_MAX_SIZE, 10);
@ -89,40 +92,42 @@ public class TestNMClientAsync {
for (int i = 0; i < expectedSuccess + expectedFailure; ++i) {
if (i == expectedSuccess) {
while (!((TestCallbackHandler1) asyncClient.callbackHandler)
while (!((TestCallbackHandler1) asyncClient.getCallbackHandler())
.isAllSuccessCallsExecuted()) {
Thread.sleep(10);
}
asyncClient.client = mockNMClient(1);
asyncClient.setClient(mockNMClient(1));
}
Container container = mockContainer(i);
ContainerLaunchContext clc =
recordFactory.newRecordInstance(ContainerLaunchContext.class);
asyncClient.startContainer(container, clc);
asyncClient.startContainerAsync(container, clc);
}
while (!((TestCallbackHandler1) asyncClient.callbackHandler)
while (!((TestCallbackHandler1) asyncClient.getCallbackHandler())
.isStartAndQueryFailureCallsExecuted()) {
Thread.sleep(10);
}
asyncClient.client = mockNMClient(2);
((TestCallbackHandler1) asyncClient.callbackHandler).path = false;
asyncClient.setClient(mockNMClient(2));
((TestCallbackHandler1) asyncClient.getCallbackHandler()).path = false;
for (int i = 0; i < expectedFailure; ++i) {
Container container = mockContainer(
expectedSuccess + expectedFailure + i);
ContainerLaunchContext clc =
recordFactory.newRecordInstance(ContainerLaunchContext.class);
asyncClient.startContainer(container, clc);
asyncClient.startContainerAsync(container, clc);
}
while (!((TestCallbackHandler1) asyncClient.callbackHandler)
while (!((TestCallbackHandler1) asyncClient.getCallbackHandler())
.isStopFailureCallsExecuted()) {
Thread.sleep(10);
}
for (String errorMsg :
((TestCallbackHandler1) asyncClient.callbackHandler).errorMsgs) {
((TestCallbackHandler1) asyncClient.getCallbackHandler())
.errorMsgs) {
System.out.println(errorMsg);
}
Assert.assertEquals("Error occurs in CallbackHandler", 0,
((TestCallbackHandler1) asyncClient.callbackHandler).errorMsgs.size());
((TestCallbackHandler1) asyncClient.getCallbackHandler())
.errorMsgs.size());
for (String errorMsg : ((MockNMClientAsync1) asyncClient).errorMsgs) {
System.out.println(errorMsg);
}
@ -141,7 +146,7 @@ public class TestNMClientAsync {
asyncClient.threadPool.isShutdown());
}
private class MockNMClientAsync1 extends NMClientAsync {
private class MockNMClientAsync1 extends NMClientAsyncImpl {
private Set<String> errorMsgs =
Collections.synchronizedSet(new HashSet<String>());
@ -227,10 +232,10 @@ public class TestNMClientAsync {
actualStartSuccessArray.set(containerId.getId(), 1);
// move on to the following success tests
asyncClient.getContainerStatus(containerId, nodeId, containerToken);
asyncClient.getContainerStatusAsync(containerId, nodeId, containerToken);
} else {
// move on to the following failure tests
asyncClient.stopContainer(containerId, nodeId, containerToken);
asyncClient.stopContainerAsync(containerId, nodeId, containerToken);
}
// Shouldn't crash the test thread
@ -248,7 +253,7 @@ public class TestNMClientAsync {
actualQuerySuccess.addAndGet(1);
actualQuerySuccessArray.set(containerId.getId(), 1);
// move on to the following success tests
asyncClient.stopContainer(containerId, nodeId, containerToken);
asyncClient.stopContainerAsync(containerId, nodeId, containerToken);
// Shouldn't crash the test thread
throw new RuntimeException("Ignorable Exception");
@ -285,7 +290,7 @@ public class TestNMClientAsync {
actualStartFailure.addAndGet(1);
actualStartFailureArray.set(containerId.getId() - expectedSuccess, 1);
// move on to the following failure tests
asyncClient.getContainerStatus(containerId, nodeId, containerToken);
asyncClient.getContainerStatusAsync(containerId, nodeId, containerToken);
// Shouldn't crash the test thread
throw new RuntimeException("Ignorable Exception");
@ -426,22 +431,22 @@ public class TestNMClientAsync {
Thread t = new Thread() {
@Override
public void run() {
asyncClient.startContainer(container, clc);
asyncClient.startContainerAsync(container, clc);
}
};
t.start();
barrierA.await();
asyncClient.stopContainer(container.getId(), container.getNodeId(),
asyncClient.stopContainerAsync(container.getId(), container.getNodeId(),
container.getContainerToken());
barrierC.await();
Assert.assertFalse("Starting and stopping should be out of order",
((TestCallbackHandler2) asyncClient.callbackHandler)
((TestCallbackHandler2) asyncClient.getCallbackHandler())
.exceptionOccurred.get());
}
private class MockNMClientAsync2 extends NMClientAsync {
private class MockNMClientAsync2 extends NMClientAsyncImpl {
private CyclicBarrier barrierA;
private CyclicBarrier barrierB;
@ -510,7 +515,7 @@ public class TestNMClientAsync {
@Override
public void onStartContainerError(ContainerId containerId, Throwable t) {
if (!t.getMessage().equals(NMClientAsync.StatefulContainer
if (!t.getMessage().equals(NMClientAsyncImpl.StatefulContainer
.OutOfOrderTransition.STOP_BEFORE_START_ERROR_MSG)) {
exceptionOccurred.set(true);
return;

View File

@ -16,7 +16,7 @@
* limitations under the License.
*/
package org.apache.hadoop.yarn.client;
package org.apache.hadoop.yarn.client.api.impl;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
@ -57,8 +57,11 @@ import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.client.AMRMClient.StoredContainerRequest;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.client.api.AMRMClient.StoredContainerRequest;
import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.MiniYARNCluster;

View File

@ -16,7 +16,7 @@
* limitations under the License.
*/
package org.apache.hadoop.yarn.client;
package org.apache.hadoop.yarn.client.api.impl;
import java.util.Arrays;
import java.util.List;
@ -29,9 +29,10 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl;
import org.junit.Test;
import static org.apache.hadoop.yarn.client.AMRMClientImpl.ContainerRequest;
import static org.junit.Assert.assertEquals;
public class TestAMRMClientContainerRequest {
@ -72,8 +73,8 @@ public class TestAMRMClientContainerRequest {
private void verifyResourceRequestLocation(
AMRMClientImpl<ContainerRequest> client, ContainerRequest request,
String location) {
ResourceRequest ask = client.remoteRequestsTable.get(request.priority)
.get(location).get(request.capability).remoteRequest;
ResourceRequest ask = client.remoteRequestsTable.get(request.getPriority())
.get(location).get(request.getCapability()).remoteRequest;
assertEquals(location, ask.getResourceName());
assertEquals(request.getContainerCount(), ask.getNumContainers());
}

View File

@ -16,7 +16,7 @@
* limitations under the License.
*/
package org.apache.hadoop.yarn.client;
package org.apache.hadoop.yarn.client.api.impl;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@ -51,7 +51,13 @@ import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.NMClient;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl;
import org.apache.hadoop.yarn.client.api.impl.NMClientImpl;
import org.apache.hadoop.yarn.client.api.impl.YarnClientImpl;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.MiniYARNCluster;
@ -164,9 +170,9 @@ public class TestNMClient {
// leave one unclosed
assertEquals(1, nmClient.startedContainers.size());
// default true
assertTrue(nmClient.cleanupRunningContainers.get());
assertTrue(nmClient.getCleanupRunningContainers().get());
nmClient.cleanupRunningContainersOnStop(stopContainers);
assertEquals(stopContainers, nmClient.cleanupRunningContainers.get());
assertEquals(stopContainers, nmClient.getCleanupRunningContainers().get());
nmClient.stop();
}
@ -201,7 +207,7 @@ public class TestNMClient {
// stop the running containers on close
assertFalse(nmClient.startedContainers.isEmpty());
nmClient.cleanupRunningContainersOnStop(true);
assertTrue(nmClient.cleanupRunningContainers.get());
assertTrue(nmClient.getCleanupRunningContainers().get());
nmClient.stop();
}

View File

@ -16,7 +16,7 @@
* limitations under the License.
*/
package org.apache.hadoop.yarn.client;
package org.apache.hadoop.yarn.client.api.impl;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
@ -38,6 +38,8 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.client.api.impl.YarnClientImpl;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;

View File

@ -46,7 +46,7 @@ import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.YarnClient;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.util.Records;
import org.junit.Before;
import org.junit.Test;