YARN-78. Changed UnManagedAM application to use YarnClient. Contributed by Bikas Saha.

svn merge --ignore-ancestry -c 1383705 ../../trunk/


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1383706 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2012-09-12 00:50:25 +00:00
parent 8644881db7
commit ddf8691f03
3 changed files with 85 additions and 106 deletions

View File

@ -8,6 +8,9 @@ Release 2.0.3-alpha - Unreleased
IMPROVEMENTS IMPROVEMENTS
YARN-78. Changed UnManagedAM application to use YarnClient. (Bikas Saha via
vinodkv)
OPTIMIZATIONS OPTIMIZATIONS
BUG FIXES BUG FIXES

View File

@ -56,6 +56,10 @@
<artifactId>hadoop-yarn-server-common</artifactId> <artifactId>hadoop-yarn-server-common</artifactId>
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-client</artifactId>
</dependency>
<dependency> <dependency>
<groupId>org.apache.hadoop</groupId> <groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId> <artifactId>hadoop-mapreduce-client-core</artifactId>

View File

@ -22,7 +22,6 @@ import java.io.BufferedReader;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.io.InputStreamReader; import java.io.InputStreamReader;
import java.net.InetSocketAddress;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.EnumSet; import java.util.EnumSet;
import java.util.Map; import java.util.Map;
@ -37,12 +36,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.ApplicationConstants; import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.ClientRMProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.ApplicationReport;
@ -51,9 +45,9 @@ import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.YarnClientImpl;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException; import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.util.Records; import org.apache.hadoop.yarn.util.Records;
/** /**
@ -73,11 +67,8 @@ public class UnmanagedAMLauncher {
private Configuration conf; private Configuration conf;
// RPC to communicate to RM
private YarnRPC rpc;
// Handle to talk to the Resource Manager/Applications Manager // Handle to talk to the Resource Manager/Applications Manager
private ClientRMProtocol rmClient; private YarnClientImpl rmClient;
// Application master specific info to register a new Application with RM/ASM // Application master specific info to register a new Application with RM/ASM
private String appName = ""; private String appName = "";
@ -114,7 +105,6 @@ public class UnmanagedAMLauncher {
public UnmanagedAMLauncher(Configuration conf) throws Exception { public UnmanagedAMLauncher(Configuration conf) throws Exception {
// Set up RPC // Set up RPC
this.conf = conf; this.conf = conf;
rpc = YarnRPC.create(conf);
} }
public UnmanagedAMLauncher() throws Exception { public UnmanagedAMLauncher() throws Exception {
@ -163,25 +153,11 @@ public class UnmanagedAMLauncher {
"No cmd specified for application master"); "No cmd specified for application master");
} }
return true;
}
private void connectToRM() throws IOException {
YarnConfiguration yarnConf = new YarnConfiguration(conf); YarnConfiguration yarnConf = new YarnConfiguration(conf);
InetSocketAddress rmAddress = yarnConf.getSocketAddr( rmClient = new YarnClientImpl();
YarnConfiguration.RM_ADDRESS, YarnConfiguration.DEFAULT_RM_ADDRESS, rmClient.init(yarnConf);
YarnConfiguration.DEFAULT_RM_PORT);
LOG.info("Connecting to ResourceManager at " + rmAddress);
rmClient = ((ClientRMProtocol) rpc.getProxy(ClientRMProtocol.class,
rmAddress, conf));
}
private GetNewApplicationResponse getApplication() throws YarnRemoteException { return true;
GetNewApplicationRequest request = Records
.newRecord(GetNewApplicationRequest.class);
GetNewApplicationResponse response = rmClient.getNewApplication(request);
LOG.info("Got new application id=" + response.getApplicationId());
return response;
} }
public void launchAM(ApplicationAttemptId attemptId) throws IOException { public void launchAM(ApplicationAttemptId attemptId) throws IOException {
@ -275,80 +251,81 @@ public class UnmanagedAMLauncher {
} }
amProc.destroy(); amProc.destroy();
} }
public boolean run() throws IOException { public boolean run() throws IOException {
LOG.info("Starting Client"); LOG.info("Starting Client");
// Connect to ResourceManager // Connect to ResourceManager
connectToRM(); rmClient.start();
assert (rmClient != null); try {
// Get a new application id
// Get a new application id GetNewApplicationResponse newApp = rmClient.getNewApplication();
GetNewApplicationResponse newApp = getApplication(); ApplicationId appId = newApp.getApplicationId();
ApplicationId appId = newApp.getApplicationId();
// Create launch context for app master
// Create launch context for app master LOG.info("Setting up application submission context for ASM");
LOG.info("Setting up application submission context for ASM"); ApplicationSubmissionContext appContext = Records
ApplicationSubmissionContext appContext = Records .newRecord(ApplicationSubmissionContext.class);
.newRecord(ApplicationSubmissionContext.class);
// set the application id
// set the application id appContext.setApplicationId(appId);
appContext.setApplicationId(appId); // set the application name
// set the application name appContext.setApplicationName(appName);
appContext.setApplicationName(appName);
// Set the priority for the application master
// Set the priority for the application master Priority pri = Records.newRecord(Priority.class);
Priority pri = Records.newRecord(Priority.class); pri.setPriority(amPriority);
pri.setPriority(amPriority); appContext.setPriority(pri);
appContext.setPriority(pri);
// Set the queue to which this application is to be submitted in the RM
// Set the queue to which this application is to be submitted in the RM appContext.setQueue(amQueue);
appContext.setQueue(amQueue);
// Set up the container launch context for the application master
// Set up the container launch context for the application master ContainerLaunchContext amContainer = Records
ContainerLaunchContext amContainer = Records .newRecord(ContainerLaunchContext.class);
.newRecord(ContainerLaunchContext.class); appContext.setAMContainerSpec(amContainer);
appContext.setAMContainerSpec(amContainer);
// unmanaged AM
// unmanaged AM appContext.setUnmanagedAM(true);
appContext.setUnmanagedAM(true); LOG.info("Setting unmanaged AM");
LOG.info("Setting unmanaged AM");
// Submit the application to the applications manager
// Create the request to send to the applications manager LOG.info("Submitting application to ASM");
SubmitApplicationRequest appRequest = Records rmClient.submitApplication(appContext);
.newRecord(SubmitApplicationRequest.class);
appRequest.setApplicationSubmissionContext(appContext); // Monitor the application to wait for launch state
ApplicationReport appReport = monitorApplication(appId,
// Submit the application to the applications manager EnumSet.of(YarnApplicationState.ACCEPTED));
LOG.info("Submitting application to ASM"); ApplicationAttemptId attemptId = appReport.getCurrentApplicationAttemptId();
rmClient.submitApplication(appRequest); LOG.info("Launching application with id: " + attemptId);
// Monitor the application to wait for launch state // launch AM
ApplicationReport appReport = monitorApplication(appId, launchAM(attemptId);
EnumSet.of(YarnApplicationState.ACCEPTED));
ApplicationAttemptId attemptId = appReport.getCurrentApplicationAttemptId(); // Monitor the application for end state
LOG.info("Launching application with id: " + attemptId); appReport = monitorApplication(appId, EnumSet.of(
YarnApplicationState.KILLED, YarnApplicationState.FAILED,
// launch AM YarnApplicationState.FINISHED));
launchAM(attemptId); YarnApplicationState appState = appReport.getYarnApplicationState();
FinalApplicationStatus appStatus = appReport.getFinalApplicationStatus();
// Monitor the application for end state
appReport = monitorApplication(appId, EnumSet.of( LOG.info("App ended with state: " + appReport.getYarnApplicationState()
YarnApplicationState.KILLED, YarnApplicationState.FAILED, + " and status: " + appStatus);
YarnApplicationState.FINISHED));
YarnApplicationState appState = appReport.getYarnApplicationState(); boolean success;
FinalApplicationStatus appStatus = appReport.getFinalApplicationStatus(); if (YarnApplicationState.FINISHED == appState
&& FinalApplicationStatus.SUCCEEDED == appStatus) {
LOG.info("App ended with state: " + appReport.getYarnApplicationState() LOG.info("Application has completed successfully.");
+ " and status: " + appStatus); success = true;
if (YarnApplicationState.FINISHED == appState } else {
&& FinalApplicationStatus.SUCCEEDED == appStatus) { LOG.info("Application did finished unsuccessfully." + " YarnState="
LOG.info("Application has completed successfully."); + appState.toString() + ", FinalStatus=" + appStatus.toString());
return true; success = false;
} else { }
LOG.info("Application did finished unsuccessfully." + " YarnState="
+ appState.toString() + ", FinalStatus=" + appStatus.toString()); return success;
return false; } finally {
rmClient.stop();
} }
} }
@ -374,12 +351,7 @@ public class UnmanagedAMLauncher {
} }
// Get application report for the appId we are interested in // Get application report for the appId we are interested in
GetApplicationReportRequest reportRequest = Records ApplicationReport report = rmClient.getApplicationReport(appId);
.newRecord(GetApplicationReportRequest.class);
reportRequest.setApplicationId(appId);
GetApplicationReportResponse reportResponse = rmClient
.getApplicationReport(reportRequest);
ApplicationReport report = reportResponse.getApplicationReport();
LOG.info("Got application report from ASM for" + ", appId=" LOG.info("Got application report from ASM for" + ", appId="
+ appId.getId() + ", appAttemptId=" + appId.getId() + ", appAttemptId="