Merge -c 1365185 from trunk to branch-2 to fix MAPREDUCE-4438. Add a simple, generic client to run 'easy' AMs in YARN. Contributed by Bikas Saha.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1365190 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
4fdf282b6a
commit
95a5222e4a
|
@ -35,6 +35,9 @@ Release 2.1.0-alpha - Unreleased
|
|||
|
||||
MAPREDUCE-3451. Port Fair Scheduler to MR2 (pwendell via tucu)
|
||||
|
||||
MAPREDUCE-4438. Add a simple, generic client to run 'easy' AMs in YARN.
|
||||
(Bikas Saha via acmurthy)
|
||||
|
||||
IMPROVEMENTS
|
||||
|
||||
MAPREDUCE-4440. Changed SchedulerApp and SchedulerNode to be a minimal
|
||||
|
|
|
@ -290,7 +290,10 @@ public class ApplicationMaster {
|
|||
Map<String, String> envs = System.getenv();
|
||||
|
||||
appAttemptID = Records.newRecord(ApplicationAttemptId.class);
|
||||
if (!envs.containsKey(ApplicationConstants.AM_CONTAINER_ID_ENV)) {
|
||||
if (envs.containsKey(ApplicationConstants.AM_APP_ATTEMPT_ID_ENV)) {
|
||||
appAttemptID = ConverterUtils.toApplicationAttemptId(envs
|
||||
.get(ApplicationConstants.AM_APP_ATTEMPT_ID_ENV));
|
||||
} else if (!envs.containsKey(ApplicationConstants.AM_CONTAINER_ID_ENV)) {
|
||||
if (cliParser.hasOption("app_attempt_id")) {
|
||||
String appIdStr = cliParser.getOptionValue("app_attempt_id", "");
|
||||
appAttemptID = ConverterUtils.toApplicationAttemptId(appIdStr);
|
||||
|
|
|
@ -0,0 +1,110 @@
|
|||
<?xml version="1.0"?>
|
||||
<!--
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License. See accompanying LICENSE file.
|
||||
-->
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
|
||||
http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<parent>
|
||||
<artifactId>hadoop-yarn-applications</artifactId>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<version>3.0.0-SNAPSHOT</version>
|
||||
</parent>
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-yarn-applications-unmanaged-am-launcher</artifactId>
|
||||
<version>3.0.0-SNAPSHOT</version>
|
||||
<name>hadoop-yarn-applications-unmanaged-am-launcher</name>
|
||||
|
||||
<properties>
|
||||
<!-- Needed for generating FindBugs warnings using parent pom -->
|
||||
<yarn.basedir>${project.parent.parent.basedir}</yarn.basedir>
|
||||
</properties>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-yarn-api</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-yarn-common</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-yarn-server-nodemanager</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-yarn-server-resourcemanager</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-yarn-server-common</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-mapreduce-client-core</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-yarn-applications-distributedshell</artifactId>
|
||||
<version>3.0.0-SNAPSHOT</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-yarn-server-tests</artifactId>
|
||||
<type>test-jar</type>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<build>
|
||||
<plugins>
|
||||
<plugin>
|
||||
<artifactId>maven-dependency-plugin</artifactId>
|
||||
<executions>
|
||||
<execution>
|
||||
<id>build-classpath</id>
|
||||
<phase>generate-sources</phase>
|
||||
<goals>
|
||||
<goal>build-classpath</goal>
|
||||
</goals>
|
||||
<configuration>
|
||||
<!-- needed to run the unit test for DS to generate the required classpath
|
||||
that is required in the env of the launch container in the mini yarn cluster -->
|
||||
<outputFile>target/classes/yarn-apps-am-generated-classpath</outputFile>
|
||||
</configuration>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-surefire-plugin</artifactId>
|
||||
<configuration>
|
||||
<environmentVariables>
|
||||
<JAVA_HOME>${java.home}</JAVA_HOME>
|
||||
</environmentVariables>
|
||||
</configuration>
|
||||
</plugin>
|
||||
</plugins>
|
||||
</build>
|
||||
|
||||
|
||||
</project>
|
|
@ -0,0 +1,405 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.applications.unmanagedamlauncher;
|
||||
|
||||
import java.io.BufferedReader;
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStreamReader;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.util.ArrayList;
|
||||
import java.util.EnumSet;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.commons.cli.CommandLine;
|
||||
import org.apache.commons.cli.GnuParser;
|
||||
import org.apache.commons.cli.HelpFormatter;
|
||||
import org.apache.commons.cli.Options;
|
||||
import org.apache.commons.cli.ParseException;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.yarn.api.ApplicationConstants;
|
||||
import org.apache.hadoop.yarn.api.ClientRMProtocol;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationReport;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
||||
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
||||
import org.apache.hadoop.yarn.api.records.Priority;
|
||||
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
|
||||
import org.apache.hadoop.yarn.ipc.YarnRPC;
|
||||
import org.apache.hadoop.yarn.util.Records;
|
||||
|
||||
/**
|
||||
* The UnmanagedLauncher is a simple client that launches and unmanaged AM. An
|
||||
* unmanagedAM is an AM that is not launched and managed by the RM. The client
|
||||
* creates a new application on the RM and negotiates a new attempt id. Then it
|
||||
* waits for the RM app state to reach be YarnApplicationState.ACCEPTED after
|
||||
* which it spawns the AM in another process and passes it the attempt id via
|
||||
* env variable ApplicationConstants.AM_APP_ATTEMPT_ID_ENV. The AM can be in any
|
||||
* language. The AM can register with the RM using the attempt id and proceed as
|
||||
* normal. The client redirects app stdout and stderr to its own stdout and
|
||||
* stderr and waits for the AM process to exit. Then it waits for the RM to
|
||||
* report app completion.
|
||||
*/
|
||||
public class UnmanagedAMLauncher {
|
||||
private static final Log LOG = LogFactory.getLog(UnmanagedAMLauncher.class);
|
||||
|
||||
private Configuration conf;
|
||||
|
||||
// RPC to communicate to RM
|
||||
private YarnRPC rpc;
|
||||
|
||||
// Handle to talk to the Resource Manager/Applications Manager
|
||||
private ClientRMProtocol rmClient;
|
||||
|
||||
// Application master specific info to register a new Application with RM/ASM
|
||||
private String appName = "";
|
||||
// App master priority
|
||||
private int amPriority = 0;
|
||||
// Queue for App master
|
||||
private String amQueue = "";
|
||||
// cmd to start AM
|
||||
private String amCmd = null;
|
||||
// set the classpath explicitly
|
||||
private String classpath = null;
|
||||
|
||||
/**
|
||||
* @param args
|
||||
* Command line arguments
|
||||
*/
|
||||
public static void main(String[] args) {
|
||||
try {
|
||||
UnmanagedAMLauncher client = new UnmanagedAMLauncher();
|
||||
LOG.info("Initializing Client");
|
||||
boolean doRun = client.init(args);
|
||||
if (!doRun) {
|
||||
System.exit(0);
|
||||
}
|
||||
client.run();
|
||||
} catch (Throwable t) {
|
||||
LOG.fatal("Error running Client", t);
|
||||
System.exit(1);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
*/
|
||||
public UnmanagedAMLauncher(Configuration conf) throws Exception {
|
||||
// Set up RPC
|
||||
this.conf = conf;
|
||||
rpc = YarnRPC.create(conf);
|
||||
}
|
||||
|
||||
public UnmanagedAMLauncher() throws Exception {
|
||||
this(new Configuration());
|
||||
}
|
||||
|
||||
private void printUsage(Options opts) {
|
||||
new HelpFormatter().printHelp("Client", opts);
|
||||
}
|
||||
|
||||
public boolean init(String[] args) throws ParseException {
|
||||
|
||||
Options opts = new Options();
|
||||
opts.addOption("appname", true,
|
||||
"Application Name. Default value - UnmanagedAM");
|
||||
opts.addOption("priority", true, "Application Priority. Default 0");
|
||||
opts.addOption("queue", true,
|
||||
"RM Queue in which this application is to be submitted");
|
||||
opts.addOption("master_memory", true,
|
||||
"Amount of memory in MB to be requested to run the application master");
|
||||
opts.addOption("cmd", true, "command to start unmanaged AM (required)");
|
||||
opts.addOption("classpath", true, "additional classpath");
|
||||
opts.addOption("help", false, "Print usage");
|
||||
CommandLine cliParser = new GnuParser().parse(opts, args);
|
||||
|
||||
if (args.length == 0) {
|
||||
printUsage(opts);
|
||||
throw new IllegalArgumentException(
|
||||
"No args specified for client to initialize");
|
||||
}
|
||||
|
||||
if (cliParser.hasOption("help")) {
|
||||
printUsage(opts);
|
||||
return false;
|
||||
}
|
||||
|
||||
appName = cliParser.getOptionValue("appname", "UnmanagedAM");
|
||||
amPriority = Integer.parseInt(cliParser.getOptionValue("priority", "0"));
|
||||
amQueue = cliParser.getOptionValue("queue", "");
|
||||
classpath = cliParser.getOptionValue("classpath", null);
|
||||
|
||||
amCmd = cliParser.getOptionValue("cmd");
|
||||
if (amCmd == null) {
|
||||
printUsage(opts);
|
||||
throw new IllegalArgumentException(
|
||||
"No cmd specified for application master");
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
private void connectToRM() throws IOException {
|
||||
YarnConfiguration yarnConf = new YarnConfiguration(conf);
|
||||
InetSocketAddress rmAddress = yarnConf.getSocketAddr(
|
||||
YarnConfiguration.RM_ADDRESS, YarnConfiguration.DEFAULT_RM_ADDRESS,
|
||||
YarnConfiguration.DEFAULT_RM_PORT);
|
||||
LOG.info("Connecting to ResourceManager at " + rmAddress);
|
||||
rmClient = ((ClientRMProtocol) rpc.getProxy(ClientRMProtocol.class,
|
||||
rmAddress, conf));
|
||||
}
|
||||
|
||||
private GetNewApplicationResponse getApplication() throws YarnRemoteException {
|
||||
GetNewApplicationRequest request = Records
|
||||
.newRecord(GetNewApplicationRequest.class);
|
||||
GetNewApplicationResponse response = rmClient.getNewApplication(request);
|
||||
LOG.info("Got new application id=" + response.getApplicationId());
|
||||
return response;
|
||||
}
|
||||
|
||||
public void launchAM(ApplicationAttemptId attemptId) throws IOException {
|
||||
Map<String, String> env = System.getenv();
|
||||
ArrayList<String> envAMList = new ArrayList<String>();
|
||||
boolean setClasspath = false;
|
||||
for (Map.Entry<String, String> entry : env.entrySet()) {
|
||||
String key = entry.getKey();
|
||||
String value = entry.getValue();
|
||||
if(key.equals("CLASSPATH")) {
|
||||
setClasspath = true;
|
||||
if(classpath != null) {
|
||||
value = value + File.pathSeparator + classpath;
|
||||
}
|
||||
}
|
||||
envAMList.add(key + "=" + value);
|
||||
}
|
||||
|
||||
if(!setClasspath && classpath!=null) {
|
||||
envAMList.add("CLASSPATH="+classpath);
|
||||
}
|
||||
|
||||
envAMList.add(ApplicationConstants.AM_APP_ATTEMPT_ID_ENV + "=" + attemptId);
|
||||
|
||||
String[] envAM = new String[envAMList.size()];
|
||||
Process amProc = Runtime.getRuntime().exec(amCmd, envAMList.toArray(envAM));
|
||||
|
||||
final BufferedReader errReader =
|
||||
new BufferedReader(new InputStreamReader(amProc
|
||||
.getErrorStream()));
|
||||
final BufferedReader inReader =
|
||||
new BufferedReader(new InputStreamReader(amProc
|
||||
.getInputStream()));
|
||||
|
||||
// read error and input streams as this would free up the buffers
|
||||
// free the error stream buffer
|
||||
Thread errThread = new Thread() {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
String line = errReader.readLine();
|
||||
while((line != null) && !isInterrupted()) {
|
||||
System.err.println(line);
|
||||
line = errReader.readLine();
|
||||
}
|
||||
} catch(IOException ioe) {
|
||||
LOG.warn("Error reading the error stream", ioe);
|
||||
}
|
||||
}
|
||||
};
|
||||
Thread outThread = new Thread() {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
String line = inReader.readLine();
|
||||
while((line != null) && !isInterrupted()) {
|
||||
System.out.println(line);
|
||||
line = inReader.readLine();
|
||||
}
|
||||
} catch(IOException ioe) {
|
||||
LOG.warn("Error reading the out stream", ioe);
|
||||
}
|
||||
}
|
||||
};
|
||||
try {
|
||||
errThread.start();
|
||||
outThread.start();
|
||||
} catch (IllegalStateException ise) { }
|
||||
|
||||
// wait for the process to finish and check the exit code
|
||||
try {
|
||||
int exitCode = amProc.waitFor();
|
||||
LOG.info("AM process exited with value: " + exitCode);
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
|
||||
try {
|
||||
// make sure that the error thread exits
|
||||
// on Windows these threads sometimes get stuck and hang the execution
|
||||
// timeout and join later after destroying the process.
|
||||
errThread.join();
|
||||
outThread.join();
|
||||
errReader.close();
|
||||
inReader.close();
|
||||
} catch (InterruptedException ie) {
|
||||
LOG.info("ShellExecutor: Interrupted while reading the error/out stream",
|
||||
ie);
|
||||
} catch (IOException ioe) {
|
||||
LOG.warn("Error while closing the error/out stream", ioe);
|
||||
}
|
||||
amProc.destroy();
|
||||
}
|
||||
|
||||
public boolean run() throws IOException {
|
||||
LOG.info("Starting Client");
|
||||
|
||||
// Connect to ResourceManager
|
||||
connectToRM();
|
||||
assert (rmClient != null);
|
||||
|
||||
// Get a new application id
|
||||
GetNewApplicationResponse newApp = getApplication();
|
||||
ApplicationId appId = newApp.getApplicationId();
|
||||
|
||||
// Create launch context for app master
|
||||
LOG.info("Setting up application submission context for ASM");
|
||||
ApplicationSubmissionContext appContext = Records
|
||||
.newRecord(ApplicationSubmissionContext.class);
|
||||
|
||||
// set the application id
|
||||
appContext.setApplicationId(appId);
|
||||
// set the application name
|
||||
appContext.setApplicationName(appName);
|
||||
|
||||
// Set the priority for the application master
|
||||
Priority pri = Records.newRecord(Priority.class);
|
||||
pri.setPriority(amPriority);
|
||||
appContext.setPriority(pri);
|
||||
|
||||
// Set the queue to which this application is to be submitted in the RM
|
||||
appContext.setQueue(amQueue);
|
||||
|
||||
// Set up the container launch context for the application master
|
||||
ContainerLaunchContext amContainer = Records
|
||||
.newRecord(ContainerLaunchContext.class);
|
||||
appContext.setAMContainerSpec(amContainer);
|
||||
|
||||
// unmanaged AM
|
||||
appContext.setUnmanagedAM(true);
|
||||
LOG.info("Setting unmanaged AM");
|
||||
|
||||
// Create the request to send to the applications manager
|
||||
SubmitApplicationRequest appRequest = Records
|
||||
.newRecord(SubmitApplicationRequest.class);
|
||||
appRequest.setApplicationSubmissionContext(appContext);
|
||||
|
||||
// Submit the application to the applications manager
|
||||
LOG.info("Submitting application to ASM");
|
||||
rmClient.submitApplication(appRequest);
|
||||
|
||||
// Monitor the application to wait for launch state
|
||||
ApplicationReport appReport = monitorApplication(appId,
|
||||
EnumSet.of(YarnApplicationState.ACCEPTED));
|
||||
ApplicationAttemptId attemptId = appReport.getCurrentApplicationAttemptId();
|
||||
LOG.info("Launching application with id: " + attemptId);
|
||||
|
||||
// launch AM
|
||||
launchAM(attemptId);
|
||||
|
||||
// Monitor the application for end state
|
||||
appReport = monitorApplication(appId, EnumSet.of(
|
||||
YarnApplicationState.KILLED, YarnApplicationState.FAILED,
|
||||
YarnApplicationState.FINISHED));
|
||||
YarnApplicationState appState = appReport.getYarnApplicationState();
|
||||
FinalApplicationStatus appStatus = appReport.getFinalApplicationStatus();
|
||||
|
||||
LOG.info("App ended with state: " + appReport.getYarnApplicationState()
|
||||
+ " and status: " + appStatus);
|
||||
if (YarnApplicationState.FINISHED == appState
|
||||
&& FinalApplicationStatus.SUCCEEDED == appStatus) {
|
||||
LOG.info("Application has completed successfully.");
|
||||
return true;
|
||||
} else {
|
||||
LOG.info("Application did finished unsuccessfully." + " YarnState="
|
||||
+ appState.toString() + ", FinalStatus=" + appStatus.toString());
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Monitor the submitted application for completion. Kill application if time
|
||||
* expires.
|
||||
*
|
||||
* @param appId
|
||||
* Application Id of application to be monitored
|
||||
* @return true if application completed successfully
|
||||
* @throws YarnRemoteException
|
||||
*/
|
||||
private ApplicationReport monitorApplication(ApplicationId appId,
|
||||
Set<YarnApplicationState> finalState) throws YarnRemoteException {
|
||||
|
||||
while (true) {
|
||||
|
||||
// Check app status every 1 second.
|
||||
try {
|
||||
Thread.sleep(1000);
|
||||
} catch (InterruptedException e) {
|
||||
LOG.debug("Thread sleep in monitoring loop interrupted");
|
||||
}
|
||||
|
||||
// Get application report for the appId we are interested in
|
||||
GetApplicationReportRequest reportRequest = Records
|
||||
.newRecord(GetApplicationReportRequest.class);
|
||||
reportRequest.setApplicationId(appId);
|
||||
GetApplicationReportResponse reportResponse = rmClient
|
||||
.getApplicationReport(reportRequest);
|
||||
ApplicationReport report = reportResponse.getApplicationReport();
|
||||
|
||||
LOG.info("Got application report from ASM for" + ", appId="
|
||||
+ appId.getId() + ", appAttemptId="
|
||||
+ report.getCurrentApplicationAttemptId() + ", clientToken="
|
||||
+ report.getClientToken() + ", appDiagnostics="
|
||||
+ report.getDiagnostics() + ", appMasterHost=" + report.getHost()
|
||||
+ ", appQueue=" + report.getQueue() + ", appMasterRpcPort="
|
||||
+ report.getRpcPort() + ", appStartTime=" + report.getStartTime()
|
||||
+ ", yarnAppState=" + report.getYarnApplicationState().toString()
|
||||
+ ", distributedFinalState="
|
||||
+ report.getFinalApplicationStatus().toString() + ", appTrackingUrl="
|
||||
+ report.getTrackingUrl() + ", appUser=" + report.getUser());
|
||||
|
||||
YarnApplicationState state = report.getYarnApplicationState();
|
||||
if (finalState.contains(state)) {
|
||||
return report;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,163 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.applications.unmanagedamlauncher;
|
||||
|
||||
import java.io.BufferedReader;
|
||||
import java.io.File;
|
||||
import java.io.FileOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.InputStreamReader;
|
||||
import java.io.OutputStream;
|
||||
import java.net.URL;
|
||||
|
||||
import junit.framework.Assert;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.server.MiniYARNCluster;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
|
||||
public class TestUnmanagedAMLauncher {
|
||||
|
||||
private static final Log LOG = LogFactory
|
||||
.getLog(TestUnmanagedAMLauncher.class);
|
||||
|
||||
protected static MiniYARNCluster yarnCluster = null;
|
||||
protected static Configuration conf = new Configuration();
|
||||
|
||||
@BeforeClass
|
||||
public static void setup() throws InterruptedException, IOException {
|
||||
LOG.info("Starting up YARN cluster");
|
||||
conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 128);
|
||||
if (yarnCluster == null) {
|
||||
yarnCluster = new MiniYARNCluster(
|
||||
TestUnmanagedAMLauncher.class.getName(), 1, 1, 1);
|
||||
yarnCluster.init(conf);
|
||||
yarnCluster.start();
|
||||
URL url = Thread.currentThread().getContextClassLoader()
|
||||
.getResource("yarn-site.xml");
|
||||
if (url == null) {
|
||||
throw new RuntimeException(
|
||||
"Could not find 'yarn-site.xml' dummy file in classpath");
|
||||
}
|
||||
OutputStream os = new FileOutputStream(new File(url.getPath()));
|
||||
yarnCluster.getConfig().writeXml(os);
|
||||
os.close();
|
||||
}
|
||||
try {
|
||||
Thread.sleep(2000);
|
||||
} catch (InterruptedException e) {
|
||||
LOG.info("setup thread sleep interrupted. message=" + e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void tearDown() throws IOException {
|
||||
if (yarnCluster != null) {
|
||||
yarnCluster.stop();
|
||||
yarnCluster = null;
|
||||
}
|
||||
}
|
||||
|
||||
private static String getTestRuntimeClasspath() {
|
||||
|
||||
InputStream classpathFileStream = null;
|
||||
BufferedReader reader = null;
|
||||
String envClassPath = "";
|
||||
|
||||
LOG.info("Trying to generate classpath for app master from current thread's classpath");
|
||||
try {
|
||||
|
||||
// Create classpath from generated classpath
|
||||
// Check maven pom.xml for generated classpath info
|
||||
// Works if compile time env is same as runtime. Mainly tests.
|
||||
ClassLoader thisClassLoader = Thread.currentThread()
|
||||
.getContextClassLoader();
|
||||
String generatedClasspathFile = "yarn-apps-am-generated-classpath";
|
||||
classpathFileStream = thisClassLoader
|
||||
.getResourceAsStream(generatedClasspathFile);
|
||||
if (classpathFileStream == null) {
|
||||
LOG.info("Could not classpath resource from class loader");
|
||||
return envClassPath;
|
||||
}
|
||||
LOG.info("Readable bytes from stream=" + classpathFileStream.available());
|
||||
reader = new BufferedReader(new InputStreamReader(classpathFileStream));
|
||||
String cp = reader.readLine();
|
||||
if (cp != null) {
|
||||
envClassPath += cp.trim() + File.pathSeparator;
|
||||
}
|
||||
// yarn-site.xml at this location contains proper config for mini cluster
|
||||
URL url = thisClassLoader.getResource("yarn-site.xml");
|
||||
envClassPath += new File(url.getFile()).getParent();
|
||||
} catch (IOException e) {
|
||||
LOG.info("Could not find the necessary resource to generate class path for tests. Error="
|
||||
+ e.getMessage());
|
||||
}
|
||||
|
||||
try {
|
||||
if (classpathFileStream != null) {
|
||||
classpathFileStream.close();
|
||||
}
|
||||
if (reader != null) {
|
||||
reader.close();
|
||||
}
|
||||
} catch (IOException e) {
|
||||
LOG.info("Failed to close class path file stream or reader. Error="
|
||||
+ e.getMessage());
|
||||
}
|
||||
return envClassPath;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDSShell() throws Exception {
|
||||
String classpath = getTestRuntimeClasspath();
|
||||
String javaHome = System.getenv("JAVA_HOME");
|
||||
if (javaHome == null) {
|
||||
LOG.fatal("JAVA_HOME not defined. Test not running.");
|
||||
return;
|
||||
}
|
||||
// start dist-shell with 0 containers because container launch will fail if
|
||||
// there are no dist cache resources.
|
||||
String[] args = {
|
||||
"--classpath",
|
||||
classpath,
|
||||
"--cmd",
|
||||
javaHome
|
||||
+ "/bin/java -Xmx512m "
|
||||
+ "org.apache.hadoop.yarn.applications.distributedshell.ApplicationMaster "
|
||||
+ "--container_memory 128 --num_containers 0 --priority 0 --shell_command ls" };
|
||||
|
||||
LOG.info("Initializing Launcher");
|
||||
UnmanagedAMLauncher launcher = new UnmanagedAMLauncher(new Configuration(
|
||||
yarnCluster.getConfig()));
|
||||
boolean initSuccess = launcher.init(args);
|
||||
Assert.assertTrue(initSuccess);
|
||||
LOG.info("Running Launcher");
|
||||
boolean result = launcher.run();
|
||||
|
||||
LOG.info("Launcher run completed. Result=" + result);
|
||||
Assert.assertTrue(result);
|
||||
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,21 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
|
||||
<!--
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License. See accompanying LICENSE file.
|
||||
-->
|
||||
|
||||
<configuration>
|
||||
<!-- Dummy (invalid) config file to be overwriten by TestUnmanagedAMLauncher with MiniCluster configuration. -->
|
||||
</configuration>
|
||||
|
||||
|
|
@ -30,6 +30,7 @@
|
|||
|
||||
<modules>
|
||||
<module>hadoop-yarn-applications-distributedshell</module>
|
||||
<module>hadoop-yarn-applications-unmanaged-am-launcher</module>
|
||||
</modules>
|
||||
<profiles>
|
||||
<profile>
|
||||
|
|
Loading…
Reference in New Issue