Merge -c 1493631 from trunk to branch-2 to fix YARN-824. Added static factory methods to hadoop-yarn-client interfaces. Contributed by Jian He.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1493632 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Arun Murthy 2013-06-17 03:20:22 +00:00
parent 59eab1f4c2
commit bb43d35c90
14 changed files with 164 additions and 72 deletions

View File

@ -149,6 +149,9 @@ Release 2.1.0-beta - UNRELEASED
YARN-831. Removed minimum resource from GetNewApplicationResponse as a YARN-831. Removed minimum resource from GetNewApplicationResponse as a
follow-up to YARN-787. (Jian He via acmurthy) follow-up to YARN-787. (Jian He via acmurthy)
YARN-824. Added static factory methods to hadoop-yarn-client interfaces.
(Jian He via acmurthy)
NEW FEATURES NEW FEATURES
YARN-482. FS: Extend SchedulingMode to intermediate queues. YARN-482. FS: Extend SchedulingMode to intermediate queues.

View File

@ -60,7 +60,7 @@ import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.api.records.YarnClusterMetrics; import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
import org.apache.hadoop.yarn.client.YarnClientImpl; import org.apache.hadoop.yarn.client.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.ConverterUtils;
@ -99,13 +99,13 @@ import org.apache.hadoop.yarn.util.Records;
*/ */
@InterfaceAudience.Public @InterfaceAudience.Public
@InterfaceStability.Unstable @InterfaceStability.Unstable
public class Client extends YarnClientImpl { public class Client {
private static final Log LOG = LogFactory.getLog(Client.class); private static final Log LOG = LogFactory.getLog(Client.class);
// Configuration // Configuration
private Configuration conf; private Configuration conf;
private YarnClient yarnClient;
// Application master specific info to register a new Application with RM/ASM // Application master specific info to register a new Application with RM/ASM
private String appName = ""; private String appName = "";
// App master priority // App master priority
@ -186,9 +186,10 @@ public class Client extends YarnClientImpl {
/** /**
*/ */
public Client(Configuration conf) throws Exception { public Client(Configuration conf) throws Exception {
super();
this.conf = conf; this.conf = conf;
init(conf); yarnClient = YarnClient.createYarnClient();
yarnClient.init(conf);
opts = new Options(); opts = new Options();
opts.addOption("appname", true, "Application Name. Default value - DistributedShell"); opts.addOption("appname", true, "Application Name. Default value - DistributedShell");
opts.addOption("priority", true, "Application Priority. Default 0"); opts.addOption("priority", true, "Application Priority. Default 0");
@ -317,13 +318,13 @@ public class Client extends YarnClientImpl {
public boolean run() throws IOException, YarnException { public boolean run() throws IOException, YarnException {
LOG.info("Running Client"); LOG.info("Running Client");
start(); yarnClient.start();
YarnClusterMetrics clusterMetrics = super.getYarnClusterMetrics(); YarnClusterMetrics clusterMetrics = yarnClient.getYarnClusterMetrics();
LOG.info("Got Cluster metric info from ASM" LOG.info("Got Cluster metric info from ASM"
+ ", numNodeManagers=" + clusterMetrics.getNumNodeManagers()); + ", numNodeManagers=" + clusterMetrics.getNumNodeManagers());
List<NodeReport> clusterNodeReports = super.getNodeReports(); List<NodeReport> clusterNodeReports = yarnClient.getNodeReports();
LOG.info("Got Cluster node info from ASM"); LOG.info("Got Cluster node info from ASM");
for (NodeReport node : clusterNodeReports) { for (NodeReport node : clusterNodeReports) {
LOG.info("Got node report from ASM for" LOG.info("Got node report from ASM for"
@ -333,7 +334,7 @@ public class Client extends YarnClientImpl {
+ ", nodeNumContainers" + node.getNumContainers()); + ", nodeNumContainers" + node.getNumContainers());
} }
QueueInfo queueInfo = super.getQueueInfo(this.amQueue); QueueInfo queueInfo = yarnClient.getQueueInfo(this.amQueue);
LOG.info("Queue info" LOG.info("Queue info"
+ ", queueName=" + queueInfo.getQueueName() + ", queueName=" + queueInfo.getQueueName()
+ ", queueCurrentCapacity=" + queueInfo.getCurrentCapacity() + ", queueCurrentCapacity=" + queueInfo.getCurrentCapacity()
@ -341,7 +342,7 @@ public class Client extends YarnClientImpl {
+ ", queueApplicationCount=" + queueInfo.getApplications().size() + ", queueApplicationCount=" + queueInfo.getApplications().size()
+ ", queueChildQueueCount=" + queueInfo.getChildQueues().size()); + ", queueChildQueueCount=" + queueInfo.getChildQueues().size());
List<QueueUserACLInfo> listAclInfo = super.getQueueAclsInfo(); List<QueueUserACLInfo> listAclInfo = yarnClient.getQueueAclsInfo();
for (QueueUserACLInfo aclInfo : listAclInfo) { for (QueueUserACLInfo aclInfo : listAclInfo) {
for (QueueACL userAcl : aclInfo.getUserAcls()) { for (QueueACL userAcl : aclInfo.getUserAcls()) {
LOG.info("User ACL Info for Queue" LOG.info("User ACL Info for Queue"
@ -351,7 +352,7 @@ public class Client extends YarnClientImpl {
} }
// Get a new application id // Get a new application id
GetNewApplicationResponse newApp = super.getNewApplication(); GetNewApplicationResponse newApp = yarnClient.getNewApplication();
ApplicationId appId = newApp.getApplicationId(); ApplicationId appId = newApp.getApplicationId();
// TODO get min/max resource capabilities from RM and change memory ask if needed // TODO get min/max resource capabilities from RM and change memory ask if needed
@ -564,7 +565,7 @@ public class Client extends YarnClientImpl {
// or an exception thrown to denote some form of a failure // or an exception thrown to denote some form of a failure
LOG.info("Submitting application to ASM"); LOG.info("Submitting application to ASM");
super.submitApplication(appContext); yarnClient.submitApplication(appContext);
// TODO // TODO
// Try submitting the same request again // Try submitting the same request again
@ -596,7 +597,7 @@ public class Client extends YarnClientImpl {
} }
// Get application report for the appId we are interested in // Get application report for the appId we are interested in
ApplicationReport report = super.getApplicationReport(appId); ApplicationReport report = yarnClient.getApplicationReport(appId);
LOG.info("Got application report from ASM for" LOG.info("Got application report from ASM for"
+ ", appId=" + appId.getId() + ", appId=" + appId.getId()
@ -656,7 +657,7 @@ public class Client extends YarnClientImpl {
// Response can be ignored as it is non-null on success or // Response can be ignored as it is non-null on success or
// throws an exception in case of failures // throws an exception in case of failures
super.killApplication(appId); yarnClient.killApplication(appId);
} }
} }

View File

@ -48,6 +48,7 @@ import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.YarnClient;
import org.apache.hadoop.yarn.client.YarnClientImpl; import org.apache.hadoop.yarn.client.YarnClientImpl;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
@ -72,7 +73,7 @@ public class UnmanagedAMLauncher {
private Configuration conf; private Configuration conf;
// Handle to talk to the Resource Manager/Applications Manager // Handle to talk to the Resource Manager/Applications Manager
private YarnClientImpl rmClient; private YarnClient rmClient;
// Application master specific info to register a new Application with RM/ASM // Application master specific info to register a new Application with RM/ASM
private String appName = ""; private String appName = "";
@ -160,7 +161,7 @@ public class UnmanagedAMLauncher {
} }
YarnConfiguration yarnConf = new YarnConfiguration(conf); YarnConfiguration yarnConf = new YarnConfiguration(conf);
rmClient = new YarnClientImpl(); rmClient = YarnClient.createYarnClient();
rmClient.init(yarnConf); rmClient.init(yarnConf);
return true; return true;

View File

@ -24,22 +24,48 @@ import java.util.List;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.service.Service; import org.apache.hadoop.yarn.service.AbstractService;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
@InterfaceAudience.Public @InterfaceAudience.Public
@InterfaceStability.Unstable @InterfaceStability.Unstable
public interface AMRMClient<T extends AMRMClient.ContainerRequest> extends Service { public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
AbstractService {
/**
* Create a new instance of AMRMClient.
* For usage:
* <pre>
* {@code
* AMRMClient.<T>createAMRMClientContainerRequest(appAttemptId)
* }</pre>
* @param appAttemptId the appAttemptId associated with the AMRMClient
* @return the newly create AMRMClient instance.
*/
@Public
public static <T extends ContainerRequest> AMRMClient<T> createAMRMClient(
ApplicationAttemptId appAttemptId) {
AMRMClient<T> client = new AMRMClientImpl<T>(appAttemptId);
return client;
}
@Private
protected AMRMClient(String name) {
super(name);
}
/** /**
* Object to represent container request for resources. Scheduler * Object to represent container request for resources. Scheduler
@ -132,7 +158,7 @@ public interface AMRMClient<T extends AMRMClient.ContainerRequest> extends Servi
* @throws YarnException * @throws YarnException
* @throws IOException * @throws IOException
*/ */
public RegisterApplicationMasterResponse public abstract RegisterApplicationMasterResponse
registerApplicationMaster(String appHostName, registerApplicationMaster(String appHostName,
int appHostPort, int appHostPort,
String appTrackingUrl) String appTrackingUrl)
@ -153,7 +179,7 @@ public interface AMRMClient<T extends AMRMClient.ContainerRequest> extends Servi
* @throws YarnException * @throws YarnException
* @throws IOException * @throws IOException
*/ */
public AllocateResponse allocate(float progressIndicator) public abstract AllocateResponse allocate(float progressIndicator)
throws YarnException, IOException; throws YarnException, IOException;
/** /**
@ -164,7 +190,7 @@ public interface AMRMClient<T extends AMRMClient.ContainerRequest> extends Servi
* @throws YarnException * @throws YarnException
* @throws IOException * @throws IOException
*/ */
public void unregisterApplicationMaster(FinalApplicationStatus appStatus, public abstract void unregisterApplicationMaster(FinalApplicationStatus appStatus,
String appMessage, String appMessage,
String appTrackingUrl) String appTrackingUrl)
throws YarnException, IOException; throws YarnException, IOException;
@ -173,7 +199,7 @@ public interface AMRMClient<T extends AMRMClient.ContainerRequest> extends Servi
* Request containers for resources before calling <code>allocate</code> * Request containers for resources before calling <code>allocate</code>
* @param req Resource request * @param req Resource request
*/ */
public void addContainerRequest(T req); public abstract void addContainerRequest(T req);
/** /**
* Remove previous container request. The previous container request may have * Remove previous container request. The previous container request may have
@ -182,7 +208,7 @@ public interface AMRMClient<T extends AMRMClient.ContainerRequest> extends Servi
* even after the remove request * even after the remove request
* @param req Resource request * @param req Resource request
*/ */
public void removeContainerRequest(T req); public abstract void removeContainerRequest(T req);
/** /**
* Release containers assigned by the Resource Manager. If the app cannot use * Release containers assigned by the Resource Manager. If the app cannot use
@ -191,21 +217,21 @@ public interface AMRMClient<T extends AMRMClient.ContainerRequest> extends Servi
* it still needs it. eg. it released non-local resources * it still needs it. eg. it released non-local resources
* @param containerId * @param containerId
*/ */
public void releaseAssignedContainer(ContainerId containerId); public abstract void releaseAssignedContainer(ContainerId containerId);
/** /**
* Get the currently available resources in the cluster. * Get the currently available resources in the cluster.
* A valid value is available after a call to allocate has been made * A valid value is available after a call to allocate has been made
* @return Currently available resources * @return Currently available resources
*/ */
public Resource getClusterAvailableResources(); public abstract Resource getClusterAvailableResources();
/** /**
* Get the current number of nodes in the cluster. * Get the current number of nodes in the cluster.
* A valid values is available after a call to allocate has been made * A valid values is available after a call to allocate has been made
* @return Current number of nodes in the cluster * @return Current number of nodes in the cluster
*/ */
public int getClusterNodeCount(); public abstract int getClusterNodeCount();
/** /**
* Get outstanding <code>StoredContainerRequest</code>s matching the given * Get outstanding <code>StoredContainerRequest</code>s matching the given
@ -218,7 +244,7 @@ public interface AMRMClient<T extends AMRMClient.ContainerRequest> extends Servi
* collection, requests will be returned in the same order as they were added. * collection, requests will be returned in the same order as they were added.
* @return Collection of request matching the parameters * @return Collection of request matching the parameters
*/ */
public List<? extends Collection<T>> getMatchingRequests( public abstract List<? extends Collection<T>> getMatchingRequests(
Priority priority, Priority priority,
String resourceName, String resourceName,
Resource capability); Resource capability);
@ -231,5 +257,5 @@ public interface AMRMClient<T extends AMRMClient.ContainerRequest> extends Servi
* communicating with NodeManager (ex. NMClient) using NMTokens. If a new * communicating with NodeManager (ex. NMClient) using NMTokens. If a new
* NMToken is received for the same node manager then it will be replaced. * NMToken is received for the same node manager then it will be replaced.
*/ */
public ConcurrentMap<String, Token> getNMTokens(); public abstract ConcurrentMap<String, Token> getNMTokens();
} }

View File

@ -120,7 +120,7 @@ public class AMRMClientAsync<T extends ContainerRequest> extends AbstractService
@Private @Private
@VisibleForTesting @VisibleForTesting
public AMRMClientAsync(AMRMClient<T> client, int intervalMs, protected AMRMClientAsync(AMRMClient<T> client, int intervalMs,
CallbackHandler callbackHandler) { CallbackHandler callbackHandler) {
super(AMRMClientAsync.class.getName()); super(AMRMClientAsync.class.getName());
this.client = client; this.client = client;

View File

@ -64,18 +64,15 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.service.AbstractService;
import org.apache.hadoop.yarn.util.RackResolver; import org.apache.hadoop.yarn.util.RackResolver;
import com.google.common.base.Joiner;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
// TODO check inputs for null etc. YARN-654 // TODO check inputs for null etc. YARN-654
@Unstable @Unstable
public class AMRMClientImpl<T extends ContainerRequest> public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
extends AbstractService implements AMRMClient<T> {
private static final Log LOG = LogFactory.getLog(AMRMClientImpl.class); private static final Log LOG = LogFactory.getLog(AMRMClientImpl.class);

View File

@ -23,6 +23,8 @@ import java.nio.ByteBuffer;
import java.util.Map; import java.util.Map;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
@ -31,11 +33,34 @@ import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.service.Service; import org.apache.hadoop.yarn.service.AbstractService;
@InterfaceAudience.Public @InterfaceAudience.Public
@InterfaceStability.Unstable @InterfaceStability.Unstable
public interface NMClient extends Service { public abstract class NMClient extends AbstractService {
/**
* Create a new instance of NMClient.
*/
@Public
public static NMClient createNMClient() {
NMClient client = new NMClientImpl();
return client;
}
/**
* Create a new instance of NMClient.
*/
@Public
public static NMClient createNMClient(String name) {
NMClient client = new NMClientImpl(name);
return client;
}
@Private
protected NMClient(String name) {
super(name);
}
/** /**
* <p>Start an allocated container.</p> * <p>Start an allocated container.</p>
@ -54,7 +79,7 @@ public interface NMClient extends Service {
* @throws YarnException * @throws YarnException
* @throws IOException * @throws IOException
*/ */
Map<String, ByteBuffer> startContainer(Container container, public abstract Map<String, ByteBuffer> startContainer(Container container,
ContainerLaunchContext containerLaunchContext) ContainerLaunchContext containerLaunchContext)
throws YarnException, IOException; throws YarnException, IOException;
@ -68,7 +93,7 @@ public interface NMClient extends Service {
* @throws YarnException * @throws YarnException
* @throws IOException * @throws IOException
*/ */
void stopContainer(ContainerId containerId, NodeId nodeId, public abstract void stopContainer(ContainerId containerId, NodeId nodeId,
Token containerToken) throws YarnException, IOException; Token containerToken) throws YarnException, IOException;
/** /**
@ -82,7 +107,7 @@ public interface NMClient extends Service {
* @throws YarnException * @throws YarnException
* @throws IOException * @throws IOException
*/ */
ContainerStatus getContainerStatus(ContainerId containerId, NodeId nodeId, public abstract ContainerStatus getContainerStatus(ContainerId containerId, NodeId nodeId,
Token containerToken) throws YarnException, IOException; Token containerToken) throws YarnException, IOException;
/** /**
@ -92,6 +117,6 @@ public interface NMClient extends Service {
* *
* @param enabled whether the feature is enabled or not * @param enabled whether the feature is enabled or not
*/ */
void cleanupRunningContainersOnStop(boolean enabled); public abstract void cleanupRunningContainersOnStop(boolean enabled);
} }

View File

@ -42,8 +42,8 @@ import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.ipc.RPCUtil; import org.apache.hadoop.yarn.ipc.RPCUtil;
import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.ipc.YarnRPC;
@ -76,7 +76,7 @@ import org.apache.hadoop.yarn.util.Records;
* {@link #stopContainer}. * {@link #stopContainer}.
* </p> * </p>
*/ */
public class NMClientImpl extends AbstractService implements NMClient { public class NMClientImpl extends NMClient {
private static final Log LOG = LogFactory.getLog(NMClientImpl.class); private static final Log LOG = LogFactory.getLog(NMClientImpl.class);

View File

@ -19,9 +19,12 @@
package org.apache.hadoop.yarn.client; package org.apache.hadoop.yarn.client;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.List; import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Text;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
@ -34,11 +37,44 @@ import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.api.records.YarnClusterMetrics; import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.service.Service; import org.apache.hadoop.yarn.service.AbstractService;
@InterfaceAudience.Public @InterfaceAudience.Public
@InterfaceStability.Evolving @InterfaceStability.Evolving
public interface YarnClient extends Service { public abstract class YarnClient extends AbstractService {
/**
* Create a new instance of YarnClient.
*/
@Public
public static YarnClient createYarnClient() {
YarnClient client = new YarnClientImpl();
return client;
}
/**
* Create a new instance of YarnClient.
*/
@Public
public static YarnClient createYarnClient(InetSocketAddress rmAddress) {
YarnClient client = new YarnClientImpl(rmAddress);
return client;
}
/**
* Create a new instance of YarnClient.
*/
@Public
public static YarnClient createYarnClient(String name,
InetSocketAddress rmAddress) {
YarnClient client = new YarnClientImpl(name, rmAddress);
return client;
}
@Private
protected YarnClient(String name) {
super(name);
}
/** /**
* <p> * <p>
@ -61,7 +97,7 @@ public interface YarnClient extends Service {
* @throws YarnException * @throws YarnException
* @throws IOException * @throws IOException
*/ */
GetNewApplicationResponse getNewApplication() throws YarnException, public abstract GetNewApplicationResponse getNewApplication() throws YarnException,
IOException; IOException;
/** /**
@ -79,7 +115,7 @@ public interface YarnClient extends Service {
* @throws IOException * @throws IOException
* @see #getNewApplication() * @see #getNewApplication()
*/ */
ApplicationId submitApplication(ApplicationSubmissionContext appContext) public abstract ApplicationId submitApplication(ApplicationSubmissionContext appContext)
throws YarnException, IOException; throws YarnException, IOException;
/** /**
@ -95,7 +131,7 @@ public interface YarnClient extends Service {
* @throws IOException * @throws IOException
* @see #getQueueAclsInfo() * @see #getQueueAclsInfo()
*/ */
void killApplication(ApplicationId applicationId) throws YarnException, public abstract void killApplication(ApplicationId applicationId) throws YarnException,
IOException; IOException;
/** /**
@ -128,7 +164,7 @@ public interface YarnClient extends Service {
* @throws YarnException * @throws YarnException
* @throws IOException * @throws IOException
*/ */
ApplicationReport getApplicationReport(ApplicationId appId) public abstract ApplicationReport getApplicationReport(ApplicationId appId)
throws YarnException, IOException; throws YarnException, IOException;
/** /**
@ -146,7 +182,7 @@ public interface YarnClient extends Service {
* @throws YarnException * @throws YarnException
* @throws IOException * @throws IOException
*/ */
List<ApplicationReport> getApplicationList() throws YarnException, public abstract List<ApplicationReport> getApplicationList() throws YarnException,
IOException; IOException;
/** /**
@ -158,7 +194,7 @@ public interface YarnClient extends Service {
* @throws YarnException * @throws YarnException
* @throws IOException * @throws IOException
*/ */
YarnClusterMetrics getYarnClusterMetrics() throws YarnException, public abstract YarnClusterMetrics getYarnClusterMetrics() throws YarnException,
IOException; IOException;
/** /**
@ -170,7 +206,7 @@ public interface YarnClient extends Service {
* @throws YarnException * @throws YarnException
* @throws IOException * @throws IOException
*/ */
List<NodeReport> getNodeReports() throws YarnException, IOException; public abstract List<NodeReport> getNodeReports() throws YarnException, IOException;
/** /**
* <p> * <p>
@ -184,7 +220,7 @@ public interface YarnClient extends Service {
* @throws YarnException * @throws YarnException
* @throws IOException * @throws IOException
*/ */
Token getRMDelegationToken(Text renewer) public abstract Token getRMDelegationToken(Text renewer)
throws YarnException, IOException; throws YarnException, IOException;
/** /**
@ -200,7 +236,7 @@ public interface YarnClient extends Service {
* access-control restrictions. * access-control restrictions.
* @throws IOException * @throws IOException
*/ */
QueueInfo getQueueInfo(String queueName) throws YarnException, public abstract QueueInfo getQueueInfo(String queueName) throws YarnException,
IOException; IOException;
/** /**
@ -213,7 +249,7 @@ public interface YarnClient extends Service {
* @throws YarnException * @throws YarnException
* @throws IOException * @throws IOException
*/ */
List<QueueInfo> getAllQueues() throws YarnException, IOException; public abstract List<QueueInfo> getAllQueues() throws YarnException, IOException;
/** /**
* <p> * <p>
@ -224,7 +260,7 @@ public interface YarnClient extends Service {
* @throws YarnException * @throws YarnException
* @throws IOException * @throws IOException
*/ */
List<QueueInfo> getRootQueueInfos() throws YarnException, IOException; public abstract List<QueueInfo> getRootQueueInfos() throws YarnException, IOException;
/** /**
* <p> * <p>
@ -239,7 +275,7 @@ public interface YarnClient extends Service {
* @throws YarnException * @throws YarnException
* @throws IOException * @throws IOException
*/ */
List<QueueInfo> getChildQueueInfos(String parent) throws YarnException, public abstract List<QueueInfo> getChildQueueInfos(String parent) throws YarnException,
IOException; IOException;
/** /**
@ -253,6 +289,6 @@ public interface YarnClient extends Service {
* @throws YarnException * @throws YarnException
* @throws IOException * @throws IOException
*/ */
List<QueueUserACLInfo> getQueueAclsInfo() throws YarnException, public abstract List<QueueUserACLInfo> getQueueAclsInfo() throws YarnException,
IOException; IOException;
} }

View File

@ -59,12 +59,11 @@ import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.service.AbstractService;
import org.apache.hadoop.yarn.util.Records; import org.apache.hadoop.yarn.util.Records;
@InterfaceAudience.Public @InterfaceAudience.Public
@InterfaceStability.Evolving @InterfaceStability.Evolving
public class YarnClientImpl extends AbstractService implements YarnClient { public class YarnClientImpl extends YarnClient {
private static final Log LOG = LogFactory.getLog(YarnClientImpl.class); private static final Log LOG = LogFactory.getLog(YarnClientImpl.class);

View File

@ -36,7 +36,7 @@ public abstract class YarnCLI extends Configured implements Tool {
public YarnCLI() { public YarnCLI() {
super(new YarnConfiguration()); super(new YarnConfiguration());
client = new YarnClientImpl(); client = YarnClient.createYarnClient();
client.init(getConf()); client.init(getConf());
client.start(); client.start();
} }

View File

@ -74,7 +74,7 @@ import org.mockito.stubbing.Answer;
public class TestAMRMClient { public class TestAMRMClient {
static Configuration conf = null; static Configuration conf = null;
static MiniYARNCluster yarnCluster = null; static MiniYARNCluster yarnCluster = null;
static YarnClientImpl yarnClient = null; static YarnClient yarnClient = null;
static List<NodeReport> nodeReports = null; static List<NodeReport> nodeReports = null;
static ApplicationAttemptId attemptId = null; static ApplicationAttemptId attemptId = null;
static int nodeCount = 3; static int nodeCount = 3;
@ -95,7 +95,7 @@ public class TestAMRMClient {
yarnCluster.start(); yarnCluster.start();
// start rm client // start rm client
yarnClient = new YarnClientImpl(); yarnClient = YarnClient.createYarnClient();
yarnClient.init(conf); yarnClient.init(conf);
yarnClient.start(); yarnClient.start();
@ -169,10 +169,10 @@ public class TestAMRMClient {
@Test (timeout=60000) @Test (timeout=60000)
public void testAMRMClientMatchingFit() throws YarnException, IOException { public void testAMRMClientMatchingFit() throws YarnException, IOException {
AMRMClientImpl<StoredContainerRequest> amClient = null; AMRMClient<StoredContainerRequest> amClient = null;
try { try {
// start am rm client // start am rm client
amClient = new AMRMClientImpl<StoredContainerRequest>(attemptId); amClient = AMRMClient.<StoredContainerRequest>createAMRMClient(attemptId);
amClient.init(conf); amClient.init(conf);
amClient.start(); amClient.start();
amClient.registerApplicationMaster("Host", 10000, ""); amClient.registerApplicationMaster("Host", 10000, "");
@ -318,7 +318,9 @@ public class TestAMRMClient {
AMRMClientImpl<StoredContainerRequest> amClient = null; AMRMClientImpl<StoredContainerRequest> amClient = null;
try { try {
// start am rm client // start am rm client
amClient = new AMRMClientImpl<StoredContainerRequest>(attemptId); amClient =
(AMRMClientImpl<StoredContainerRequest>) AMRMClient
.<StoredContainerRequest> createAMRMClient(attemptId);
amClient.init(conf); amClient.init(conf);
amClient.start(); amClient.start();
amClient.registerApplicationMaster("Host", 10000, ""); amClient.registerApplicationMaster("Host", 10000, "");
@ -436,16 +438,16 @@ public class TestAMRMClient {
@Test (timeout=60000) @Test (timeout=60000)
public void testAMRMClient() throws YarnException, IOException { public void testAMRMClient() throws YarnException, IOException {
AMRMClientImpl<ContainerRequest> amClient = null; AMRMClient<ContainerRequest> amClient = null;
try { try {
// start am rm client // start am rm client
amClient = new AMRMClientImpl<ContainerRequest>(attemptId); amClient = AMRMClient.<ContainerRequest>createAMRMClient(attemptId);
amClient.init(conf); amClient.init(conf);
amClient.start(); amClient.start();
amClient.registerApplicationMaster("Host", 10000, ""); amClient.registerApplicationMaster("Host", 10000, "");
testAllocation(amClient); testAllocation((AMRMClientImpl<ContainerRequest>)amClient);
amClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, amClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED,
null, null); null, null);

View File

@ -82,7 +82,7 @@ public class TestNMClient {
assertEquals(STATE.STARTED, yarnCluster.getServiceState()); assertEquals(STATE.STARTED, yarnCluster.getServiceState());
// start rm client // start rm client
yarnClient = new YarnClientImpl(); yarnClient = (YarnClientImpl) YarnClient.createYarnClient();
yarnClient.init(conf); yarnClient.init(conf);
yarnClient.start(); yarnClient.start();
assertNotNull(yarnClient); assertNotNull(yarnClient);
@ -136,14 +136,16 @@ public class TestNMClient {
} }
// start am rm client // start am rm client
rmClient = new AMRMClientImpl<ContainerRequest>(attemptId); rmClient =
(AMRMClientImpl<ContainerRequest>) AMRMClient
.<ContainerRequest> createAMRMClient(attemptId);
rmClient.init(conf); rmClient.init(conf);
rmClient.start(); rmClient.start();
assertNotNull(rmClient); assertNotNull(rmClient);
assertEquals(STATE.STARTED, rmClient.getServiceState()); assertEquals(STATE.STARTED, rmClient.getServiceState());
// start am nm client // start am nm client
nmClient = new NMClientImpl(); nmClient = (NMClientImpl) NMClient.createNMClient();
nmClient.init(conf); nmClient.init(conf);
nmClient.start(); nmClient.start();
assertNotNull(nmClient); assertNotNull(nmClient);

View File

@ -62,7 +62,7 @@ public class TestYarnClient {
rm.init(conf); rm.init(conf);
rm.start(); rm.start();
YarnClient client = new YarnClientImpl(); YarnClient client = YarnClient.createYarnClient();
client.init(conf); client.init(conf);
client.start(); client.start();
client.stop(); client.stop();