MAPREDUCE-4438. Add a simple, generic client to run 'easy' AMs in YARN. Contributed by Bikas Saha.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1365185 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
d8f3913856
commit
3ef19e9dbd
|
@ -161,6 +161,9 @@ Release 2.1.0-alpha - Unreleased
|
||||||
|
|
||||||
MAPREDUCE-3451. Port Fair Scheduler to MR2 (pwendell via tucu)
|
MAPREDUCE-3451. Port Fair Scheduler to MR2 (pwendell via tucu)
|
||||||
|
|
||||||
|
MAPREDUCE-4438. Add a simple, generic client to run 'easy' AMs in YARN.
|
||||||
|
(Bikas Saha via acmurthy)
|
||||||
|
|
||||||
IMPROVEMENTS
|
IMPROVEMENTS
|
||||||
|
|
||||||
MAPREDUCE-4440. Changed SchedulerApp and SchedulerNode to be a minimal
|
MAPREDUCE-4440. Changed SchedulerApp and SchedulerNode to be a minimal
|
||||||
|
|
|
@ -290,7 +290,10 @@ public class ApplicationMaster {
|
||||||
Map<String, String> envs = System.getenv();
|
Map<String, String> envs = System.getenv();
|
||||||
|
|
||||||
appAttemptID = Records.newRecord(ApplicationAttemptId.class);
|
appAttemptID = Records.newRecord(ApplicationAttemptId.class);
|
||||||
if (!envs.containsKey(ApplicationConstants.AM_CONTAINER_ID_ENV)) {
|
if (envs.containsKey(ApplicationConstants.AM_APP_ATTEMPT_ID_ENV)) {
|
||||||
|
appAttemptID = ConverterUtils.toApplicationAttemptId(envs
|
||||||
|
.get(ApplicationConstants.AM_APP_ATTEMPT_ID_ENV));
|
||||||
|
} else if (!envs.containsKey(ApplicationConstants.AM_CONTAINER_ID_ENV)) {
|
||||||
if (cliParser.hasOption("app_attempt_id")) {
|
if (cliParser.hasOption("app_attempt_id")) {
|
||||||
String appIdStr = cliParser.getOptionValue("app_attempt_id", "");
|
String appIdStr = cliParser.getOptionValue("app_attempt_id", "");
|
||||||
appAttemptID = ConverterUtils.toApplicationAttemptId(appIdStr);
|
appAttemptID = ConverterUtils.toApplicationAttemptId(appIdStr);
|
||||||
|
|
|
@ -0,0 +1,110 @@
|
||||||
|
<?xml version="1.0"?>
|
||||||
|
<!--
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License. See accompanying LICENSE file.
|
||||||
|
-->
|
||||||
|
<project xmlns="http://maven.apache.org/POM/4.0.0"
|
||||||
|
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||||
|
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
|
||||||
|
http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||||
|
<parent>
|
||||||
|
<artifactId>hadoop-yarn-applications</artifactId>
|
||||||
|
<groupId>org.apache.hadoop</groupId>
|
||||||
|
<version>3.0.0-SNAPSHOT</version>
|
||||||
|
</parent>
|
||||||
|
<modelVersion>4.0.0</modelVersion>
|
||||||
|
<groupId>org.apache.hadoop</groupId>
|
||||||
|
<artifactId>hadoop-yarn-applications-unmanaged-am-launcher</artifactId>
|
||||||
|
<version>3.0.0-SNAPSHOT</version>
|
||||||
|
<name>hadoop-yarn-applications-unmanaged-am-launcher</name>
|
||||||
|
|
||||||
|
<properties>
|
||||||
|
<!-- Needed for generating FindBugs warnings using parent pom -->
|
||||||
|
<yarn.basedir>${project.parent.parent.basedir}</yarn.basedir>
|
||||||
|
</properties>
|
||||||
|
|
||||||
|
<dependencies>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.apache.hadoop</groupId>
|
||||||
|
<artifactId>hadoop-yarn-api</artifactId>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.apache.hadoop</groupId>
|
||||||
|
<artifactId>hadoop-yarn-common</artifactId>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.apache.hadoop</groupId>
|
||||||
|
<artifactId>hadoop-yarn-server-nodemanager</artifactId>
|
||||||
|
<scope>test</scope>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.apache.hadoop</groupId>
|
||||||
|
<artifactId>hadoop-yarn-server-resourcemanager</artifactId>
|
||||||
|
<scope>test</scope>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.apache.hadoop</groupId>
|
||||||
|
<artifactId>hadoop-yarn-server-common</artifactId>
|
||||||
|
<scope>test</scope>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.apache.hadoop</groupId>
|
||||||
|
<artifactId>hadoop-mapreduce-client-core</artifactId>
|
||||||
|
<scope>test</scope>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.apache.hadoop</groupId>
|
||||||
|
<artifactId>hadoop-yarn-applications-distributedshell</artifactId>
|
||||||
|
<version>3.0.0-SNAPSHOT</version>
|
||||||
|
<scope>test</scope>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.apache.hadoop</groupId>
|
||||||
|
<artifactId>hadoop-yarn-server-tests</artifactId>
|
||||||
|
<type>test-jar</type>
|
||||||
|
<scope>test</scope>
|
||||||
|
</dependency>
|
||||||
|
</dependencies>
|
||||||
|
|
||||||
|
<build>
|
||||||
|
<plugins>
|
||||||
|
<plugin>
|
||||||
|
<artifactId>maven-dependency-plugin</artifactId>
|
||||||
|
<executions>
|
||||||
|
<execution>
|
||||||
|
<id>build-classpath</id>
|
||||||
|
<phase>generate-sources</phase>
|
||||||
|
<goals>
|
||||||
|
<goal>build-classpath</goal>
|
||||||
|
</goals>
|
||||||
|
<configuration>
|
||||||
|
<!-- needed to run the unit test for DS to generate the required classpath
|
||||||
|
that is required in the env of the launch container in the mini yarn cluster -->
|
||||||
|
<outputFile>target/classes/yarn-apps-am-generated-classpath</outputFile>
|
||||||
|
</configuration>
|
||||||
|
</execution>
|
||||||
|
</executions>
|
||||||
|
</plugin>
|
||||||
|
<plugin>
|
||||||
|
<groupId>org.apache.maven.plugins</groupId>
|
||||||
|
<artifactId>maven-surefire-plugin</artifactId>
|
||||||
|
<configuration>
|
||||||
|
<environmentVariables>
|
||||||
|
<JAVA_HOME>${java.home}</JAVA_HOME>
|
||||||
|
</environmentVariables>
|
||||||
|
</configuration>
|
||||||
|
</plugin>
|
||||||
|
</plugins>
|
||||||
|
</build>
|
||||||
|
|
||||||
|
|
||||||
|
</project>
|
|
@ -0,0 +1,405 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.applications.unmanagedamlauncher;
|
||||||
|
|
||||||
|
import java.io.BufferedReader;
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.InputStreamReader;
|
||||||
|
import java.net.InetSocketAddress;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.EnumSet;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
|
import org.apache.commons.cli.CommandLine;
|
||||||
|
import org.apache.commons.cli.GnuParser;
|
||||||
|
import org.apache.commons.cli.HelpFormatter;
|
||||||
|
import org.apache.commons.cli.Options;
|
||||||
|
import org.apache.commons.cli.ParseException;
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.yarn.api.ApplicationConstants;
|
||||||
|
import org.apache.hadoop.yarn.api.ClientRMProtocol;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationReport;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
||||||
|
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
||||||
|
import org.apache.hadoop.yarn.api.records.Priority;
|
||||||
|
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
||||||
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
|
||||||
|
import org.apache.hadoop.yarn.ipc.YarnRPC;
|
||||||
|
import org.apache.hadoop.yarn.util.Records;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The UnmanagedLauncher is a simple client that launches and unmanaged AM. An
|
||||||
|
* unmanagedAM is an AM that is not launched and managed by the RM. The client
|
||||||
|
* creates a new application on the RM and negotiates a new attempt id. Then it
|
||||||
|
* waits for the RM app state to reach be YarnApplicationState.ACCEPTED after
|
||||||
|
* which it spawns the AM in another process and passes it the attempt id via
|
||||||
|
* env variable ApplicationConstants.AM_APP_ATTEMPT_ID_ENV. The AM can be in any
|
||||||
|
* language. The AM can register with the RM using the attempt id and proceed as
|
||||||
|
* normal. The client redirects app stdout and stderr to its own stdout and
|
||||||
|
* stderr and waits for the AM process to exit. Then it waits for the RM to
|
||||||
|
* report app completion.
|
||||||
|
*/
|
||||||
|
public class UnmanagedAMLauncher {
|
||||||
|
private static final Log LOG = LogFactory.getLog(UnmanagedAMLauncher.class);
|
||||||
|
|
||||||
|
private Configuration conf;
|
||||||
|
|
||||||
|
// RPC to communicate to RM
|
||||||
|
private YarnRPC rpc;
|
||||||
|
|
||||||
|
// Handle to talk to the Resource Manager/Applications Manager
|
||||||
|
private ClientRMProtocol rmClient;
|
||||||
|
|
||||||
|
// Application master specific info to register a new Application with RM/ASM
|
||||||
|
private String appName = "";
|
||||||
|
// App master priority
|
||||||
|
private int amPriority = 0;
|
||||||
|
// Queue for App master
|
||||||
|
private String amQueue = "";
|
||||||
|
// cmd to start AM
|
||||||
|
private String amCmd = null;
|
||||||
|
// set the classpath explicitly
|
||||||
|
private String classpath = null;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param args
|
||||||
|
* Command line arguments
|
||||||
|
*/
|
||||||
|
public static void main(String[] args) {
|
||||||
|
try {
|
||||||
|
UnmanagedAMLauncher client = new UnmanagedAMLauncher();
|
||||||
|
LOG.info("Initializing Client");
|
||||||
|
boolean doRun = client.init(args);
|
||||||
|
if (!doRun) {
|
||||||
|
System.exit(0);
|
||||||
|
}
|
||||||
|
client.run();
|
||||||
|
} catch (Throwable t) {
|
||||||
|
LOG.fatal("Error running Client", t);
|
||||||
|
System.exit(1);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
*/
|
||||||
|
public UnmanagedAMLauncher(Configuration conf) throws Exception {
|
||||||
|
// Set up RPC
|
||||||
|
this.conf = conf;
|
||||||
|
rpc = YarnRPC.create(conf);
|
||||||
|
}
|
||||||
|
|
||||||
|
public UnmanagedAMLauncher() throws Exception {
|
||||||
|
this(new Configuration());
|
||||||
|
}
|
||||||
|
|
||||||
|
private void printUsage(Options opts) {
|
||||||
|
new HelpFormatter().printHelp("Client", opts);
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean init(String[] args) throws ParseException {
|
||||||
|
|
||||||
|
Options opts = new Options();
|
||||||
|
opts.addOption("appname", true,
|
||||||
|
"Application Name. Default value - UnmanagedAM");
|
||||||
|
opts.addOption("priority", true, "Application Priority. Default 0");
|
||||||
|
opts.addOption("queue", true,
|
||||||
|
"RM Queue in which this application is to be submitted");
|
||||||
|
opts.addOption("master_memory", true,
|
||||||
|
"Amount of memory in MB to be requested to run the application master");
|
||||||
|
opts.addOption("cmd", true, "command to start unmanaged AM (required)");
|
||||||
|
opts.addOption("classpath", true, "additional classpath");
|
||||||
|
opts.addOption("help", false, "Print usage");
|
||||||
|
CommandLine cliParser = new GnuParser().parse(opts, args);
|
||||||
|
|
||||||
|
if (args.length == 0) {
|
||||||
|
printUsage(opts);
|
||||||
|
throw new IllegalArgumentException(
|
||||||
|
"No args specified for client to initialize");
|
||||||
|
}
|
||||||
|
|
||||||
|
if (cliParser.hasOption("help")) {
|
||||||
|
printUsage(opts);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
appName = cliParser.getOptionValue("appname", "UnmanagedAM");
|
||||||
|
amPriority = Integer.parseInt(cliParser.getOptionValue("priority", "0"));
|
||||||
|
amQueue = cliParser.getOptionValue("queue", "");
|
||||||
|
classpath = cliParser.getOptionValue("classpath", null);
|
||||||
|
|
||||||
|
amCmd = cliParser.getOptionValue("cmd");
|
||||||
|
if (amCmd == null) {
|
||||||
|
printUsage(opts);
|
||||||
|
throw new IllegalArgumentException(
|
||||||
|
"No cmd specified for application master");
|
||||||
|
}
|
||||||
|
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void connectToRM() throws IOException {
|
||||||
|
YarnConfiguration yarnConf = new YarnConfiguration(conf);
|
||||||
|
InetSocketAddress rmAddress = yarnConf.getSocketAddr(
|
||||||
|
YarnConfiguration.RM_ADDRESS, YarnConfiguration.DEFAULT_RM_ADDRESS,
|
||||||
|
YarnConfiguration.DEFAULT_RM_PORT);
|
||||||
|
LOG.info("Connecting to ResourceManager at " + rmAddress);
|
||||||
|
rmClient = ((ClientRMProtocol) rpc.getProxy(ClientRMProtocol.class,
|
||||||
|
rmAddress, conf));
|
||||||
|
}
|
||||||
|
|
||||||
|
private GetNewApplicationResponse getApplication() throws YarnRemoteException {
|
||||||
|
GetNewApplicationRequest request = Records
|
||||||
|
.newRecord(GetNewApplicationRequest.class);
|
||||||
|
GetNewApplicationResponse response = rmClient.getNewApplication(request);
|
||||||
|
LOG.info("Got new application id=" + response.getApplicationId());
|
||||||
|
return response;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void launchAM(ApplicationAttemptId attemptId) throws IOException {
|
||||||
|
Map<String, String> env = System.getenv();
|
||||||
|
ArrayList<String> envAMList = new ArrayList<String>();
|
||||||
|
boolean setClasspath = false;
|
||||||
|
for (Map.Entry<String, String> entry : env.entrySet()) {
|
||||||
|
String key = entry.getKey();
|
||||||
|
String value = entry.getValue();
|
||||||
|
if(key.equals("CLASSPATH")) {
|
||||||
|
setClasspath = true;
|
||||||
|
if(classpath != null) {
|
||||||
|
value = value + File.pathSeparator + classpath;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
envAMList.add(key + "=" + value);
|
||||||
|
}
|
||||||
|
|
||||||
|
if(!setClasspath && classpath!=null) {
|
||||||
|
envAMList.add("CLASSPATH="+classpath);
|
||||||
|
}
|
||||||
|
|
||||||
|
envAMList.add(ApplicationConstants.AM_APP_ATTEMPT_ID_ENV + "=" + attemptId);
|
||||||
|
|
||||||
|
String[] envAM = new String[envAMList.size()];
|
||||||
|
Process amProc = Runtime.getRuntime().exec(amCmd, envAMList.toArray(envAM));
|
||||||
|
|
||||||
|
final BufferedReader errReader =
|
||||||
|
new BufferedReader(new InputStreamReader(amProc
|
||||||
|
.getErrorStream()));
|
||||||
|
final BufferedReader inReader =
|
||||||
|
new BufferedReader(new InputStreamReader(amProc
|
||||||
|
.getInputStream()));
|
||||||
|
|
||||||
|
// read error and input streams as this would free up the buffers
|
||||||
|
// free the error stream buffer
|
||||||
|
Thread errThread = new Thread() {
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
try {
|
||||||
|
String line = errReader.readLine();
|
||||||
|
while((line != null) && !isInterrupted()) {
|
||||||
|
System.err.println(line);
|
||||||
|
line = errReader.readLine();
|
||||||
|
}
|
||||||
|
} catch(IOException ioe) {
|
||||||
|
LOG.warn("Error reading the error stream", ioe);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
Thread outThread = new Thread() {
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
try {
|
||||||
|
String line = inReader.readLine();
|
||||||
|
while((line != null) && !isInterrupted()) {
|
||||||
|
System.out.println(line);
|
||||||
|
line = inReader.readLine();
|
||||||
|
}
|
||||||
|
} catch(IOException ioe) {
|
||||||
|
LOG.warn("Error reading the out stream", ioe);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
try {
|
||||||
|
errThread.start();
|
||||||
|
outThread.start();
|
||||||
|
} catch (IllegalStateException ise) { }
|
||||||
|
|
||||||
|
// wait for the process to finish and check the exit code
|
||||||
|
try {
|
||||||
|
int exitCode = amProc.waitFor();
|
||||||
|
LOG.info("AM process exited with value: " + exitCode);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
// make sure that the error thread exits
|
||||||
|
// on Windows these threads sometimes get stuck and hang the execution
|
||||||
|
// timeout and join later after destroying the process.
|
||||||
|
errThread.join();
|
||||||
|
outThread.join();
|
||||||
|
errReader.close();
|
||||||
|
inReader.close();
|
||||||
|
} catch (InterruptedException ie) {
|
||||||
|
LOG.info("ShellExecutor: Interrupted while reading the error/out stream",
|
||||||
|
ie);
|
||||||
|
} catch (IOException ioe) {
|
||||||
|
LOG.warn("Error while closing the error/out stream", ioe);
|
||||||
|
}
|
||||||
|
amProc.destroy();
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean run() throws IOException {
|
||||||
|
LOG.info("Starting Client");
|
||||||
|
|
||||||
|
// Connect to ResourceManager
|
||||||
|
connectToRM();
|
||||||
|
assert (rmClient != null);
|
||||||
|
|
||||||
|
// Get a new application id
|
||||||
|
GetNewApplicationResponse newApp = getApplication();
|
||||||
|
ApplicationId appId = newApp.getApplicationId();
|
||||||
|
|
||||||
|
// Create launch context for app master
|
||||||
|
LOG.info("Setting up application submission context for ASM");
|
||||||
|
ApplicationSubmissionContext appContext = Records
|
||||||
|
.newRecord(ApplicationSubmissionContext.class);
|
||||||
|
|
||||||
|
// set the application id
|
||||||
|
appContext.setApplicationId(appId);
|
||||||
|
// set the application name
|
||||||
|
appContext.setApplicationName(appName);
|
||||||
|
|
||||||
|
// Set the priority for the application master
|
||||||
|
Priority pri = Records.newRecord(Priority.class);
|
||||||
|
pri.setPriority(amPriority);
|
||||||
|
appContext.setPriority(pri);
|
||||||
|
|
||||||
|
// Set the queue to which this application is to be submitted in the RM
|
||||||
|
appContext.setQueue(amQueue);
|
||||||
|
|
||||||
|
// Set up the container launch context for the application master
|
||||||
|
ContainerLaunchContext amContainer = Records
|
||||||
|
.newRecord(ContainerLaunchContext.class);
|
||||||
|
appContext.setAMContainerSpec(amContainer);
|
||||||
|
|
||||||
|
// unmanaged AM
|
||||||
|
appContext.setUnmanagedAM(true);
|
||||||
|
LOG.info("Setting unmanaged AM");
|
||||||
|
|
||||||
|
// Create the request to send to the applications manager
|
||||||
|
SubmitApplicationRequest appRequest = Records
|
||||||
|
.newRecord(SubmitApplicationRequest.class);
|
||||||
|
appRequest.setApplicationSubmissionContext(appContext);
|
||||||
|
|
||||||
|
// Submit the application to the applications manager
|
||||||
|
LOG.info("Submitting application to ASM");
|
||||||
|
rmClient.submitApplication(appRequest);
|
||||||
|
|
||||||
|
// Monitor the application to wait for launch state
|
||||||
|
ApplicationReport appReport = monitorApplication(appId,
|
||||||
|
EnumSet.of(YarnApplicationState.ACCEPTED));
|
||||||
|
ApplicationAttemptId attemptId = appReport.getCurrentApplicationAttemptId();
|
||||||
|
LOG.info("Launching application with id: " + attemptId);
|
||||||
|
|
||||||
|
// launch AM
|
||||||
|
launchAM(attemptId);
|
||||||
|
|
||||||
|
// Monitor the application for end state
|
||||||
|
appReport = monitorApplication(appId, EnumSet.of(
|
||||||
|
YarnApplicationState.KILLED, YarnApplicationState.FAILED,
|
||||||
|
YarnApplicationState.FINISHED));
|
||||||
|
YarnApplicationState appState = appReport.getYarnApplicationState();
|
||||||
|
FinalApplicationStatus appStatus = appReport.getFinalApplicationStatus();
|
||||||
|
|
||||||
|
LOG.info("App ended with state: " + appReport.getYarnApplicationState()
|
||||||
|
+ " and status: " + appStatus);
|
||||||
|
if (YarnApplicationState.FINISHED == appState
|
||||||
|
&& FinalApplicationStatus.SUCCEEDED == appStatus) {
|
||||||
|
LOG.info("Application has completed successfully.");
|
||||||
|
return true;
|
||||||
|
} else {
|
||||||
|
LOG.info("Application did finished unsuccessfully." + " YarnState="
|
||||||
|
+ appState.toString() + ", FinalStatus=" + appStatus.toString());
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Monitor the submitted application for completion. Kill application if time
|
||||||
|
* expires.
|
||||||
|
*
|
||||||
|
* @param appId
|
||||||
|
* Application Id of application to be monitored
|
||||||
|
* @return true if application completed successfully
|
||||||
|
* @throws YarnRemoteException
|
||||||
|
*/
|
||||||
|
private ApplicationReport monitorApplication(ApplicationId appId,
|
||||||
|
Set<YarnApplicationState> finalState) throws YarnRemoteException {
|
||||||
|
|
||||||
|
while (true) {
|
||||||
|
|
||||||
|
// Check app status every 1 second.
|
||||||
|
try {
|
||||||
|
Thread.sleep(1000);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
LOG.debug("Thread sleep in monitoring loop interrupted");
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get application report for the appId we are interested in
|
||||||
|
GetApplicationReportRequest reportRequest = Records
|
||||||
|
.newRecord(GetApplicationReportRequest.class);
|
||||||
|
reportRequest.setApplicationId(appId);
|
||||||
|
GetApplicationReportResponse reportResponse = rmClient
|
||||||
|
.getApplicationReport(reportRequest);
|
||||||
|
ApplicationReport report = reportResponse.getApplicationReport();
|
||||||
|
|
||||||
|
LOG.info("Got application report from ASM for" + ", appId="
|
||||||
|
+ appId.getId() + ", appAttemptId="
|
||||||
|
+ report.getCurrentApplicationAttemptId() + ", clientToken="
|
||||||
|
+ report.getClientToken() + ", appDiagnostics="
|
||||||
|
+ report.getDiagnostics() + ", appMasterHost=" + report.getHost()
|
||||||
|
+ ", appQueue=" + report.getQueue() + ", appMasterRpcPort="
|
||||||
|
+ report.getRpcPort() + ", appStartTime=" + report.getStartTime()
|
||||||
|
+ ", yarnAppState=" + report.getYarnApplicationState().toString()
|
||||||
|
+ ", distributedFinalState="
|
||||||
|
+ report.getFinalApplicationStatus().toString() + ", appTrackingUrl="
|
||||||
|
+ report.getTrackingUrl() + ", appUser=" + report.getUser());
|
||||||
|
|
||||||
|
YarnApplicationState state = report.getYarnApplicationState();
|
||||||
|
if (finalState.contains(state)) {
|
||||||
|
return report;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,163 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.applications.unmanagedamlauncher;
|
||||||
|
|
||||||
|
import java.io.BufferedReader;
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.FileOutputStream;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.InputStream;
|
||||||
|
import java.io.InputStreamReader;
|
||||||
|
import java.io.OutputStream;
|
||||||
|
import java.net.URL;
|
||||||
|
|
||||||
|
import junit.framework.Assert;
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
import org.apache.hadoop.yarn.server.MiniYARNCluster;
|
||||||
|
import org.junit.AfterClass;
|
||||||
|
import org.junit.BeforeClass;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
public class TestUnmanagedAMLauncher {
|
||||||
|
|
||||||
|
private static final Log LOG = LogFactory
|
||||||
|
.getLog(TestUnmanagedAMLauncher.class);
|
||||||
|
|
||||||
|
protected static MiniYARNCluster yarnCluster = null;
|
||||||
|
protected static Configuration conf = new Configuration();
|
||||||
|
|
||||||
|
@BeforeClass
|
||||||
|
public static void setup() throws InterruptedException, IOException {
|
||||||
|
LOG.info("Starting up YARN cluster");
|
||||||
|
conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 128);
|
||||||
|
if (yarnCluster == null) {
|
||||||
|
yarnCluster = new MiniYARNCluster(
|
||||||
|
TestUnmanagedAMLauncher.class.getName(), 1, 1, 1);
|
||||||
|
yarnCluster.init(conf);
|
||||||
|
yarnCluster.start();
|
||||||
|
URL url = Thread.currentThread().getContextClassLoader()
|
||||||
|
.getResource("yarn-site.xml");
|
||||||
|
if (url == null) {
|
||||||
|
throw new RuntimeException(
|
||||||
|
"Could not find 'yarn-site.xml' dummy file in classpath");
|
||||||
|
}
|
||||||
|
OutputStream os = new FileOutputStream(new File(url.getPath()));
|
||||||
|
yarnCluster.getConfig().writeXml(os);
|
||||||
|
os.close();
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
Thread.sleep(2000);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
LOG.info("setup thread sleep interrupted. message=" + e.getMessage());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@AfterClass
|
||||||
|
public static void tearDown() throws IOException {
|
||||||
|
if (yarnCluster != null) {
|
||||||
|
yarnCluster.stop();
|
||||||
|
yarnCluster = null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static String getTestRuntimeClasspath() {
|
||||||
|
|
||||||
|
InputStream classpathFileStream = null;
|
||||||
|
BufferedReader reader = null;
|
||||||
|
String envClassPath = "";
|
||||||
|
|
||||||
|
LOG.info("Trying to generate classpath for app master from current thread's classpath");
|
||||||
|
try {
|
||||||
|
|
||||||
|
// Create classpath from generated classpath
|
||||||
|
// Check maven pom.xml for generated classpath info
|
||||||
|
// Works if compile time env is same as runtime. Mainly tests.
|
||||||
|
ClassLoader thisClassLoader = Thread.currentThread()
|
||||||
|
.getContextClassLoader();
|
||||||
|
String generatedClasspathFile = "yarn-apps-am-generated-classpath";
|
||||||
|
classpathFileStream = thisClassLoader
|
||||||
|
.getResourceAsStream(generatedClasspathFile);
|
||||||
|
if (classpathFileStream == null) {
|
||||||
|
LOG.info("Could not classpath resource from class loader");
|
||||||
|
return envClassPath;
|
||||||
|
}
|
||||||
|
LOG.info("Readable bytes from stream=" + classpathFileStream.available());
|
||||||
|
reader = new BufferedReader(new InputStreamReader(classpathFileStream));
|
||||||
|
String cp = reader.readLine();
|
||||||
|
if (cp != null) {
|
||||||
|
envClassPath += cp.trim() + File.pathSeparator;
|
||||||
|
}
|
||||||
|
// yarn-site.xml at this location contains proper config for mini cluster
|
||||||
|
URL url = thisClassLoader.getResource("yarn-site.xml");
|
||||||
|
envClassPath += new File(url.getFile()).getParent();
|
||||||
|
} catch (IOException e) {
|
||||||
|
LOG.info("Could not find the necessary resource to generate class path for tests. Error="
|
||||||
|
+ e.getMessage());
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
if (classpathFileStream != null) {
|
||||||
|
classpathFileStream.close();
|
||||||
|
}
|
||||||
|
if (reader != null) {
|
||||||
|
reader.close();
|
||||||
|
}
|
||||||
|
} catch (IOException e) {
|
||||||
|
LOG.info("Failed to close class path file stream or reader. Error="
|
||||||
|
+ e.getMessage());
|
||||||
|
}
|
||||||
|
return envClassPath;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testDSShell() throws Exception {
|
||||||
|
String classpath = getTestRuntimeClasspath();
|
||||||
|
String javaHome = System.getenv("JAVA_HOME");
|
||||||
|
if (javaHome == null) {
|
||||||
|
LOG.fatal("JAVA_HOME not defined. Test not running.");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
// start dist-shell with 0 containers because container launch will fail if
|
||||||
|
// there are no dist cache resources.
|
||||||
|
String[] args = {
|
||||||
|
"--classpath",
|
||||||
|
classpath,
|
||||||
|
"--cmd",
|
||||||
|
javaHome
|
||||||
|
+ "/bin/java -Xmx512m "
|
||||||
|
+ "org.apache.hadoop.yarn.applications.distributedshell.ApplicationMaster "
|
||||||
|
+ "--container_memory 128 --num_containers 0 --priority 0 --shell_command ls" };
|
||||||
|
|
||||||
|
LOG.info("Initializing Launcher");
|
||||||
|
UnmanagedAMLauncher launcher = new UnmanagedAMLauncher(new Configuration(
|
||||||
|
yarnCluster.getConfig()));
|
||||||
|
boolean initSuccess = launcher.init(args);
|
||||||
|
Assert.assertTrue(initSuccess);
|
||||||
|
LOG.info("Running Launcher");
|
||||||
|
boolean result = launcher.run();
|
||||||
|
|
||||||
|
LOG.info("Launcher run completed. Result=" + result);
|
||||||
|
Assert.assertTrue(result);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,21 @@
|
||||||
|
<?xml version="1.0" encoding="UTF-8"?>
|
||||||
|
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
|
||||||
|
<!--
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License. See accompanying LICENSE file.
|
||||||
|
-->
|
||||||
|
|
||||||
|
<configuration>
|
||||||
|
<!-- Dummy (invalid) config file to be overwriten by TestUnmanagedAMLauncher with MiniCluster configuration. -->
|
||||||
|
</configuration>
|
||||||
|
|
||||||
|
|
|
@ -30,6 +30,7 @@
|
||||||
|
|
||||||
<modules>
|
<modules>
|
||||||
<module>hadoop-yarn-applications-distributedshell</module>
|
<module>hadoop-yarn-applications-distributedshell</module>
|
||||||
|
<module>hadoop-yarn-applications-unmanaged-am-launcher</module>
|
||||||
</modules>
|
</modules>
|
||||||
<profiles>
|
<profiles>
|
||||||
<profile>
|
<profile>
|
||||||
|
|
Loading…
Reference in New Issue