diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index a1a200d7099..9f2a70c2e89 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -25,6 +25,9 @@ Release 2.0.3-alpha - Unreleased
IMPROVEMENTS
+ YARN-78. Changed UnManagedAM application to use YarnClient. (Bikas Saha via
+ vinodkv)
+
OPTIMIZATIONS
BUG FIXES
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/pom.xml
index e9550606e55..eee1667c57a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/pom.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/pom.xml
@@ -56,6 +56,10 @@
hadoop-yarn-server-common
test
+
+ org.apache.hadoop
+ hadoop-yarn-client
+
org.apache.hadoop
hadoop-mapreduce-client-core
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/main/java/org/apache/hadoop/yarn/applications/unmanagedamlauncher/UnmanagedAMLauncher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/main/java/org/apache/hadoop/yarn/applications/unmanagedamlauncher/UnmanagedAMLauncher.java
index cd5f94e544e..7950bd155c1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/main/java/org/apache/hadoop/yarn/applications/unmanagedamlauncher/UnmanagedAMLauncher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/main/java/org/apache/hadoop/yarn/applications/unmanagedamlauncher/UnmanagedAMLauncher.java
@@ -22,7 +22,6 @@ import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
-import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.Map;
@@ -37,12 +36,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.ApplicationConstants;
-import org.apache.hadoop.yarn.api.ClientRMProtocol;
-import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
@@ -51,9 +45,9 @@ import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.client.YarnClientImpl;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
-import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.util.Records;
/**
@@ -73,11 +67,8 @@ public class UnmanagedAMLauncher {
private Configuration conf;
- // RPC to communicate to RM
- private YarnRPC rpc;
-
// Handle to talk to the Resource Manager/Applications Manager
- private ClientRMProtocol rmClient;
+ private YarnClientImpl rmClient;
// Application master specific info to register a new Application with RM/ASM
private String appName = "";
@@ -114,7 +105,6 @@ public class UnmanagedAMLauncher {
public UnmanagedAMLauncher(Configuration conf) throws Exception {
// Set up RPC
this.conf = conf;
- rpc = YarnRPC.create(conf);
}
public UnmanagedAMLauncher() throws Exception {
@@ -163,25 +153,11 @@ public class UnmanagedAMLauncher {
"No cmd specified for application master");
}
- return true;
- }
-
- private void connectToRM() throws IOException {
YarnConfiguration yarnConf = new YarnConfiguration(conf);
- InetSocketAddress rmAddress = yarnConf.getSocketAddr(
- YarnConfiguration.RM_ADDRESS, YarnConfiguration.DEFAULT_RM_ADDRESS,
- YarnConfiguration.DEFAULT_RM_PORT);
- LOG.info("Connecting to ResourceManager at " + rmAddress);
- rmClient = ((ClientRMProtocol) rpc.getProxy(ClientRMProtocol.class,
- rmAddress, conf));
- }
+ rmClient = new YarnClientImpl();
+ rmClient.init(yarnConf);
- private GetNewApplicationResponse getApplication() throws YarnRemoteException {
- GetNewApplicationRequest request = Records
- .newRecord(GetNewApplicationRequest.class);
- GetNewApplicationResponse response = rmClient.getNewApplication(request);
- LOG.info("Got new application id=" + response.getApplicationId());
- return response;
+ return true;
}
public void launchAM(ApplicationAttemptId attemptId) throws IOException {
@@ -275,80 +251,81 @@ public class UnmanagedAMLauncher {
}
amProc.destroy();
}
-
+
public boolean run() throws IOException {
LOG.info("Starting Client");
-
+
// Connect to ResourceManager
- connectToRM();
- assert (rmClient != null);
-
- // Get a new application id
- GetNewApplicationResponse newApp = getApplication();
- ApplicationId appId = newApp.getApplicationId();
-
- // Create launch context for app master
- LOG.info("Setting up application submission context for ASM");
- ApplicationSubmissionContext appContext = Records
- .newRecord(ApplicationSubmissionContext.class);
-
- // set the application id
- appContext.setApplicationId(appId);
- // set the application name
- appContext.setApplicationName(appName);
-
- // Set the priority for the application master
- Priority pri = Records.newRecord(Priority.class);
- pri.setPriority(amPriority);
- appContext.setPriority(pri);
-
- // Set the queue to which this application is to be submitted in the RM
- appContext.setQueue(amQueue);
-
- // Set up the container launch context for the application master
- ContainerLaunchContext amContainer = Records
- .newRecord(ContainerLaunchContext.class);
- appContext.setAMContainerSpec(amContainer);
-
- // unmanaged AM
- appContext.setUnmanagedAM(true);
- LOG.info("Setting unmanaged AM");
-
- // Create the request to send to the applications manager
- SubmitApplicationRequest appRequest = Records
- .newRecord(SubmitApplicationRequest.class);
- appRequest.setApplicationSubmissionContext(appContext);
-
- // Submit the application to the applications manager
- LOG.info("Submitting application to ASM");
- rmClient.submitApplication(appRequest);
-
- // Monitor the application to wait for launch state
- ApplicationReport appReport = monitorApplication(appId,
- EnumSet.of(YarnApplicationState.ACCEPTED));
- ApplicationAttemptId attemptId = appReport.getCurrentApplicationAttemptId();
- LOG.info("Launching application with id: " + attemptId);
-
- // launch AM
- launchAM(attemptId);
-
- // Monitor the application for end state
- appReport = monitorApplication(appId, EnumSet.of(
- YarnApplicationState.KILLED, YarnApplicationState.FAILED,
- YarnApplicationState.FINISHED));
- YarnApplicationState appState = appReport.getYarnApplicationState();
- FinalApplicationStatus appStatus = appReport.getFinalApplicationStatus();
-
- LOG.info("App ended with state: " + appReport.getYarnApplicationState()
- + " and status: " + appStatus);
- if (YarnApplicationState.FINISHED == appState
- && FinalApplicationStatus.SUCCEEDED == appStatus) {
- LOG.info("Application has completed successfully.");
- return true;
- } else {
- LOG.info("Application did finished unsuccessfully." + " YarnState="
- + appState.toString() + ", FinalStatus=" + appStatus.toString());
- return false;
+ rmClient.start();
+ try {
+ // Get a new application id
+ GetNewApplicationResponse newApp = rmClient.getNewApplication();
+ ApplicationId appId = newApp.getApplicationId();
+
+ // Create launch context for app master
+ LOG.info("Setting up application submission context for ASM");
+ ApplicationSubmissionContext appContext = Records
+ .newRecord(ApplicationSubmissionContext.class);
+
+ // set the application id
+ appContext.setApplicationId(appId);
+ // set the application name
+ appContext.setApplicationName(appName);
+
+ // Set the priority for the application master
+ Priority pri = Records.newRecord(Priority.class);
+ pri.setPriority(amPriority);
+ appContext.setPriority(pri);
+
+ // Set the queue to which this application is to be submitted in the RM
+ appContext.setQueue(amQueue);
+
+ // Set up the container launch context for the application master
+ ContainerLaunchContext amContainer = Records
+ .newRecord(ContainerLaunchContext.class);
+ appContext.setAMContainerSpec(amContainer);
+
+ // unmanaged AM
+ appContext.setUnmanagedAM(true);
+ LOG.info("Setting unmanaged AM");
+
+ // Submit the application to the applications manager
+ LOG.info("Submitting application to ASM");
+ rmClient.submitApplication(appContext);
+
+ // Monitor the application to wait for launch state
+ ApplicationReport appReport = monitorApplication(appId,
+ EnumSet.of(YarnApplicationState.ACCEPTED));
+ ApplicationAttemptId attemptId = appReport.getCurrentApplicationAttemptId();
+ LOG.info("Launching application with id: " + attemptId);
+
+ // launch AM
+ launchAM(attemptId);
+
+ // Monitor the application for end state
+ appReport = monitorApplication(appId, EnumSet.of(
+ YarnApplicationState.KILLED, YarnApplicationState.FAILED,
+ YarnApplicationState.FINISHED));
+ YarnApplicationState appState = appReport.getYarnApplicationState();
+ FinalApplicationStatus appStatus = appReport.getFinalApplicationStatus();
+
+ LOG.info("App ended with state: " + appReport.getYarnApplicationState()
+ + " and status: " + appStatus);
+
+ boolean success;
+ if (YarnApplicationState.FINISHED == appState
+ && FinalApplicationStatus.SUCCEEDED == appStatus) {
+ LOG.info("Application has completed successfully.");
+ success = true;
+ } else {
+ LOG.info("Application did finished unsuccessfully." + " YarnState="
+ + appState.toString() + ", FinalStatus=" + appStatus.toString());
+ success = false;
+ }
+
+ return success;
+ } finally {
+ rmClient.stop();
}
}
@@ -374,12 +351,7 @@ public class UnmanagedAMLauncher {
}
// Get application report for the appId we are interested in
- GetApplicationReportRequest reportRequest = Records
- .newRecord(GetApplicationReportRequest.class);
- reportRequest.setApplicationId(appId);
- GetApplicationReportResponse reportResponse = rmClient
- .getApplicationReport(reportRequest);
- ApplicationReport report = reportResponse.getApplicationReport();
+ ApplicationReport report = rmClient.getApplicationReport(appId);
LOG.info("Got application report from ASM for" + ", appId="
+ appId.getId() + ", appAttemptId="