YARN-1824. Improved NodeManager and clients to be able to handle cross platform application submissions. Contributed by Jian He.

MAPREDUCE-4052. Improved MapReduce clients to use NodeManagers' ability to handle cross platform application submissions. Contributed by Jian He.
svn merge --ignore-ancestry -c 1578135 ../../trunk/ with a couple of minor edits for working in branch-2.


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1578139 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2014-03-16 19:13:16 +00:00
parent a42d5c79ab
commit d470c7b71a
20 changed files with 404 additions and 158 deletions

View File

@ -54,6 +54,9 @@ Release 2.4.0 - UNRELEASED
completed/pending/successful/failed tasks on MR AM web-ui. (Paul Han via
acmurthy)
MAPREDUCE-4052. Improved MapReduce clients to use NodeManagers' ability to
handle cross platform application submissions. (Jian He via vinodkv)
OPTIMIZATIONS
BUG FIXES

View File

@ -32,7 +32,6 @@
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.util.Apps;
@SuppressWarnings("deprecation")
public class MapReduceChildJVM {
@ -69,10 +68,9 @@ public static void setVMEnv(Map<String, String> environment,
Task task) {
JobConf conf = task.conf;
// Add the env variables passed by the user
String mapredChildEnv = getChildEnv(conf, task.isMapTask());
Apps.setEnvFromInputString(environment, mapredChildEnv);
MRApps.setEnvFromInputString(environment, mapredChildEnv, conf);
// Set logging level in the environment.
// This is so that, if the child forks another "bin/hadoop" (common in
@ -164,7 +162,8 @@ public static List<String> getVMCommand(
Vector<String> vargs = new Vector<String>(8);
vargs.add(Environment.JAVA_HOME.$() + "/bin/java");
vargs.add(MRApps.crossPlatformifyMREnv(task.conf, Environment.JAVA_HOME)
+ "/bin/java");
// Add child (task) java-vm options.
//
@ -201,7 +200,7 @@ public static List<String> getVMCommand(
vargs.add(javaOptsSplit[i]);
}
Path childTmpDir = new Path(Environment.PWD.$(),
Path childTmpDir = new Path(MRApps.crossPlatformifyMREnv(conf, Environment.PWD),
YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR);
vargs.add("-Djava.io.tmpdir=" + childTmpDir);

View File

@ -24,6 +24,7 @@
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
@ -31,7 +32,6 @@
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.Collection;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
@ -132,7 +132,6 @@
import org.apache.hadoop.yarn.state.SingleArcTransition;
import org.apache.hadoop.yarn.state.StateMachine;
import org.apache.hadoop.yarn.state.StateMachineFactory;
import org.apache.hadoop.yarn.util.Apps;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.RackResolver;
@ -627,7 +626,6 @@ private static ContainerLaunchContext createCommonContainerLaunchContext(
Token<JobTokenIdentifier> jobToken,
final org.apache.hadoop.mapred.JobID oldJobId,
Credentials credentials) {
// Application resources
Map<String, LocalResource> localResources =
new HashMap<String, LocalResource>();
@ -743,16 +741,16 @@ private static ContainerLaunchContext createCommonContainerLaunchContext(
}
}
Apps.addToEnvironment(
MRApps.addToEnvironment(
environment,
Environment.CLASSPATH.name(),
getInitialClasspath(conf));
getInitialClasspath(conf), conf);
if (initialAppClasspath != null) {
Apps.addToEnvironment(
MRApps.addToEnvironment(
environment,
Environment.APP_CLASSPATH.name(),
initialAppClasspath);
initialAppClasspath, conf);
}
} catch (IOException e) {
throw new YarnRuntimeException(e);
@ -767,17 +765,17 @@ private static ContainerLaunchContext createCommonContainerLaunchContext(
);
// Add pwd to LD_LIBRARY_PATH, add this before adding anything else
Apps.addToEnvironment(
MRApps.addToEnvironment(
environment,
Environment.LD_LIBRARY_PATH.name(),
Environment.PWD.$());
MRApps.crossPlatformifyMREnv(conf, Environment.PWD), conf);
// Add the env variables passed by the admin
Apps.setEnvFromInputString(
MRApps.setEnvFromInputString(
environment,
conf.get(
MRJobConfig.MAPRED_ADMIN_USER_ENV,
MRJobConfig.DEFAULT_MAPRED_ADMIN_USER_ENV)
MRJobConfig.DEFAULT_MAPRED_ADMIN_USER_ENV), conf
);
// Construct the actual Container

View File

@ -23,6 +23,8 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.MRApp;
@ -30,7 +32,7 @@
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher;
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent;
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerRemoteLaunchEvent;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.junit.Test;
@ -42,15 +44,17 @@ public class TestMapReduceChildJVM {
public void testCommandLine() throws Exception {
MyMRApp app = new MyMRApp(1, 0, true, this.getClass().getName(), true);
Job job = app.submit(new Configuration());
Configuration conf = new Configuration();
conf.setBoolean(MRConfig.MAPREDUCE_APP_SUBMISSION_CROSS_PLATFORM, true);
Job job = app.submit(conf);
app.waitForState(job, JobState.SUCCEEDED);
app.verifyCompleted();
Assert.assertEquals(
"[" + envVar("JAVA_HOME") + "/bin/java" +
"[" + MRApps.crossPlatformify("JAVA_HOME") + "/bin/java" +
" -Djava.net.preferIPv4Stack=true" +
" -Dhadoop.metrics.log.level=WARN" +
" -Xmx200m -Djava.io.tmpdir=" + envVar("PWD") + "/tmp" +
" -Xmx200m -Djava.io.tmpdir=" + MRApps.crossPlatformify("PWD") + "/tmp" +
" -Dlog4j.configuration=container-log4j.properties" +
" -Dyarn.app.container.log.dir=<LOG_DIR>" +
" -Dyarn.app.container.log.filesize=0" +
@ -90,16 +94,4 @@ public void handle(ContainerLauncherEvent event) {
};
}
}
/**
* Returns platform-specific string for retrieving the value of an environment
* variable with the given name. On Unix, this returns $name. On Windows,
* this returns %name%.
*
* @param name String environment variable name
* @return String for retrieving value of environment variable
*/
private static String envVar(String name) {
return Shell.WINDOWS ? '%' + name + '%' : '$' + name;
}
}

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.mapreduce.v2.util;
import java.io.File;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URI;
@ -29,16 +30,20 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.InvalidJobConfException;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskID;
@ -50,6 +55,8 @@
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.StringInterner;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.ContainerLogAppender;
import org.apache.hadoop.yarn.api.ApplicationConstants;
@ -186,29 +193,33 @@ private static void setMRFrameworkClasspath(
Map<String, String> environment, Configuration conf) throws IOException {
// Propagate the system classpath when using the mini cluster
if (conf.getBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER, false)) {
Apps.addToEnvironment(environment, Environment.CLASSPATH.name(),
System.getProperty("java.class.path"));
MRApps.addToEnvironment(environment, Environment.CLASSPATH.name(),
System.getProperty("java.class.path"), conf);
}
boolean crossPlatform =
conf.getBoolean(MRConfig.MAPREDUCE_APP_SUBMISSION_CROSS_PLATFORM,
MRConfig.DEFAULT_MAPREDUCE_APP_SUBMISSION_CROSS_PLATFORM);
// if the framework is specified then only use the MR classpath
String frameworkName = getMRFrameworkName(conf);
if (frameworkName == null) {
// Add standard Hadoop classes
for (String c : conf.getStrings(
YarnConfiguration.YARN_APPLICATION_CLASSPATH,
YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH)) {
Apps.addToEnvironment(environment, Environment.CLASSPATH.name(), c
.trim());
for (String c : conf.getStrings(YarnConfiguration.YARN_APPLICATION_CLASSPATH,
crossPlatform
? YarnConfiguration.DEFAULT_YARN_CROSS_PLATFORM_APPLICATION_CLASSPATH
: YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH)) {
MRApps.addToEnvironment(environment, Environment.CLASSPATH.name(),
c.trim(), conf);
}
}
boolean foundFrameworkInClasspath = (frameworkName == null);
for (String c : conf.getStrings(
MRJobConfig.MAPREDUCE_APPLICATION_CLASSPATH,
StringUtils.getStrings(
MRJobConfig.DEFAULT_MAPREDUCE_APPLICATION_CLASSPATH))){
Apps.addToEnvironment(environment, Environment.CLASSPATH.name(), c
.trim());
for (String c : conf.getStrings(MRJobConfig.MAPREDUCE_APPLICATION_CLASSPATH,
crossPlatform ?
StringUtils.getStrings(MRJobConfig.DEFAULT_MAPREDUCE_CROSS_PLATFORM_APPLICATION_CLASSPATH)
: StringUtils.getStrings(MRJobConfig.DEFAULT_MAPREDUCE_APPLICATION_CLASSPATH))) {
MRApps.addToEnvironment(environment, Environment.CLASSPATH.name(),
c.trim(), conf);
if (!foundFrameworkInClasspath) {
foundFrameworkInClasspath = c.contains(frameworkName);
}
@ -232,28 +243,27 @@ public static void setClasspath(Map<String, String> environment,
conf.getBoolean(MRJobConfig.MAPREDUCE_JOB_CLASSLOADER, false)
? Environment.APP_CLASSPATH.name() : Environment.CLASSPATH.name();
Apps.addToEnvironment(environment,
classpathEnvVar,
Environment.PWD.$());
MRApps.addToEnvironment(environment,
classpathEnvVar, crossPlatformifyMREnv(conf, Environment.PWD), conf);
if (!userClassesTakesPrecedence) {
MRApps.setMRFrameworkClasspath(environment, conf);
}
Apps.addToEnvironment(
MRApps.addToEnvironment(
environment,
classpathEnvVar,
MRJobConfig.JOB_JAR + Path.SEPARATOR + MRJobConfig.JOB_JAR);
Apps.addToEnvironment(
MRJobConfig.JOB_JAR + Path.SEPARATOR + MRJobConfig.JOB_JAR, conf);
MRApps.addToEnvironment(
environment,
classpathEnvVar,
MRJobConfig.JOB_JAR + Path.SEPARATOR + "classes" + Path.SEPARATOR);
Apps.addToEnvironment(
MRJobConfig.JOB_JAR + Path.SEPARATOR + "classes" + Path.SEPARATOR, conf);
MRApps.addToEnvironment(
environment,
classpathEnvVar,
MRJobConfig.JOB_JAR + Path.SEPARATOR + "lib" + Path.SEPARATOR + "*");
Apps.addToEnvironment(
MRJobConfig.JOB_JAR + Path.SEPARATOR + "lib" + Path.SEPARATOR + "*", conf);
MRApps.addToEnvironment(
environment,
classpathEnvVar,
Environment.PWD.$() + Path.SEPARATOR + "*");
crossPlatformifyMREnv(conf, Environment.PWD) + Path.SEPARATOR + "*", conf);
// a * in the classpath will only find a .jar, so we need to filter out
// all .jars and add everything else
addToClasspathIfNotJar(DistributedCache.getFileClassPaths(conf),
@ -306,10 +316,10 @@ private static void addToClasspathIfNotJar(Path[] paths,
name = p.getName();
}
if(!name.toLowerCase().endsWith(".jar")) {
Apps.addToEnvironment(
MRApps.addToEnvironment(
environment,
classpathEnvVar,
Environment.PWD.$() + Path.SEPARATOR + name);
crossPlatformifyMREnv(conf, Environment.PWD) + Path.SEPARATOR + name, conf);
}
}
}
@ -532,4 +542,31 @@ public static void addLog4jSystemProperties(
vargs.add("-Dhadoop.root.logger=" + logLevel + ",CLA");
}
}
public static void setEnvFromInputString(Map<String, String> env,
String envString, Configuration conf) {
String classPathSeparator =
conf.getBoolean(MRConfig.MAPREDUCE_APP_SUBMISSION_CROSS_PLATFORM,
MRConfig.DEFAULT_MAPREDUCE_APP_SUBMISSION_CROSS_PLATFORM)
? ApplicationConstants.CLASS_PATH_SEPARATOR : File.pathSeparator;
Apps.setEnvFromInputString(env, envString, classPathSeparator);
}
@Public
@Unstable
public static void addToEnvironment(Map<String, String> environment,
String variable, String value, Configuration conf) {
String classPathSeparator =
conf.getBoolean(MRConfig.MAPREDUCE_APP_SUBMISSION_CROSS_PLATFORM,
MRConfig.DEFAULT_MAPREDUCE_APP_SUBMISSION_CROSS_PLATFORM)
? ApplicationConstants.CLASS_PATH_SEPARATOR : File.pathSeparator;
Apps.addToEnvironment(environment, variable, value, classPathSeparator);
}
public static String crossPlatformifyMREnv(Configuration conf, Environment env) {
boolean crossPlatform =
conf.getBoolean(MRConfig.MAPREDUCE_APP_SUBMISSION_CROSS_PLATFORM,
MRConfig.DEFAULT_MAPREDUCE_APP_SUBMISSION_CROSS_PLATFORM);
return crossPlatform ? env.$$() : env.$();
}
}

View File

@ -18,6 +18,15 @@
package org.apache.hadoop.mapreduce.v2.util;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
@ -32,6 +41,7 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.filecache.DistributedCache;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
@ -52,9 +62,6 @@
import org.junit.BeforeClass;
import org.junit.Test;
import static org.junit.Assert.*;
import static org.mockito.Mockito.*;
public class TestMRApps {
private static File testWorkDir = null;
@ -179,27 +186,32 @@ public void testGetJobFileWithUser() {
@Test (timeout = 120000)
public void testSetClasspath() throws IOException {
Job job = Job.getInstance();
Configuration conf = new Configuration();
conf.setBoolean(MRConfig.MAPREDUCE_APP_SUBMISSION_CROSS_PLATFORM, true);
Job job = Job.getInstance(conf);
Map<String, String> environment = new HashMap<String, String>();
MRApps.setClasspath(environment, job.getConfiguration());
assertTrue(environment.get("CLASSPATH").startsWith(
ApplicationConstants.Environment.PWD.$() + File.pathSeparator));
ApplicationConstants.Environment.PWD.$$()
+ ApplicationConstants.CLASS_PATH_SEPARATOR));
String yarnAppClasspath = job.getConfiguration().get(
YarnConfiguration.YARN_APPLICATION_CLASSPATH,
StringUtils.join(",",
YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH));
YarnConfiguration.DEFAULT_YARN_CROSS_PLATFORM_APPLICATION_CLASSPATH));
if (yarnAppClasspath != null) {
yarnAppClasspath = yarnAppClasspath.replaceAll(",\\s*", File.pathSeparator)
.trim();
yarnAppClasspath =
yarnAppClasspath.replaceAll(",\\s*",
ApplicationConstants.CLASS_PATH_SEPARATOR).trim();
}
assertTrue(environment.get("CLASSPATH").contains(yarnAppClasspath));
String mrAppClasspath =
job.getConfiguration().get(
MRJobConfig.MAPREDUCE_APPLICATION_CLASSPATH,
MRJobConfig.DEFAULT_MAPREDUCE_APPLICATION_CLASSPATH);
MRJobConfig.DEFAULT_MAPREDUCE_CROSS_PLATFORM_APPLICATION_CLASSPATH);
if (mrAppClasspath != null) {
mrAppClasspath = mrAppClasspath.replaceAll(",\\s*", File.pathSeparator)
.trim();
mrAppClasspath =
mrAppClasspath.replaceAll(",\\s*",
ApplicationConstants.CLASS_PATH_SEPARATOR).trim();
}
assertTrue(environment.get("CLASSPATH").contains(mrAppClasspath));
}
@ -210,8 +222,10 @@ public void testSetClasspathWithArchives () throws IOException {
FileOutputStream out = new FileOutputStream(testTGZ);
out.write(0);
out.close();
Job job = Job.getInstance();
Configuration conf = job.getConfiguration();
Configuration conf = new Configuration();
conf.setBoolean(MRConfig.MAPREDUCE_APP_SUBMISSION_CROSS_PLATFORM, true);
Job job = Job.getInstance(conf);
conf = job.getConfiguration();
String testTGZQualifiedPath = FileSystem.getLocal(conf).makeQualified(new Path(
testTGZ.getAbsolutePath())).toString();
conf.set(MRJobConfig.CLASSPATH_ARCHIVES, testTGZQualifiedPath);
@ -219,13 +233,13 @@ public void testSetClasspathWithArchives () throws IOException {
Map<String, String> environment = new HashMap<String, String>();
MRApps.setClasspath(environment, conf);
assertTrue(environment.get("CLASSPATH").startsWith(
ApplicationConstants.Environment.PWD.$() + File.pathSeparator));
ApplicationConstants.Environment.PWD.$$() + ApplicationConstants.CLASS_PATH_SEPARATOR));
String confClasspath = job.getConfiguration().get(
YarnConfiguration.YARN_APPLICATION_CLASSPATH,
StringUtils.join(",",
YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH));
YarnConfiguration.DEFAULT_YARN_CROSS_PLATFORM_APPLICATION_CLASSPATH));
if (confClasspath != null) {
confClasspath = confClasspath.replaceAll(",\\s*", File.pathSeparator)
confClasspath = confClasspath.replaceAll(",\\s*", ApplicationConstants.CLASS_PATH_SEPARATOR)
.trim();
}
assertTrue(environment.get("CLASSPATH").contains(confClasspath));
@ -235,6 +249,7 @@ public void testSetClasspathWithArchives () throws IOException {
@Test (timeout = 120000)
public void testSetClasspathWithUserPrecendence() {
Configuration conf = new Configuration();
conf.setBoolean(MRConfig.MAPREDUCE_APP_SUBMISSION_CROSS_PLATFORM, true);
conf.setBoolean(MRJobConfig.MAPREDUCE_JOB_USER_CLASSPATH_FIRST, true);
Map<String, String> env = new HashMap<String, String>();
try {
@ -243,10 +258,10 @@ public void testSetClasspathWithUserPrecendence() {
fail("Got exception while setting classpath");
}
String env_str = env.get("CLASSPATH");
String expectedClasspath = StringUtils.join(File.pathSeparator,
Arrays.asList(ApplicationConstants.Environment.PWD.$(), "job.jar/job.jar",
String expectedClasspath = StringUtils.join(ApplicationConstants.CLASS_PATH_SEPARATOR,
Arrays.asList(ApplicationConstants.Environment.PWD.$$(), "job.jar/job.jar",
"job.jar/classes/", "job.jar/lib/*",
ApplicationConstants.Environment.PWD.$() + "/*"));
ApplicationConstants.Environment.PWD.$$() + "/*"));
assertTrue("MAPREDUCE_JOB_USER_CLASSPATH_FIRST set, but not taking effect!",
env_str.startsWith(expectedClasspath));
}
@ -254,6 +269,7 @@ public void testSetClasspathWithUserPrecendence() {
@Test (timeout = 120000)
public void testSetClasspathWithNoUserPrecendence() {
Configuration conf = new Configuration();
conf.setBoolean(MRConfig.MAPREDUCE_APP_SUBMISSION_CROSS_PLATFORM, true);
conf.setBoolean(MRJobConfig.MAPREDUCE_JOB_USER_CLASSPATH_FIRST, false);
Map<String, String> env = new HashMap<String, String>();
try {
@ -262,9 +278,9 @@ public void testSetClasspathWithNoUserPrecendence() {
fail("Got exception while setting classpath");
}
String env_str = env.get("CLASSPATH");
String expectedClasspath = StringUtils.join(File.pathSeparator,
String expectedClasspath = StringUtils.join(ApplicationConstants.CLASS_PATH_SEPARATOR,
Arrays.asList("job.jar/job.jar", "job.jar/classes/", "job.jar/lib/*",
ApplicationConstants.Environment.PWD.$() + "/*"));
ApplicationConstants.Environment.PWD.$$() + "/*"));
assertTrue("MAPREDUCE_JOB_USER_CLASSPATH_FIRST false, and job.jar is not in"
+ " the classpath!", env_str.contains(expectedClasspath));
assertFalse("MAPREDUCE_JOB_USER_CLASSPATH_FIRST false, but taking effect!",
@ -274,19 +290,20 @@ public void testSetClasspathWithNoUserPrecendence() {
@Test (timeout = 120000)
public void testSetClasspathWithJobClassloader() throws IOException {
Configuration conf = new Configuration();
conf.setBoolean(MRConfig.MAPREDUCE_APP_SUBMISSION_CROSS_PLATFORM, true);
conf.setBoolean(MRJobConfig.MAPREDUCE_JOB_CLASSLOADER, true);
Map<String, String> env = new HashMap<String, String>();
MRApps.setClasspath(env, conf);
String cp = env.get("CLASSPATH");
String appCp = env.get("APP_CLASSPATH");
assertFalse("MAPREDUCE_JOB_CLASSLOADER true, but job.jar is in the"
+ " classpath!", cp.contains("jar" + File.pathSeparator + "job"));
+ " classpath!", cp.contains("jar" + ApplicationConstants.CLASS_PATH_SEPARATOR + "job"));
assertFalse("MAPREDUCE_JOB_CLASSLOADER true, but PWD is in the classpath!",
cp.contains("PWD"));
String expectedAppClasspath = StringUtils.join(File.pathSeparator,
Arrays.asList(ApplicationConstants.Environment.PWD.$(), "job.jar/job.jar",
String expectedAppClasspath = StringUtils.join(ApplicationConstants.CLASS_PATH_SEPARATOR,
Arrays.asList(ApplicationConstants.Environment.PWD.$$(), "job.jar/job.jar",
"job.jar/classes/", "job.jar/lib/*",
ApplicationConstants.Environment.PWD.$() + "/*"));
ApplicationConstants.Environment.PWD.$$() + "/*"));
assertEquals("MAPREDUCE_JOB_CLASSLOADER true, but job.jar is not in the app"
+ " classpath!", expectedAppClasspath, appCp);
}
@ -296,6 +313,7 @@ public void testSetClasspathWithFramework() throws IOException {
final String FRAMEWORK_NAME = "some-framework-name";
final String FRAMEWORK_PATH = "some-framework-path#" + FRAMEWORK_NAME;
Configuration conf = new Configuration();
conf.setBoolean(MRConfig.MAPREDUCE_APP_SUBMISSION_CROSS_PLATFORM, true);
conf.set(MRJobConfig.MAPREDUCE_APPLICATION_FRAMEWORK_PATH, FRAMEWORK_PATH);
Map<String, String> env = new HashMap<String, String>();
try {
@ -311,11 +329,11 @@ public void testSetClasspathWithFramework() throws IOException {
final String FRAMEWORK_CLASSPATH = FRAMEWORK_NAME + "/*.jar";
conf.set(MRJobConfig.MAPREDUCE_APPLICATION_CLASSPATH, FRAMEWORK_CLASSPATH);
MRApps.setClasspath(env, conf);
final String stdClasspath = StringUtils.join(File.pathSeparator,
final String stdClasspath = StringUtils.join(ApplicationConstants.CLASS_PATH_SEPARATOR,
Arrays.asList("job.jar/job.jar", "job.jar/classes/", "job.jar/lib/*",
ApplicationConstants.Environment.PWD.$() + "/*"));
String expectedClasspath = StringUtils.join(File.pathSeparator,
Arrays.asList(ApplicationConstants.Environment.PWD.$(),
ApplicationConstants.Environment.PWD.$$() + "/*"));
String expectedClasspath = StringUtils.join(ApplicationConstants.CLASS_PATH_SEPARATOR,
Arrays.asList(ApplicationConstants.Environment.PWD.$$(),
FRAMEWORK_CLASSPATH, stdClasspath));
assertEquals("Incorrect classpath with framework and no user precedence",
expectedClasspath, env.get("CLASSPATH"));
@ -323,8 +341,8 @@ public void testSetClasspathWithFramework() throws IOException {
env.clear();
conf.setBoolean(MRJobConfig.MAPREDUCE_JOB_USER_CLASSPATH_FIRST, true);
MRApps.setClasspath(env, conf);
expectedClasspath = StringUtils.join(File.pathSeparator,
Arrays.asList(ApplicationConstants.Environment.PWD.$(),
expectedClasspath = StringUtils.join(ApplicationConstants.CLASS_PATH_SEPARATOR,
Arrays.asList(ApplicationConstants.Environment.PWD.$$(),
stdClasspath, FRAMEWORK_CLASSPATH));
assertEquals("Incorrect classpath with framework and user precedence",
expectedClasspath, env.get("CLASSPATH"));

View File

@ -18,6 +18,8 @@
package org.apache.hadoop.mapreduce;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
/**
* Place holder for cluster level configuration keys.
@ -113,5 +115,14 @@ public interface MRConfig {
= "mapreduce.minicluster.control-resource-monitoring";
public static final boolean
DEFAULT_MAPREDUCE_MINICLUSTER_CONTROL_RESOURCE_MONITORING = false;
@Public
@Unstable
public static final String MAPREDUCE_APP_SUBMISSION_CROSS_PLATFORM =
"mapreduce.app-submission.cross-platform";
@Public
@Unstable
public static final boolean DEFAULT_MAPREDUCE_APP_SUBMISSION_CROSS_PLATFORM =
false;
}

View File

@ -18,8 +18,11 @@
package org.apache.hadoop.mapreduce;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.yarn.util.Apps;
@InterfaceAudience.Private
@InterfaceStability.Evolving
@ -681,13 +684,30 @@ public interface MRJobConfig {
"mapreduce.application.framework.path";
/**
* Default CLASSPATH for all YARN MapReduce applications.
* Default CLASSPATH for all YARN MapReduce applications constructed with
* platform-agnostic syntax.
*/
public final String
DEFAULT_MAPREDUCE_APPLICATION_CLASSPATH = Shell.WINDOWS ?
"%HADOOP_MAPRED_HOME%\\share\\hadoop\\mapreduce\\*,"
+ "%HADOOP_MAPRED_HOME%\\share\\hadoop\\mapreduce\\lib\\*" :
"$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/*,"
@Public
@Unstable
public final String DEFAULT_MAPREDUCE_CROSS_PLATFORM_APPLICATION_CLASSPATH = Apps
.crossPlatformify("HADOOP_MAPRED_HOME")
+ "/share/hadoop/mapreduce/*,"
+ Apps.crossPlatformify("HADOOP_MAPRED_HOME")
+ "/share/hadoop/mapreduce/lib/*";
/**
* Default platform-specific CLASSPATH for all YARN MapReduce applications
* constructed based on client OS syntax.
* <p>
* Note: Use {@link DEFAULT_MAPREDUCE_CROSS_PLATFORM_APPLICATION_CLASSPATH}
* for cross-platform practice i.e. submit an application from a Windows
* client to a Linux/Unix server or vice versa.
* </p>
*/
public final String DEFAULT_MAPREDUCE_APPLICATION_CLASSPATH =
Shell.WINDOWS ? "%HADOOP_MAPRED_HOME%\\share\\hadoop\\mapreduce\\*,"
+ "%HADOOP_MAPRED_HOME%\\share\\hadoop\\mapreduce\\lib\\*"
: "$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/*,"
+ "$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/lib/*";
public static final String WORKFLOW_ID = "mapreduce.workflow.id";

View File

@ -1543,19 +1543,36 @@
of CLASSPATH entries. If mapreduce.application.framework is set then this
must specify the appropriate classpath for that archive, and the name of
the archive must be present in the classpath.
When this value is empty, the following default CLASSPATH for MR
applications would be used.
If mapreduce.app-submission.cross-platform is false, platform-specific
environment vairable expansion syntax would be used to construct the default
CLASSPATH entries.
For Linux:
$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/*,
$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/lib/*.
For Windows:
%HADOOP_MAPRED_HOME%/share/hadoop/mapreduce/*,
%HADOOP_MAPRED_HOME%/share/hadoop/mapreduce/lib/*.
If mapreduce.app-submission.cross-platform is true, platform-agnostic default
CLASSPATH for MR applications would be used:
{{HADOOP_MAPRED_HOME}}/share/hadoop/mapreduce/*,
{{HADOOP_MAPRED_HOME}}/share/hadoop/mapreduce/lib/*
Parameter expansion marker will be replaced by NodeManager on container
launch based on the underlying OS accordingly.
</description>
<name>mapreduce.application.classpath</name>
<value></value>
</property>
<property>
<description>If enabled, user can submit an application cross-platform
i.e. submit an application from a Windows client to a Linux/Unix server or
vice versa.
</description>
<name>mapreduce.app-submission.cross-platform</name>
<value>false</value>
</property>
<property>
<description>Path to the MapReduce framework archive. If set, the framework
archive will automatically be distributed along with the job, and this

View File

@ -387,7 +387,8 @@ public ApplicationSubmissionContext createApplicationSubmissionContext(
// Setup the command to run the AM
List<String> vargs = new ArrayList<String>(8);
vargs.add(Environment.JAVA_HOME.$() + "/bin/java");
vargs.add(MRApps.crossPlatformifyMREnv(jobConf, Environment.JAVA_HOME)
+ "/bin/java");
// TODO: why do we use 'conf' some places and 'jobConf' others?
long logSize = jobConf.getLong(MRJobConfig.MR_AM_LOG_KB,
@ -448,10 +449,10 @@ public ApplicationSubmissionContext createApplicationSubmissionContext(
// Setup the environment variables for Admin first
MRApps.setEnvFromInputString(environment,
conf.get(MRJobConfig.MR_AM_ADMIN_USER_ENV));
conf.get(MRJobConfig.MR_AM_ADMIN_USER_ENV), conf);
// Setup the environment variables (LD_LIBRARY_PATH, etc)
MRApps.setEnvFromInputString(environment,
conf.get(MRJobConfig.MR_AM_ENV));
conf.get(MRJobConfig.MR_AM_ENV), conf);
// Parse distributed cache
MRApps.setupDistributedCache(jobConf, localResources);

View File

@ -22,33 +22,34 @@
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.*;
import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.lib.IdentityReducer;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.mapreduce.v2.MiniMRYarnCluster;
import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.yarn.util.Apps;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
/**
* Class to test mapred task's
@ -410,12 +411,12 @@ public void testMapRedExecutionEnv() {
String setupHadoopHomeCommand = Shell.WINDOWS ?
"HADOOP_COMMON_HOME=C:\\fake\\PATH\\to\\hadoop\\common\\home" :
"HADOOP_COMMON_HOME=/fake/path/to/hadoop/common/home";
Apps.setEnvFromInputString(environment, setupHadoopHomeCommand);
MRApps.setEnvFromInputString(environment, setupHadoopHomeCommand, conf);
// Add the env variables passed by the admin
Apps.setEnvFromInputString(environment, conf.get(
MRApps.setEnvFromInputString(environment, conf.get(
MRJobConfig.MAPRED_ADMIN_USER_ENV,
MRJobConfig.DEFAULT_MAPRED_ADMIN_USER_ENV));
MRJobConfig.DEFAULT_MAPRED_ADMIN_USER_ENV), conf);
String executionPaths = environment.get(
Shell.WINDOWS ? "PATH" : "LD_LIBRARY_PATH");

View File

@ -89,7 +89,7 @@ private void startCluster(Configuration conf) throws Exception {
conf.set("hadoop.security.authentication", "simple");
String cp = conf.get(YarnConfiguration.YARN_APPLICATION_CLASSPATH,
StringUtils.join(",",
YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH))
YarnConfiguration.DEFAULT_YARN_CROSS_PLATFORM_APPLICATION_CLASSPATH))
+ File.pathSeparator + classpathDir;
conf.set(YarnConfiguration.YARN_APPLICATION_CLASSPATH, cp);
dfsCluster = new MiniDFSCluster(conf, 1, true, null);

View File

@ -288,6 +288,9 @@ Release 2.4.0 - UNRELEASED
YARN-1658. Modified web-app framework to let standby RMs redirect
web-service calls to the active RM. (Cindy Li via vinodkv)
YARN-1824. Improved NodeManager and clients to be able to handle cross
platform application submissions. (Jian He via vinodkv)
OPTIMIZATIONS
BUG FIXES

View File

@ -20,6 +20,7 @@
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Shell;
@ -59,6 +60,39 @@ public interface ApplicationConstants {
*/
public static final String LOG_DIR_EXPANSION_VAR = "<LOG_DIR>";
/**
* This constant is used to construct class path and it will be replaced with
* real class path separator(':' for Linux and ';' for Windows) by
* NodeManager on container launch. User has to use this constant to construct
* class path if user wants cross-platform practice i.e. submit an application
* from a Windows client to a Linux/Unix server or vice versa.
*/
@Public
@Unstable
public static final String CLASS_PATH_SEPARATOR= "<CPS>";
/**
* The following two constants are used to expand parameter and it will be
* replaced with real parameter expansion marker ('%' for Windows and '$' for
* Linux) by NodeManager on container launch. For example: {{VAR}} will be
* replaced as $VAR on Linux, and %VAR% on Windows. User has to use this
* constant to construct class path if user wants cross-platform practice i.e.
* submit an application from a Windows client to a Linux/Unix server or vice
* versa.
*/
@Public
@Unstable
public static final String PARAMETER_EXPANSION_LEFT="{{";
/**
* User has to use this constant to construct class path if user wants
* cross-platform practice i.e. submit an application from a Windows client to
* a Linux/Unix server or vice versa.
*/
@Public
@Unstable
public static final String PARAMETER_EXPANSION_RIGHT="}}";
public static final String STDERR = "stderr";
public static final String STDOUT = "stdout";
@ -207,6 +241,14 @@ public String toString() {
return variable;
}
/**
* Expand the environment variable based on client OS environment variable
* expansion syntax (e.g. $VAR for Linux and %VAR% for Windows).
* <p>
* Note: Use $$() method for cross-platform practice i.e. submit an
* application from a Windows client to a Linux/Unix server or vice versa.
* </p>
*/
public String $() {
if (Shell.WINDOWS) {
return "%" + variable + "%";
@ -214,5 +256,18 @@ public String toString() {
return "$" + variable;
}
}
/**
* Expand the environment variable in platform-agnostic syntax. The
* parameter expansion marker "{{VAR}}" will be replaced with real parameter
* expansion marker ('%' for Windows and '$' for Linux) by NodeManager on
* container launch. For example: {{VAR}} will be replaced as $VAR on Linux,
* and %VAR% on Windows.
*/
@Public
@Unstable
public String $$() {
return PARAMETER_EXPANSION_LEFT + variable + PARAMETER_EXPANSION_RIGHT;
}
}
}

View File

@ -27,6 +27,7 @@
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.http.HttpConfig;
import org.apache.hadoop.net.NetUtils;
@ -955,8 +956,39 @@ public class YarnConfiguration extends Configuration {
+ "application.classpath";
/**
* Default CLASSPATH for YARN applications. A comma-separated list of
* CLASSPATH entries
* Default platform-agnostic CLASSPATH for YARN applications. A
* comma-separated list of CLASSPATH entries. The parameter expansion marker
* will be replaced with real parameter expansion marker ('%' for Windows and
* '$' for Linux) by NodeManager on container launch. For example: {{VAR}}
* will be replaced as $VAR on Linux, and %VAR% on Windows.
*/
@Public
@Unstable
public static final String[] DEFAULT_YARN_CROSS_PLATFORM_APPLICATION_CLASSPATH= {
ApplicationConstants.Environment.HADOOP_CONF_DIR.$$(),
ApplicationConstants.Environment.HADOOP_COMMON_HOME.$$()
+ "/share/hadoop/common/*",
ApplicationConstants.Environment.HADOOP_COMMON_HOME.$$()
+ "/share/hadoop/common/lib/*",
ApplicationConstants.Environment.HADOOP_HDFS_HOME.$$()
+ "/share/hadoop/hdfs/*",
ApplicationConstants.Environment.HADOOP_HDFS_HOME.$$()
+ "/share/hadoop/hdfs/lib/*",
ApplicationConstants.Environment.HADOOP_YARN_HOME.$$()
+ "/share/hadoop/yarn/*",
ApplicationConstants.Environment.HADOOP_YARN_HOME.$$()
+ "/share/hadoop/yarn/lib/*" };
/**
* <p>
* Default platform-specific CLASSPATH for YARN applications. A
* comma-separated list of CLASSPATH entries constructed based on the client
* OS environment expansion syntax.
* </p>
* <p>
* Note: Use {@link DEFAULT_YARN_CROSS_PLATFORM_APPLICATION_CLASSPATH} for
* cross-platform practice i.e. submit an application from a Windows client to
* a Linux/Unix server or vice versa.
* </p>
*/
public static final String[] DEFAULT_YARN_APPLICATION_CLASSPATH = {
ApplicationConstants.Environment.HADOOP_CONF_DIR.$(),

View File

@ -47,6 +47,8 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.net.NetUtils;
@ -74,7 +76,6 @@
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
@ -85,6 +86,7 @@
import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Records;
@ -223,8 +225,9 @@ public class ApplicationMaster {
private long shellScriptPathLen = 0;
// Hardcoded path to shell script in launch container's local env
private static final String ExecShellStringPath = "ExecShellScript.sh";
private static final String ExecBatScripStringtPath = "ExecBatScript.bat";
private static final String ExecShellStringPath = Client.SCRIPT_PATH + ".sh";
private static final String ExecBatScripStringtPath = Client.SCRIPT_PATH
+ ".bat";
// Hardcoded path to custom log_properties
private static final String log4jPath = "log4j.properties";
@ -846,15 +849,29 @@ public void run() {
// In this scenario, if a shell script is specified, we need to have it
// copied and made available to the container.
if (!shellScriptPath.isEmpty()) {
Path renamedSchellScriptPath = null;
if (Shell.WINDOWS) {
renamedSchellScriptPath = new Path(shellScriptPath + ".bat");
} else {
renamedSchellScriptPath = new Path(shellScriptPath + ".sh");
}
try {
FileSystem fs = renamedSchellScriptPath.getFileSystem(conf);
fs.rename(new Path(shellScriptPath), renamedSchellScriptPath);
} catch (IOException e) {
LOG.warn("Not able to add suffix (.bat/.sh) to the shell script filename");
throw new YarnRuntimeException(e);
}
LocalResource shellRsrc = Records.newRecord(LocalResource.class);
shellRsrc.setType(LocalResourceType.FILE);
shellRsrc.setVisibility(LocalResourceVisibility.APPLICATION);
try {
shellRsrc.setResource(ConverterUtils.getYarnUrlFromURI(new URI(
shellScriptPath)));
renamedSchellScriptPath.toString())));
} catch (URISyntaxException e) {
LOG.error("Error when trying to use shell script path specified"
+ " in env, path=" + shellScriptPath);
+ " in env, path=" + renamedSchellScriptPath);
e.printStackTrace();
// A failure scenario on bad input such as invalid shell script path

View File

@ -18,7 +18,6 @@
package org.apache.hadoop.yarn.applications.distributedshell;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@ -49,7 +48,6 @@
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
@ -177,8 +175,7 @@ public class Client {
// Hardcoded path to custom log_properties
private static final String log4jPath = "log4j.properties";
private static final String linuxShellPath = "ExecShellScript.sh";
private static final String windowBatPath = "ExecBatScript.bat";
public static final String SCRIPT_PATH = "ExecScript";
/**
* @param args Command line arguments
@ -492,8 +489,7 @@ public boolean run() throws IOException, YarnException {
if (!shellScriptPath.isEmpty()) {
Path shellSrc = new Path(shellScriptPath);
String shellPathSuffix =
appName + "/" + appId.getId() + "/"
+ (Shell.WINDOWS ? windowBatPath : linuxShellPath);
appName + "/" + appId.getId() + "/" + SCRIPT_PATH;
Path shellDst =
new Path(fs.getHomeDirectory(), shellPathSuffix);
fs.copyFromLocalFile(false, true, shellSrc, shellDst);
@ -535,15 +531,16 @@ public boolean run() throws IOException, YarnException {
// It should be provided out of the box.
// For now setting all required classpaths including
// the classpath to "." for the application jar
StringBuilder classPathEnv = new StringBuilder(Environment.CLASSPATH.$())
.append(File.pathSeparatorChar).append("./*");
StringBuilder classPathEnv = new StringBuilder(Environment.CLASSPATH.$$())
.append(ApplicationConstants.CLASS_PATH_SEPARATOR).append("./*");
for (String c : conf.getStrings(
YarnConfiguration.YARN_APPLICATION_CLASSPATH,
YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH)) {
classPathEnv.append(File.pathSeparatorChar);
YarnConfiguration.DEFAULT_YARN_CROSS_PLATFORM_APPLICATION_CLASSPATH)) {
classPathEnv.append(ApplicationConstants.CLASS_PATH_SEPARATOR);
classPathEnv.append(c.trim());
}
classPathEnv.append(File.pathSeparatorChar).append("./log4j.properties");
classPathEnv.append(ApplicationConstants.CLASS_PATH_SEPARATOR).append(
"./log4j.properties");
// add the runtime classpath needed for tests to work
if (conf.getBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER, false)) {
@ -560,7 +557,7 @@ public boolean run() throws IOException, YarnException {
// Set java executable command
LOG.info("Setting up app master command");
vargs.add(Environment.JAVA_HOME.$() + "/bin/java");
vargs.add(Environment.JAVA_HOME.$$() + "/bin/java");
// Set Xmx based on am memory size
vargs.add("-Xmx" + amMemory + "m");
// Set class name

View File

@ -33,6 +33,7 @@
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.StringInterner;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
@ -70,7 +71,7 @@ public static void throwParseException(String name, String s) {
}
public static void setEnvFromInputString(Map<String, String> env,
String envString) {
String envString, String classPathSeparator) {
if (envString != null && envString.length() > 0) {
String childEnvs[] = envString.split(",");
Pattern p = Pattern.compile(Shell.getEnvironmentVariableRegex());
@ -92,7 +93,7 @@ public static void setEnvFromInputString(Map<String, String> env,
m.appendReplacement(sb, Matcher.quoteReplacement(replace));
}
m.appendTail(sb);
addToEnvironment(env, parts[0], sb.toString());
addToEnvironment(env, parts[0], sb.toString(), classPathSeparator);
}
}
}
@ -101,14 +102,19 @@ public static void setEnvFromInputString(Map<String, String> env,
@Unstable
public static void addToEnvironment(
Map<String, String> environment,
String variable, String value) {
String variable, String value, String classPathSeparator) {
String val = environment.get(variable);
if (val == null) {
val = value;
} else {
val = val + File.pathSeparator + value;
val = val + classPathSeparator + value;
}
environment.put(StringInterner.weakIntern(variable),
StringInterner.weakIntern(val));
}
public static String crossPlatformify(String var) {
return ApplicationConstants.PARAMETER_EXPANSION_LEFT + var
+ ApplicationConstants.PARAMETER_EXPANSION_RIGHT;
}
}

View File

@ -76,6 +76,8 @@
import org.apache.hadoop.yarn.util.AuxiliaryServiceHelper;
import org.apache.hadoop.yarn.util.ConverterUtils;
import com.google.common.annotations.VisibleForTesting;
public class ContainerLaunch implements Callable<Integer> {
private static final Log LOG = LogFactory.getLog(ContainerLaunch.class);
@ -124,6 +126,25 @@ public ContainerLaunch(Context context, Configuration configuration,
YarnConfiguration.DEFAULT_NM_PROCESS_KILL_WAIT_MS);
}
@VisibleForTesting
public static String expandEnvironment(String var,
Path containerLogDir) {
var = var.replace(ApplicationConstants.LOG_DIR_EXPANSION_VAR,
containerLogDir.toString());
var = var.replace(ApplicationConstants.CLASS_PATH_SEPARATOR,
File.pathSeparator);
// replace parameter expansion marker. e.g. {{VAR}} on Windows is replaced
// as %VAR% and on Linux replaced as "$VAR"
if (Shell.WINDOWS) {
var = var.replaceAll("(\\{\\{)|(\\}\\})", "%");
} else {
var = var.replace(ApplicationConstants.PARAMETER_EXPANSION_LEFT, "$");
var = var.replace(ApplicationConstants.PARAMETER_EXPANSION_RIGHT, "");
}
return var;
}
@Override
@SuppressWarnings("unchecked") // dispatcher not typed
public Integer call() {
@ -165,8 +186,7 @@ public Integer call() {
dirsHandler.getLogPathForWrite(relativeContainerLogDir, false);
for (String str : command) {
// TODO: Should we instead work via symlinks without this grammar?
newCmds.add(str.replace(ApplicationConstants.LOG_DIR_EXPANSION_VAR,
containerLogDir.toString()));
newCmds.add(expandEnvironment(str, containerLogDir));
}
launchContext.setCommands(newCmds);
@ -174,11 +194,8 @@ public Integer call() {
// Make a copy of env to iterate & do variable expansion
for (Entry<String, String> entry : environment.entrySet()) {
String value = entry.getValue();
entry.setValue(
value.replace(
ApplicationConstants.LOG_DIR_EXPANSION_VAR,
containerLogDir.toString())
);
value = expandEnvironment(value, containerLogDir);
entry.setValue(value);
}
// /////////////////////////// End of variable expansion
@ -647,12 +664,9 @@ public void sanitizeEnv(Map<String, String> environment, Path pwd,
}
// variables here will be forced in, even if the container has specified them.
Apps.setEnvFromInputString(
environment,
conf.get(
Apps.setEnvFromInputString(environment, conf.get(
YarnConfiguration.NM_ADMIN_USER_ENV,
YarnConfiguration.DEFAULT_NM_ADMIN_USER_ENV)
);
YarnConfiguration.DEFAULT_NM_ADMIN_USER_ENV), File.pathSeparator);
// TODO: Remove Windows check and use this approach on all platforms after
// additional testing. See YARN-358.

View File

@ -21,7 +21,6 @@
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.spy;
import java.io.BufferedReader;
import java.io.File;
@ -48,6 +47,7 @@
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.Shell.ExitCodeException;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
@ -73,12 +73,12 @@
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode;
import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.BaseContainerManagerTest;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerExitEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.Apps;
import org.apache.hadoop.yarn.util.AuxiliaryServiceHelper;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.LinuxResourceCalculatorPlugin;
@ -287,6 +287,31 @@ public void testInvalidEnvSyntaxDiagnostics() throws IOException {
}
}
@Test(timeout = 10000)
public void testEnvExpansion() throws IOException {
Path logPath = new Path("/nm/container/logs");
String input =
Apps.crossPlatformify("HADOOP_HOME") + "/share/hadoop/common/*"
+ ApplicationConstants.CLASS_PATH_SEPARATOR
+ Apps.crossPlatformify("HADOOP_HOME") + "/share/hadoop/common/lib/*"
+ ApplicationConstants.CLASS_PATH_SEPARATOR
+ Apps.crossPlatformify("HADOOP_LOG_HOME")
+ ApplicationConstants.LOG_DIR_EXPANSION_VAR;
String res = ContainerLaunch.expandEnvironment(input, logPath);
if (Shell.WINDOWS) {
Assert.assertEquals("%HADOOP_HOME%/share/hadoop/common/*;"
+ "%HADOOP_HOME%/share/hadoop/common/lib/*;"
+ "%HADOOP_LOG_HOME%/nm/container/logs", res);
} else {
Assert.assertEquals("$HADOOP_HOME/share/hadoop/common/*:"
+ "$HADOOP_HOME/share/hadoop/common/lib/*:"
+ "$HADOOP_LOG_HOME/nm/container/logs", res);
}
System.out.println(res);
}
@Test (timeout = 20000)
public void testContainerLaunchStdoutAndStderrDiagnostics() throws IOException {