merge YARN-29 from trunk. Add a yarn-client module. (Contributed by Vinod Kumar Vavilapalli)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1377782 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
0e5d96e3ee
commit
00874120d2
|
@ -121,6 +121,12 @@
|
||||||
<version>${project.version}</version>
|
<version>${project.version}</version>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.apache.hadoop</groupId>
|
||||||
|
<artifactId>hadoop-yarn-client</artifactId>
|
||||||
|
<version>${project.version}</version>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>org.apache.hadoop</groupId>
|
<groupId>org.apache.hadoop</groupId>
|
||||||
<artifactId>hadoop-mapreduce-client-core</artifactId>
|
<artifactId>hadoop-mapreduce-client-core</artifactId>
|
||||||
|
|
|
@ -35,6 +35,8 @@ Release 2.1.0-alpha - Unreleased
|
||||||
|
|
||||||
IMPROVEMENTS
|
IMPROVEMENTS
|
||||||
|
|
||||||
|
YARN-29. Add a yarn-client module. (Vinod Kumar Vavilapalli via sseth)
|
||||||
|
|
||||||
BUG FIXES
|
BUG FIXES
|
||||||
|
|
||||||
YARN-12. Fix findbugs warnings in FairScheduler. (Junping Du via acmurthy)
|
YARN-12. Fix findbugs warnings in FairScheduler. (Junping Du via acmurthy)
|
||||||
|
|
|
@ -41,6 +41,10 @@
|
||||||
<groupId>org.apache.hadoop</groupId>
|
<groupId>org.apache.hadoop</groupId>
|
||||||
<artifactId>hadoop-yarn-common</artifactId>
|
<artifactId>hadoop-yarn-common</artifactId>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.apache.hadoop</groupId>
|
||||||
|
<artifactId>hadoop-yarn-client</artifactId>
|
||||||
|
</dependency>
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>org.apache.hadoop</groupId>
|
<groupId>org.apache.hadoop</groupId>
|
||||||
<artifactId>hadoop-yarn-server-nodemanager</artifactId>
|
<artifactId>hadoop-yarn-server-nodemanager</artifactId>
|
||||||
|
|
|
@ -22,7 +22,6 @@ import java.io.BufferedReader;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.InputStream;
|
import java.io.InputStream;
|
||||||
import java.io.InputStreamReader;
|
import java.io.InputStreamReader;
|
||||||
import java.net.InetSocketAddress;
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
@ -44,20 +43,8 @@ import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.yarn.api.ApplicationConstants;
|
import org.apache.hadoop.yarn.api.ApplicationConstants;
|
||||||
import org.apache.hadoop.yarn.api.ClientRMProtocol;
|
import org.apache.hadoop.yarn.api.ClientRMProtocol;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
|
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
|
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest;
|
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse;
|
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest;
|
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesResponse;
|
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
|
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest;
|
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse;
|
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest;
|
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse;
|
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
|
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
|
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationReport;
|
import org.apache.hadoop.yarn.api.records.ApplicationReport;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
||||||
|
@ -73,12 +60,12 @@ import org.apache.hadoop.yarn.api.records.QueueInfo;
|
||||||
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
|
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
|
||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
||||||
|
import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
|
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
|
||||||
import org.apache.hadoop.yarn.ipc.YarnRPC;
|
|
||||||
import org.apache.hadoop.yarn.util.ConverterUtils;
|
import org.apache.hadoop.yarn.util.ConverterUtils;
|
||||||
import org.apache.hadoop.yarn.util.Records;
|
import org.apache.hadoop.yarn.util.Records;
|
||||||
|
import org.hadoop.yarn.client.YarnClientImpl;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Client for Distributed Shell application submission to YARN.
|
* Client for Distributed Shell application submission to YARN.
|
||||||
|
@ -113,19 +100,13 @@ import org.apache.hadoop.yarn.util.Records;
|
||||||
*/
|
*/
|
||||||
@InterfaceAudience.Public
|
@InterfaceAudience.Public
|
||||||
@InterfaceStability.Unstable
|
@InterfaceStability.Unstable
|
||||||
public class Client {
|
public class Client extends YarnClientImpl {
|
||||||
|
|
||||||
private static final Log LOG = LogFactory.getLog(Client.class);
|
private static final Log LOG = LogFactory.getLog(Client.class);
|
||||||
|
|
||||||
// Configuration
|
// Configuration
|
||||||
private Configuration conf;
|
private Configuration conf;
|
||||||
|
|
||||||
// RPC to communicate to RM
|
|
||||||
private YarnRPC rpc;
|
|
||||||
|
|
||||||
// Handle to talk to the Resource Manager/Applications Manager
|
|
||||||
private ClientRMProtocol applicationsManager;
|
|
||||||
|
|
||||||
// Application master specific info to register a new Application with RM/ASM
|
// Application master specific info to register a new Application with RM/ASM
|
||||||
private String appName = "";
|
private String appName = "";
|
||||||
// App master priority
|
// App master priority
|
||||||
|
@ -196,9 +177,9 @@ public class Client {
|
||||||
/**
|
/**
|
||||||
*/
|
*/
|
||||||
public Client(Configuration conf) throws Exception {
|
public Client(Configuration conf) throws Exception {
|
||||||
// Set up the configuration and RPC
|
super();
|
||||||
this.conf = conf;
|
this.conf = conf;
|
||||||
rpc = YarnRPC.create(conf);
|
init(conf);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -328,22 +309,17 @@ public class Client {
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
public boolean run() throws IOException {
|
public boolean run() throws IOException {
|
||||||
LOG.info("Starting Client");
|
|
||||||
|
|
||||||
// Connect to ResourceManager
|
LOG.info("Running Client");
|
||||||
connectToASM();
|
start();
|
||||||
assert(applicationsManager != null);
|
|
||||||
|
|
||||||
// Use ClientRMProtocol handle to general cluster information
|
YarnClusterMetrics clusterMetrics = super.getYarnClusterMetrics();
|
||||||
GetClusterMetricsRequest clusterMetricsReq = Records.newRecord(GetClusterMetricsRequest.class);
|
|
||||||
GetClusterMetricsResponse clusterMetricsResp = applicationsManager.getClusterMetrics(clusterMetricsReq);
|
|
||||||
LOG.info("Got Cluster metric info from ASM"
|
LOG.info("Got Cluster metric info from ASM"
|
||||||
+ ", numNodeManagers=" + clusterMetricsResp.getClusterMetrics().getNumNodeManagers());
|
+ ", numNodeManagers=" + clusterMetrics.getNumNodeManagers());
|
||||||
|
|
||||||
GetClusterNodesRequest clusterNodesReq = Records.newRecord(GetClusterNodesRequest.class);
|
List<NodeReport> clusterNodeReports = super.getNodeReports();
|
||||||
GetClusterNodesResponse clusterNodesResp = applicationsManager.getClusterNodes(clusterNodesReq);
|
|
||||||
LOG.info("Got Cluster node info from ASM");
|
LOG.info("Got Cluster node info from ASM");
|
||||||
for (NodeReport node : clusterNodesResp.getNodeReports()) {
|
for (NodeReport node : clusterNodeReports) {
|
||||||
LOG.info("Got node report from ASM for"
|
LOG.info("Got node report from ASM for"
|
||||||
+ ", nodeId=" + node.getNodeId()
|
+ ", nodeId=" + node.getNodeId()
|
||||||
+ ", nodeAddress" + node.getHttpAddress()
|
+ ", nodeAddress" + node.getHttpAddress()
|
||||||
|
@ -352,10 +328,7 @@ public class Client {
|
||||||
+ ", nodeHealthStatus" + node.getNodeHealthStatus());
|
+ ", nodeHealthStatus" + node.getNodeHealthStatus());
|
||||||
}
|
}
|
||||||
|
|
||||||
GetQueueInfoRequest queueInfoReq = Records.newRecord(GetQueueInfoRequest.class);
|
QueueInfo queueInfo = super.getQueueInfo(this.amQueue);
|
||||||
queueInfoReq.setQueueName(this.amQueue);
|
|
||||||
GetQueueInfoResponse queueInfoResp = applicationsManager.getQueueInfo(queueInfoReq);
|
|
||||||
QueueInfo queueInfo = queueInfoResp.getQueueInfo();
|
|
||||||
LOG.info("Queue info"
|
LOG.info("Queue info"
|
||||||
+ ", queueName=" + queueInfo.getQueueName()
|
+ ", queueName=" + queueInfo.getQueueName()
|
||||||
+ ", queueCurrentCapacity=" + queueInfo.getCurrentCapacity()
|
+ ", queueCurrentCapacity=" + queueInfo.getCurrentCapacity()
|
||||||
|
@ -363,9 +336,7 @@ public class Client {
|
||||||
+ ", queueApplicationCount=" + queueInfo.getApplications().size()
|
+ ", queueApplicationCount=" + queueInfo.getApplications().size()
|
||||||
+ ", queueChildQueueCount=" + queueInfo.getChildQueues().size());
|
+ ", queueChildQueueCount=" + queueInfo.getChildQueues().size());
|
||||||
|
|
||||||
GetQueueUserAclsInfoRequest queueUserAclsReq = Records.newRecord(GetQueueUserAclsInfoRequest.class);
|
List<QueueUserACLInfo> listAclInfo = super.getQueueAclsInfo();
|
||||||
GetQueueUserAclsInfoResponse queueUserAclsResp = applicationsManager.getQueueUserAcls(queueUserAclsReq);
|
|
||||||
List<QueueUserACLInfo> listAclInfo = queueUserAclsResp.getUserAclsInfoList();
|
|
||||||
for (QueueUserACLInfo aclInfo : listAclInfo) {
|
for (QueueUserACLInfo aclInfo : listAclInfo) {
|
||||||
for (QueueACL userAcl : aclInfo.getUserAcls()) {
|
for (QueueACL userAcl : aclInfo.getUserAcls()) {
|
||||||
LOG.info("User ACL Info for Queue"
|
LOG.info("User ACL Info for Queue"
|
||||||
|
@ -375,7 +346,7 @@ public class Client {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get a new application id
|
// Get a new application id
|
||||||
GetNewApplicationResponse newApp = getApplication();
|
GetNewApplicationResponse newApp = super.getNewApplication();
|
||||||
ApplicationId appId = newApp.getApplicationId();
|
ApplicationId appId = newApp.getApplicationId();
|
||||||
|
|
||||||
// TODO get min/max resource capabilities from RM and change memory ask if needed
|
// TODO get min/max resource capabilities from RM and change memory ask if needed
|
||||||
|
@ -590,16 +561,12 @@ public class Client {
|
||||||
// Set the queue to which this application is to be submitted in the RM
|
// Set the queue to which this application is to be submitted in the RM
|
||||||
appContext.setQueue(amQueue);
|
appContext.setQueue(amQueue);
|
||||||
|
|
||||||
// Create the request to send to the applications manager
|
|
||||||
SubmitApplicationRequest appRequest = Records.newRecord(SubmitApplicationRequest.class);
|
|
||||||
appRequest.setApplicationSubmissionContext(appContext);
|
|
||||||
|
|
||||||
// Submit the application to the applications manager
|
// Submit the application to the applications manager
|
||||||
// SubmitApplicationResponse submitResp = applicationsManager.submitApplication(appRequest);
|
// SubmitApplicationResponse submitResp = applicationsManager.submitApplication(appRequest);
|
||||||
// Ignore the response as either a valid response object is returned on success
|
// Ignore the response as either a valid response object is returned on success
|
||||||
// or an exception thrown to denote some form of a failure
|
// or an exception thrown to denote some form of a failure
|
||||||
LOG.info("Submitting application to ASM");
|
LOG.info("Submitting application to ASM");
|
||||||
applicationsManager.submitApplication(appRequest);
|
super.submitApplication(appContext);
|
||||||
|
|
||||||
// TODO
|
// TODO
|
||||||
// Try submitting the same request again
|
// Try submitting the same request again
|
||||||
|
@ -629,10 +596,7 @@ public class Client {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get application report for the appId we are interested in
|
// Get application report for the appId we are interested in
|
||||||
GetApplicationReportRequest reportRequest = Records.newRecord(GetApplicationReportRequest.class);
|
ApplicationReport report = super.getApplicationReport(appId);
|
||||||
reportRequest.setApplicationId(appId);
|
|
||||||
GetApplicationReportResponse reportResponse = applicationsManager.getApplicationReport(reportRequest);
|
|
||||||
ApplicationReport report = reportResponse.getApplicationReport();
|
|
||||||
|
|
||||||
LOG.info("Got application report from ASM for"
|
LOG.info("Got application report from ASM for"
|
||||||
+ ", appId=" + appId.getId()
|
+ ", appId=" + appId.getId()
|
||||||
|
@ -671,7 +635,7 @@ public class Client {
|
||||||
|
|
||||||
if (System.currentTimeMillis() > (clientStartTime + clientTimeout)) {
|
if (System.currentTimeMillis() > (clientStartTime + clientTimeout)) {
|
||||||
LOG.info("Reached client specified timeout for application. Killing application");
|
LOG.info("Reached client specified timeout for application. Killing application");
|
||||||
killApplication(appId);
|
forceKillApplication(appId);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -683,61 +647,14 @@ public class Client {
|
||||||
* @param appId Application Id to be killed.
|
* @param appId Application Id to be killed.
|
||||||
* @throws YarnRemoteException
|
* @throws YarnRemoteException
|
||||||
*/
|
*/
|
||||||
private void killApplication(ApplicationId appId) throws YarnRemoteException {
|
private void forceKillApplication(ApplicationId appId) throws YarnRemoteException {
|
||||||
KillApplicationRequest request = Records.newRecord(KillApplicationRequest.class);
|
|
||||||
// TODO clarify whether multiple jobs with the same app id can be submitted and be running at
|
// TODO clarify whether multiple jobs with the same app id can be submitted and be running at
|
||||||
// the same time.
|
// the same time.
|
||||||
// If yes, can we kill a particular attempt only?
|
// If yes, can we kill a particular attempt only?
|
||||||
request.setApplicationId(appId);
|
|
||||||
// KillApplicationResponse response = applicationsManager.forceKillApplication(request);
|
|
||||||
// Response can be ignored as it is non-null on success or
|
// Response can be ignored as it is non-null on success or
|
||||||
// throws an exception in case of failures
|
// throws an exception in case of failures
|
||||||
applicationsManager.forceKillApplication(request);
|
super.killApplication(appId);
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Connect to the Resource Manager/Applications Manager
|
|
||||||
* @return Handle to communicate with the ASM
|
|
||||||
* @throws IOException
|
|
||||||
*/
|
|
||||||
private void connectToASM() throws IOException {
|
|
||||||
|
|
||||||
/*
|
|
||||||
UserGroupInformation user = UserGroupInformation.getCurrentUser();
|
|
||||||
applicationsManager = user.doAs(new PrivilegedAction<ClientRMProtocol>() {
|
|
||||||
public ClientRMProtocol run() {
|
|
||||||
InetSocketAddress rmAddress = NetUtils.createSocketAddr(conf.get(
|
|
||||||
YarnConfiguration.RM_SCHEDULER_ADDRESS,
|
|
||||||
YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS));
|
|
||||||
LOG.info("Connecting to ResourceManager at " + rmAddress);
|
|
||||||
Configuration appsManagerServerConf = new Configuration(conf);
|
|
||||||
appsManagerServerConf.setClass(YarnConfiguration.YARN_SECURITY_INFO,
|
|
||||||
ClientRMSecurityInfo.class, SecurityInfo.class);
|
|
||||||
ClientRMProtocol asm = ((ClientRMProtocol) rpc.getProxy(ClientRMProtocol.class, rmAddress, appsManagerServerConf));
|
|
||||||
return asm;
|
|
||||||
}
|
|
||||||
});
|
|
||||||
*/
|
|
||||||
YarnConfiguration yarnConf = new YarnConfiguration(conf);
|
|
||||||
InetSocketAddress rmAddress = yarnConf.getSocketAddr(
|
|
||||||
YarnConfiguration.RM_ADDRESS,
|
|
||||||
YarnConfiguration.DEFAULT_RM_ADDRESS,
|
|
||||||
YarnConfiguration.DEFAULT_RM_PORT);
|
|
||||||
LOG.info("Connecting to ResourceManager at " + rmAddress);
|
|
||||||
applicationsManager = ((ClientRMProtocol) rpc.getProxy(
|
|
||||||
ClientRMProtocol.class, rmAddress, conf));
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Get a new application from the ASM
|
|
||||||
* @return New Application
|
|
||||||
* @throws YarnRemoteException
|
|
||||||
*/
|
|
||||||
private GetNewApplicationResponse getApplication() throws YarnRemoteException {
|
|
||||||
GetNewApplicationRequest request = Records.newRecord(GetNewApplicationRequest.class);
|
|
||||||
GetNewApplicationResponse response = applicationsManager.getNewApplication(request);
|
|
||||||
LOG.info("Got new application id=" + response.getApplicationId());
|
|
||||||
return response;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private static String getTestRuntimeClasspath() {
|
private static String getTestRuntimeClasspath() {
|
||||||
|
|
|
@ -0,0 +1,19 @@
|
||||||
|
<!--
|
||||||
|
Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
contributor license agreements. See the NOTICE file distributed with
|
||||||
|
this work for additional information regarding copyright ownership.
|
||||||
|
The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
(the "License"); you may not use this file except in compliance with
|
||||||
|
the License. You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
-->
|
||||||
|
|
||||||
|
<FindBugsFilter>
|
||||||
|
</FindBugsFilter>
|
|
@ -0,0 +1,37 @@
|
||||||
|
<?xml version="1.0"?>
|
||||||
|
<!--
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License. See accompanying LICENSE file.
|
||||||
|
-->
|
||||||
|
<project xmlns:pom="http://maven.apache.org/POM/4.0.0">
|
||||||
|
<modelVersion>4.0.0</modelVersion>
|
||||||
|
<parent>
|
||||||
|
<artifactId>hadoop-yarn</artifactId>
|
||||||
|
<groupId>org.apache.hadoop</groupId>
|
||||||
|
<version>2.0.1-SNAPSHOT</version>
|
||||||
|
</parent>
|
||||||
|
<groupId>org.apache.hadoop</groupId>
|
||||||
|
<artifactId>hadoop-yarn-client</artifactId>
|
||||||
|
<version>2.0.1-SNAPSHOT</version>
|
||||||
|
<name>hadoop-yarn-client</name>
|
||||||
|
|
||||||
|
<dependencies>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.apache.hadoop</groupId>
|
||||||
|
<artifactId>hadoop-yarn-api</artifactId>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.apache.hadoop</groupId>
|
||||||
|
<artifactId>hadoop-yarn-common</artifactId>
|
||||||
|
</dependency>
|
||||||
|
</dependencies>
|
||||||
|
</project>
|
|
@ -0,0 +1,234 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.hadoop.yarn.client;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
|
import org.apache.hadoop.io.Text;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationReport;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
||||||
|
import org.apache.hadoop.yarn.api.records.DelegationToken;
|
||||||
|
import org.apache.hadoop.yarn.api.records.NodeReport;
|
||||||
|
import org.apache.hadoop.yarn.api.records.QueueInfo;
|
||||||
|
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
|
||||||
|
import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
|
||||||
|
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
|
||||||
|
import org.apache.hadoop.yarn.service.Service;
|
||||||
|
|
||||||
|
@InterfaceAudience.Public
|
||||||
|
@InterfaceStability.Evolving
|
||||||
|
public interface YarnClient extends Service {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* <p>
|
||||||
|
* Obtain a new {@link ApplicationId} for submitting new applications.
|
||||||
|
* </p>
|
||||||
|
*
|
||||||
|
* <p>
|
||||||
|
* Returns a response which contains {@link ApplicationId} that can be used to
|
||||||
|
* submit a new application. See
|
||||||
|
* {@link #submitApplication(ApplicationSubmissionContext)}.
|
||||||
|
* </p>
|
||||||
|
*
|
||||||
|
* <p>
|
||||||
|
* See {@link GetNewApplicationResponse} for other information that is
|
||||||
|
* returned.
|
||||||
|
* </p>
|
||||||
|
*
|
||||||
|
* @return response containing the new <code>ApplicationId</code> to be used
|
||||||
|
* to submit an application
|
||||||
|
* @throws YarnRemoteException
|
||||||
|
*/
|
||||||
|
GetNewApplicationResponse getNewApplication() throws YarnRemoteException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* <p>
|
||||||
|
* Submit a new application to <code>YARN.</code>
|
||||||
|
* </p>
|
||||||
|
*
|
||||||
|
* @param appContext
|
||||||
|
* {@link ApplicationSubmissionContext} containing all the details
|
||||||
|
* needed to submit a new application
|
||||||
|
* @return {@link ApplicationId} of the accepted application
|
||||||
|
* @throws YarnRemoteException
|
||||||
|
* @see #getNewApplication()
|
||||||
|
*/
|
||||||
|
ApplicationId submitApplication(ApplicationSubmissionContext appContext)
|
||||||
|
throws YarnRemoteException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* <p>
|
||||||
|
* Kill an application identified by given ID.
|
||||||
|
* </p>
|
||||||
|
*
|
||||||
|
* @param applicationId
|
||||||
|
* {@link ApplicationId} of the application that needs to be killed
|
||||||
|
* @throws YarnRemoteException
|
||||||
|
* in case of errors or if YARN rejects the request due to
|
||||||
|
* access-control restrictions.
|
||||||
|
* @see #getQueueAclsInfo()
|
||||||
|
*/
|
||||||
|
void killApplication(ApplicationId applicationId) throws YarnRemoteException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* <p>
|
||||||
|
* Get a report of the given Application.
|
||||||
|
* </p>
|
||||||
|
*
|
||||||
|
* <p>
|
||||||
|
* In secure mode, <code>YARN</code> verifies access to the application, queue
|
||||||
|
* etc. before accepting the request.
|
||||||
|
* </p>
|
||||||
|
*
|
||||||
|
* <p>
|
||||||
|
* If the user does not have <code>VIEW_APP</code> access then the following
|
||||||
|
* fields in the report will be set to stubbed values:
|
||||||
|
* <ul>
|
||||||
|
* <li>host - set to "N/A"</li>
|
||||||
|
* <li>RPC port - set to -1</li>
|
||||||
|
* <li>client token - set to "N/A"</li>
|
||||||
|
* <li>diagnostics - set to "N/A"</li>
|
||||||
|
* <li>tracking URL - set to "N/A"</li>
|
||||||
|
* <li>original tracking URL - set to "N/A"</li>
|
||||||
|
* <li>resource usage report - all values are -1</li>
|
||||||
|
* </ul>
|
||||||
|
* </p>
|
||||||
|
*
|
||||||
|
* @param appId
|
||||||
|
* {@link ApplicationId} of the application that needs a report
|
||||||
|
* @return application report
|
||||||
|
* @throws YarnRemoteException
|
||||||
|
*/
|
||||||
|
ApplicationReport getApplicationReport(ApplicationId appId)
|
||||||
|
throws YarnRemoteException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* <p>
|
||||||
|
* Get a report (ApplicationReport) of all Applications in the cluster.
|
||||||
|
* </p>
|
||||||
|
*
|
||||||
|
* <p>
|
||||||
|
* If the user does not have <code>VIEW_APP</code> access for an application
|
||||||
|
* then the corresponding report will be filtered as described in
|
||||||
|
* {@link #getApplicationReport(ApplicationId)}.
|
||||||
|
* </p>
|
||||||
|
*
|
||||||
|
* @return a list of reports of all running applications
|
||||||
|
* @throws YarnRemoteException
|
||||||
|
*/
|
||||||
|
List<ApplicationReport> getApplicationList() throws YarnRemoteException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* <p>
|
||||||
|
* Get metrics ({@link YarnClusterMetrics}) about the cluster.
|
||||||
|
* </p>
|
||||||
|
*
|
||||||
|
* @return cluster metrics
|
||||||
|
* @throws YarnRemoteException
|
||||||
|
*/
|
||||||
|
YarnClusterMetrics getYarnClusterMetrics() throws YarnRemoteException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* <p>
|
||||||
|
* Get a report of all nodes ({@link NodeReport}) in the cluster.
|
||||||
|
* </p>
|
||||||
|
*
|
||||||
|
* @return A list of report of all nodes
|
||||||
|
* @throws YarnRemoteException
|
||||||
|
*/
|
||||||
|
List<NodeReport> getNodeReports() throws YarnRemoteException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* <p>
|
||||||
|
* Get a delegation token so as to be able to talk to YARN using those tokens.
|
||||||
|
*
|
||||||
|
* @param renewer
|
||||||
|
* Address of the renewer who can renew these tokens when needed by
|
||||||
|
* securely talking to YARN.
|
||||||
|
* @return a delegation token ({@link DelegationToken}) that can be used to
|
||||||
|
* talk to YARN
|
||||||
|
* @throws YarnRemoteException
|
||||||
|
*/
|
||||||
|
DelegationToken getRMDelegationToken(Text renewer) throws YarnRemoteException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* <p>
|
||||||
|
* Get information ({@link QueueInfo}) about a given <em>queue</em>.
|
||||||
|
* </p>
|
||||||
|
*
|
||||||
|
* @param queueName
|
||||||
|
* Name of the queue whose information is needed
|
||||||
|
* @return queue information
|
||||||
|
* @throws YarnRemoteException
|
||||||
|
* in case of errors or if YARN rejects the request due to
|
||||||
|
* access-control restrictions.
|
||||||
|
*/
|
||||||
|
QueueInfo getQueueInfo(String queueName) throws YarnRemoteException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* <p>
|
||||||
|
* Get information ({@link QueueInfo}) about all queues, recursively if there
|
||||||
|
* is a hierarchy
|
||||||
|
* </p>
|
||||||
|
*
|
||||||
|
* @return a list of queue-information for all queues
|
||||||
|
* @throws YarnRemoteException
|
||||||
|
*/
|
||||||
|
List<QueueInfo> getAllQueues() throws YarnRemoteException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* <p>
|
||||||
|
* Get information ({@link QueueInfo}) about top level queues.
|
||||||
|
* </p>
|
||||||
|
*
|
||||||
|
* @return a list of queue-information for all the top-level queues
|
||||||
|
* @throws YarnRemoteException
|
||||||
|
*/
|
||||||
|
List<QueueInfo> getRootQueueInfos() throws YarnRemoteException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* <p>
|
||||||
|
* Get information ({@link QueueInfo}) about all the immediate children queues
|
||||||
|
* of the given queue
|
||||||
|
* </p>
|
||||||
|
*
|
||||||
|
* @param parent
|
||||||
|
* Name of the queue whose child-queues' information is needed
|
||||||
|
* @return a list of queue-information for all queues who are direct children
|
||||||
|
* of the given parent queue.
|
||||||
|
* @throws YarnRemoteException
|
||||||
|
*/
|
||||||
|
List<QueueInfo> getChildQueueInfos(String parent) throws YarnRemoteException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* <p>
|
||||||
|
* Get information about <em>acls</em> for <em>current user</em> on all the
|
||||||
|
* existing queues.
|
||||||
|
* </p>
|
||||||
|
*
|
||||||
|
* @return a list of queue acls ({@link QueueUserACLInfo}) for
|
||||||
|
* <em>current user</em>
|
||||||
|
* @throws YarnRemoteException
|
||||||
|
*/
|
||||||
|
List<QueueUserACLInfo> getQueueAclsInfo() throws YarnRemoteException;
|
||||||
|
}
|
|
@ -0,0 +1,258 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.hadoop.yarn.client;
|
||||||
|
|
||||||
|
import java.net.InetSocketAddress;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.io.Text;
|
||||||
|
import org.apache.hadoop.ipc.RPC;
|
||||||
|
import org.apache.hadoop.yarn.api.ClientRMProtocol;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsRequest;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsResponse;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesResponse;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationReport;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
||||||
|
import org.apache.hadoop.yarn.api.records.DelegationToken;
|
||||||
|
import org.apache.hadoop.yarn.api.records.NodeReport;
|
||||||
|
import org.apache.hadoop.yarn.api.records.QueueInfo;
|
||||||
|
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
|
||||||
|
import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
|
||||||
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
|
||||||
|
import org.apache.hadoop.yarn.ipc.YarnRPC;
|
||||||
|
import org.apache.hadoop.yarn.service.AbstractService;
|
||||||
|
import org.apache.hadoop.yarn.util.Records;
|
||||||
|
|
||||||
|
@InterfaceAudience.Public
|
||||||
|
@InterfaceStability.Evolving
|
||||||
|
public class YarnClientImpl extends AbstractService implements YarnClient {
|
||||||
|
|
||||||
|
private static final Log LOG = LogFactory.getLog(YarnClientImpl.class);
|
||||||
|
|
||||||
|
protected ClientRMProtocol rmClient;
|
||||||
|
protected InetSocketAddress rmAddress;
|
||||||
|
|
||||||
|
private static final String ROOT = "root";
|
||||||
|
|
||||||
|
public YarnClientImpl() {
|
||||||
|
super(YarnClientImpl.class.getName());
|
||||||
|
}
|
||||||
|
|
||||||
|
private static InetSocketAddress getRmAddress(Configuration conf) {
|
||||||
|
return conf.getSocketAddr(YarnConfiguration.RM_ADDRESS,
|
||||||
|
YarnConfiguration.DEFAULT_RM_ADDRESS, YarnConfiguration.DEFAULT_RM_PORT);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public synchronized void init(Configuration conf) {
|
||||||
|
this.rmAddress = getRmAddress(conf);
|
||||||
|
super.init(conf);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public synchronized void start() {
|
||||||
|
YarnRPC rpc = YarnRPC.create(getConfig());
|
||||||
|
|
||||||
|
this.rmClient =
|
||||||
|
(ClientRMProtocol) rpc.getProxy(ClientRMProtocol.class, rmAddress,
|
||||||
|
getConfig());
|
||||||
|
LOG.debug("Connecting to ResourceManager at " + rmAddress);
|
||||||
|
super.start();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public synchronized void stop() {
|
||||||
|
RPC.stopProxy(this.rmClient);
|
||||||
|
super.stop();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public GetNewApplicationResponse getNewApplication()
|
||||||
|
throws YarnRemoteException {
|
||||||
|
GetNewApplicationRequest request =
|
||||||
|
Records.newRecord(GetNewApplicationRequest.class);
|
||||||
|
return rmClient.getNewApplication(request);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ApplicationId
|
||||||
|
submitApplication(ApplicationSubmissionContext appContext)
|
||||||
|
throws YarnRemoteException {
|
||||||
|
ApplicationId applicationId = appContext.getApplicationId();
|
||||||
|
appContext.setApplicationId(applicationId);
|
||||||
|
SubmitApplicationRequest request =
|
||||||
|
Records.newRecord(SubmitApplicationRequest.class);
|
||||||
|
request.setApplicationSubmissionContext(appContext);
|
||||||
|
rmClient.submitApplication(request);
|
||||||
|
LOG.info("Submitted application " + applicationId + " to ResourceManager"
|
||||||
|
+ " at " + rmAddress);
|
||||||
|
return applicationId;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void killApplication(ApplicationId applicationId)
|
||||||
|
throws YarnRemoteException {
|
||||||
|
LOG.info("Killing application " + applicationId);
|
||||||
|
KillApplicationRequest request =
|
||||||
|
Records.newRecord(KillApplicationRequest.class);
|
||||||
|
request.setApplicationId(applicationId);
|
||||||
|
rmClient.forceKillApplication(request);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ApplicationReport getApplicationReport(ApplicationId appId)
|
||||||
|
throws YarnRemoteException {
|
||||||
|
GetApplicationReportRequest request =
|
||||||
|
Records.newRecord(GetApplicationReportRequest.class);
|
||||||
|
request.setApplicationId(appId);
|
||||||
|
GetApplicationReportResponse response =
|
||||||
|
rmClient.getApplicationReport(request);
|
||||||
|
return response.getApplicationReport();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public List<ApplicationReport> getApplicationList()
|
||||||
|
throws YarnRemoteException {
|
||||||
|
GetAllApplicationsRequest request =
|
||||||
|
Records.newRecord(GetAllApplicationsRequest.class);
|
||||||
|
GetAllApplicationsResponse response = rmClient.getAllApplications(request);
|
||||||
|
return response.getApplicationList();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public YarnClusterMetrics getYarnClusterMetrics() throws YarnRemoteException {
|
||||||
|
GetClusterMetricsRequest request =
|
||||||
|
Records.newRecord(GetClusterMetricsRequest.class);
|
||||||
|
GetClusterMetricsResponse response = rmClient.getClusterMetrics(request);
|
||||||
|
return response.getClusterMetrics();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public List<NodeReport> getNodeReports() throws YarnRemoteException {
|
||||||
|
GetClusterNodesRequest request =
|
||||||
|
Records.newRecord(GetClusterNodesRequest.class);
|
||||||
|
GetClusterNodesResponse response = rmClient.getClusterNodes(request);
|
||||||
|
return response.getNodeReports();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public DelegationToken getRMDelegationToken(Text renewer)
|
||||||
|
throws YarnRemoteException {
|
||||||
|
/* get the token from RM */
|
||||||
|
GetDelegationTokenRequest rmDTRequest =
|
||||||
|
Records.newRecord(GetDelegationTokenRequest.class);
|
||||||
|
rmDTRequest.setRenewer(renewer.toString());
|
||||||
|
GetDelegationTokenResponse response =
|
||||||
|
rmClient.getDelegationToken(rmDTRequest);
|
||||||
|
return response.getRMDelegationToken();
|
||||||
|
}
|
||||||
|
|
||||||
|
private GetQueueInfoRequest
|
||||||
|
getQueueInfoRequest(String queueName, boolean includeApplications,
|
||||||
|
boolean includeChildQueues, boolean recursive) {
|
||||||
|
GetQueueInfoRequest request = Records.newRecord(GetQueueInfoRequest.class);
|
||||||
|
request.setQueueName(queueName);
|
||||||
|
request.setIncludeApplications(includeApplications);
|
||||||
|
request.setIncludeChildQueues(includeChildQueues);
|
||||||
|
request.setRecursive(recursive);
|
||||||
|
return request;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public QueueInfo getQueueInfo(String queueName) throws YarnRemoteException {
|
||||||
|
GetQueueInfoRequest request =
|
||||||
|
getQueueInfoRequest(queueName, true, false, false);
|
||||||
|
Records.newRecord(GetQueueInfoRequest.class);
|
||||||
|
return rmClient.getQueueInfo(request).getQueueInfo();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public List<QueueUserACLInfo> getQueueAclsInfo() throws YarnRemoteException {
|
||||||
|
GetQueueUserAclsInfoRequest request =
|
||||||
|
Records.newRecord(GetQueueUserAclsInfoRequest.class);
|
||||||
|
return rmClient.getQueueUserAcls(request).getUserAclsInfoList();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public List<QueueInfo> getAllQueues() throws YarnRemoteException {
|
||||||
|
List<QueueInfo> queues = new ArrayList<QueueInfo>();
|
||||||
|
|
||||||
|
QueueInfo rootQueue =
|
||||||
|
rmClient.getQueueInfo(getQueueInfoRequest(ROOT, false, true, true))
|
||||||
|
.getQueueInfo();
|
||||||
|
getChildQueues(rootQueue, queues, true);
|
||||||
|
return queues;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public List<QueueInfo> getRootQueueInfos() throws YarnRemoteException {
|
||||||
|
List<QueueInfo> queues = new ArrayList<QueueInfo>();
|
||||||
|
|
||||||
|
QueueInfo rootQueue =
|
||||||
|
rmClient.getQueueInfo(getQueueInfoRequest(ROOT, false, true, true))
|
||||||
|
.getQueueInfo();
|
||||||
|
getChildQueues(rootQueue, queues, false);
|
||||||
|
return queues;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public List<QueueInfo> getChildQueueInfos(String parent)
|
||||||
|
throws YarnRemoteException {
|
||||||
|
List<QueueInfo> queues = new ArrayList<QueueInfo>();
|
||||||
|
|
||||||
|
QueueInfo parentQueue =
|
||||||
|
rmClient.getQueueInfo(getQueueInfoRequest(parent, false, true, false))
|
||||||
|
.getQueueInfo();
|
||||||
|
getChildQueues(parentQueue, queues, true);
|
||||||
|
return queues;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void getChildQueues(QueueInfo parent, List<QueueInfo> queues,
|
||||||
|
boolean recursive) {
|
||||||
|
List<QueueInfo> childQueues = parent.getChildQueues();
|
||||||
|
|
||||||
|
for (QueueInfo child : childQueues) {
|
||||||
|
queues.add(child);
|
||||||
|
if (recursive) {
|
||||||
|
getChildQueues(child, queues, recursive);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,30 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.hadoop.yarn.client;
|
||||||
|
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
public class TestYarnClient {
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void test() {
|
||||||
|
// More to come later.
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -11,8 +11,7 @@
|
||||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
See the License for the specific language governing permissions and
|
See the License for the specific language governing permissions and
|
||||||
limitations under the License. See accompanying LICENSE file.
|
limitations under the License. See accompanying LICENSE file.
|
||||||
-->
|
--><project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
|
||||||
<modelVersion>4.0.0</modelVersion>
|
<modelVersion>4.0.0</modelVersion>
|
||||||
<parent>
|
<parent>
|
||||||
<groupId>org.apache.hadoop</groupId>
|
<groupId>org.apache.hadoop</groupId>
|
||||||
|
@ -186,5 +185,6 @@
|
||||||
<module>hadoop-yarn-server</module>
|
<module>hadoop-yarn-server</module>
|
||||||
<module>hadoop-yarn-applications</module>
|
<module>hadoop-yarn-applications</module>
|
||||||
<module>hadoop-yarn-site</module>
|
<module>hadoop-yarn-site</module>
|
||||||
|
<module>hadoop-yarn-client</module>
|
||||||
</modules>
|
</modules>
|
||||||
</project>
|
</project>
|
Loading…
Reference in New Issue