diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 8efd232b6c3..cc1adb8086d 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -292,6 +292,9 @@ Release 0.23.0 - Unreleased MAPREDUCE-901. Efficient framework counters. (llu via acmurthy) + MAPREDUCE-2880. Improve classpath-construction for mapreduce AM and + containers. (Arun C Murthy via vinodkv) + BUG FIXES MAPREDUCE-2603. Disable High-Ram emulation in system tests. diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/MapReduceChildJVM.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/MapReduceChildJVM.java index d9d5b1f3076..fc25d06da24 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/MapReduceChildJVM.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/MapReduceChildJVM.java @@ -18,27 +18,25 @@ package org.apache.hadoop.mapred; -import java.io.File; import java.net.InetSocketAddress; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Vector; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapred.TaskLog.LogName; import org.apache.hadoop.mapreduce.ID; -import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.mapreduce.MRJobConfig; +import org.apache.hadoop.mapreduce.v2.util.MRApps; +import org.apache.hadoop.yarn.api.ApplicationConstants; +import org.apache.hadoop.yarn.api.ApplicationConstants.Environment; public class MapReduceChildJVM { - private static final String SYSTEM_PATH_SEPARATOR = - System.getProperty("path.separator"); - private static final Log LOG = LogFactory.getLog(MapReduceChildJVM.class); - - private static File getTaskLogFile(String logDir, LogName filter) { - return new File(logDir, filter.toString()); + private static String getTaskLogFile(LogName filter) { + return ApplicationConstants.LOG_DIR_EXPANSION_VAR + Path.SEPARATOR + + filter.toString(); } private static String getChildEnv(JobConf jobConf, boolean isMap) { @@ -50,32 +48,53 @@ private static String getChildEnv(JobConf jobConf, boolean isMap) { jobConf.get(jobConf.MAPRED_TASK_ENV)); } - public static void setVMEnv(Map env, - List classPaths, String pwd, String containerLogDir, - String nmLdLibraryPath, Task task, CharSequence applicationTokensFile) { + private static String getChildLogLevel(JobConf conf, boolean isMap) { + if (isMap) { + return conf.get( + MRJobConfig.MAP_LOG_LEVEL, + JobConf.DEFAULT_LOG_LEVEL.toString() + ); + } else { + return conf.get( + MRJobConfig.REDUCE_LOG_LEVEL, + JobConf.DEFAULT_LOG_LEVEL.toString() + ); + } + } + + public static void setVMEnv(Map environment, + Task task) { JobConf conf = task.conf; - // Add classpath. 
- CharSequence cp = env.get("CLASSPATH"); - String classpath = StringUtils.join(SYSTEM_PATH_SEPARATOR, classPaths); - if (null == cp) { - env.put("CLASSPATH", classpath); - } else { - env.put("CLASSPATH", classpath + SYSTEM_PATH_SEPARATOR + cp); - } + // Shell + environment.put( + Environment.SHELL.name(), + conf.get( + MRJobConfig.MAPRED_ADMIN_USER_SHELL, + MRJobConfig.DEFAULT_SHELL) + ); + + // Add pwd to LD_LIBRARY_PATH; do this before adding anything else + MRApps.addToEnvironment( + environment, + Environment.LD_LIBRARY_PATH.name(), + Environment.PWD.$()); - /////// Environmental variable LD_LIBRARY_PATH - StringBuilder ldLibraryPath = new StringBuilder(); + // Add the env variables passed by the user & admin + String mapredChildEnv = getChildEnv(conf, task.isMapTask()); + MRApps.setEnvFromInputString(environment, mapredChildEnv); + MRApps.setEnvFromInputString( + environment, + conf.get( + MRJobConfig.MAPRED_ADMIN_USER_ENV, + MRJobConfig.DEFAULT_MAPRED_ADMIN_USER_ENV) + ); - ldLibraryPath.append(nmLdLibraryPath); - ldLibraryPath.append(SYSTEM_PATH_SEPARATOR); - ldLibraryPath.append(pwd); - env.put("LD_LIBRARY_PATH", ldLibraryPath.toString()); - /////// Environmental variable LD_LIBRARY_PATH - - // for the child of task jvm, set hadoop.root.logger - env.put("HADOOP_ROOT_LOGGER", "DEBUG,CLA"); // TODO: Debug + // Set logging level + environment.put( + "HADOOP_ROOT_LOGGER", + getChildLogLevel(conf, task.isMapTask()) + ",CLA"); // TODO: The following is useful for instance in streaming tasks. Should be // set in ApplicationMaster's env by the RM. @@ -89,76 +108,69 @@ public static void setVMEnv(Map env, // properties. long logSize = TaskLog.getTaskLogLength(conf); Vector logProps = new Vector(4); - setupLog4jProperties(logProps, logSize, containerLogDir); + setupLog4jProperties(logProps, logSize); Iterator it = logProps.iterator(); StringBuffer buffer = new StringBuffer(); while (it.hasNext()) { buffer.append(" " + it.next()); } hadoopClientOpts = hadoopClientOpts + buffer.toString(); - - env.put("HADOOP_CLIENT_OPTS", hadoopClientOpts); + environment.put("HADOOP_CLIENT_OPTS", hadoopClientOpts); - // add the env variables passed by the user - String mapredChildEnv = getChildEnv(conf, task.isMapTask()); - if (mapredChildEnv != null && mapredChildEnv.length() > 0) { - String childEnvs[] = mapredChildEnv.split(","); - for (String cEnv : childEnvs) { - String[] parts = cEnv.split("="); // split on '=' - String value = (String) env.get(parts[0]); - if (value != null) { - // replace $env with the child's env constructed by tt's - // example LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/tmp - value = parts[1].replace("$" + parts[0], value); - } else { - // this key is not configured by the tt for the child .. get it - // from the tt's env - // example PATH=$PATH:/tmp - value = System.getenv(parts[0]); // Get from NM? - if (value != null) { - // the env key is present in the tt's env - value = parts[1].replace("$" + parts[0], value); - } else { - // the env key is not present anywhere .. simply set it - // example X=$X:/tmp or X=/tmp - value = parts[1].replace("$" + parts[0], ""); - } - } - env.put(parts[0], value); - } - } - - //This should not be set here (If an OS check is required, move to ContainerLaunch) - // env.put("JVM_PID", "`echo $$`"); - - env.put(Constants.STDOUT_LOGFILE_ENV, - getTaskLogFile(containerLogDir, TaskLog.LogName.STDOUT).toString()); - env.put(Constants.STDERR_LOGFILE_ENV, - getTaskLogFile(containerLogDir, TaskLog.LogName.STDERR).toString()); + // Add stdout/stderr env + environment.put( + MRJobConfig.STDOUT_LOGFILE_ENV, + getTaskLogFile(TaskLog.LogName.STDOUT) + ); + environment.put( + MRJobConfig.STDERR_LOGFILE_ENV, + getTaskLogFile(TaskLog.LogName.STDERR) + ); } private static String getChildJavaOpts(JobConf jobConf, boolean isMapTask) { + String userOpts = ""; + String adminOpts = ""; if (isMapTask) { - return jobConf.get(JobConf.MAPRED_MAP_TASK_JAVA_OPTS, jobConf.get( - JobConf.MAPRED_TASK_JAVA_OPTS, - JobConf.DEFAULT_MAPRED_TASK_JAVA_OPTS)); + userOpts = + jobConf.get( + JobConf.MAPRED_MAP_TASK_JAVA_OPTS, + jobConf.get( + JobConf.MAPRED_TASK_JAVA_OPTS, + JobConf.DEFAULT_MAPRED_TASK_JAVA_OPTS) + ); + adminOpts = + jobConf.get( + MRJobConfig.MAPRED_MAP_ADMIN_JAVA_OPTS, + MRJobConfig.DEFAULT_MAPRED_ADMIN_JAVA_OPTS); + } else { + userOpts = + jobConf.get( + JobConf.MAPRED_REDUCE_TASK_JAVA_OPTS, + jobConf.get( + JobConf.MAPRED_TASK_JAVA_OPTS, + JobConf.DEFAULT_MAPRED_TASK_JAVA_OPTS) + ); + adminOpts = + jobConf.get( + MRJobConfig.MAPRED_REDUCE_ADMIN_JAVA_OPTS, + MRJobConfig.DEFAULT_MAPRED_ADMIN_JAVA_OPTS); } - return jobConf - .get(JobConf.MAPRED_REDUCE_TASK_JAVA_OPTS, jobConf.get( - JobConf.MAPRED_TASK_JAVA_OPTS, - JobConf.DEFAULT_MAPRED_TASK_JAVA_OPTS)); + + // Add admin opts first so they can be overridden by user opts. + return adminOpts + " " + userOpts; } private static void setupLog4jProperties(Vector vargs, - long logSize, String containerLogDir) { + long logSize) { vargs.add("-Dlog4j.configuration=container-log4j.properties"); - vargs.add("-Dhadoop.yarn.mr.containerLogDir=" + containerLogDir); + vargs.add("-Dhadoop.yarn.mr.containerLogDir=" + ApplicationConstants.LOG_DIR_EXPANSION_VAR); vargs.add("-Dhadoop.yarn.mr.totalLogFileSize=" + logSize); } public static List getVMCommand( - InetSocketAddress taskAttemptListenerAddr, Task task, String javaHome, - String workDir, String logDir, String childTmpDir, ID jvmID) { + InetSocketAddress taskAttemptListenerAddr, Task task, + ID jvmID) { TaskAttemptID attemptID = task.getTaskID(); JobConf conf = task.conf; @@ -166,7 +178,7 @@ public static List getVMCommand( Vector vargs = new Vector(8); vargs.add("exec"); - vargs.add(javaHome + "/bin/java"); + vargs.add(Environment.JAVA_HOME.$() + "/bin/java"); // Add child (task) java-vm options. // @@ -199,44 +211,26 @@ public static List getVMCommand( String javaOpts = getChildJavaOpts(conf, task.isMapTask()); javaOpts = javaOpts.replace("@taskid@", attemptID.toString()); String [] javaOptsSplit = javaOpts.split(" "); - - // Add java.library.path; necessary for loading native libraries. - // - // 1. We add the 'cwd' of the task to its java.library.path to help - // users distribute native libraries via the DistributedCache. - // 2. The user can also specify extra paths to be added to the - // java.library.path via mapred.{map|reduce}.child.java.opts.
- // - String libraryPath = workDir; - boolean hasUserLDPath = false; - for(int i=0; i<javaOptsSplit.length; i++) { - if(javaOptsSplit[i].startsWith("-Djava.library.path=")) { - javaOptsSplit[i] += SYSTEM_PATH_SEPARATOR + libraryPath; - hasUserLDPath = true; - break; - } - } - if(!hasUserLDPath) { - vargs.add("-Djava.library.path=" + libraryPath); - } for (int i = 0; i < javaOptsSplit.length; i++) { vargs.add(javaOptsSplit[i]); } // Setup the log4j prop long logSize = TaskLog.getTaskLogLength(conf); - setupLog4jProperties(vargs, logSize, logDir); + setupLog4jProperties(vargs, logSize); // Add main class and its arguments vargs.add(YarnChild.class.getName()); // main of Child // pass TaskAttemptListener's address vargs.add(taskAttemptListenerAddr.getAddress().getHostAddress()); vargs.add(Integer.toString(taskAttemptListenerAddr.getPort())); vargs.add(attemptID.toString()); // pass task identifier // Finally add the jvmID vargs.add(String.valueOf(jvmID.getId())); - vargs.add("1>" + getTaskLogFile(logDir, TaskLog.LogName.STDERR)); - vargs.add("2>" + getTaskLogFile(logDir, TaskLog.LogName.STDOUT)); + vargs.add("1>" + getTaskLogFile(TaskLog.LogName.STDOUT)); + vargs.add("2>" + getTaskLogFile(TaskLog.LogName.STDERR)); // Final command StringBuilder mergedCommand = new StringBuilder(); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnChild.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnChild.java index 3021004f9dd..0ab220bf383 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnChild.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnChild.java @@ -47,7 +47,6 @@ import org.apache.hadoop.mapreduce.security.TokenCache; import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier; import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager; -import org.apache.hadoop.mapreduce.v2.MRConstants; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.metrics2.source.JvmMetrics; import org.apache.hadoop.security.Credentials; @@ -71,7 +70,7 @@ public static void main(String[] args) throws Throwable { LOG.debug("Child starting"); final JobConf defaultConf = new JobConf(); - defaultConf.addResource(MRConstants.JOB_CONF_FILE); + defaultConf.addResource(MRJobConfig.JOB_CONF_FILE); UserGroupInformation.setConfiguration(defaultConf); String host = args[0]; @@ -238,7 +237,7 @@ private static void configureLocalDirs(Task task, JobConf job) { private static JobConf configureTask(Task task, Credentials credentials, Token jt) throws IOException { - final JobConf job = new JobConf(MRConstants.JOB_CONF_FILE); + final JobConf job = new JobConf(MRJobConfig.JOB_CONF_FILE); job.setCredentials(credentials); // set tcp nodelay job.setBoolean("ipc.client.tcpnodelay", true); @@ -260,7 +259,7 @@ private static JobConf configureTask(Task task, Credentials credentials, // Overwrite the localized task jobconf which is linked to in the current // work-dir.
- Path localTaskFile = new Path(Constants.JOBFILE); + Path localTaskFile = new Path(MRJobConfig.JOB_CONF_FILE); writeLocalJobFile(localTaskFile, job); task.setJobFile(localTaskFile.toString()); task.setConf(job); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java index 20c7e9779e8..41a86f1271f 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java @@ -39,7 +39,6 @@ import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler; import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager; -import org.apache.hadoop.mapreduce.v2.MRConstants; import org.apache.hadoop.mapreduce.v2.api.records.JobId; import org.apache.hadoop.mapreduce.v2.api.records.TaskId; import org.apache.hadoop.mapreduce.v2.app.client.ClientService; @@ -243,10 +242,10 @@ protected Job createJob(Configuration conf) { // Read the file-system tokens from the localized tokens-file. Path jobSubmitDir = FileContext.getLocalFSFileContext().makeQualified( - new Path(new File(MRConstants.JOB_SUBMIT_DIR) + new Path(new File(MRJobConfig.JOB_SUBMIT_DIR) .getAbsolutePath())); Path jobTokenFile = - new Path(jobSubmitDir, MRConstants.APPLICATION_TOKENS_FILE); + new Path(jobSubmitDir, MRJobConfig.APPLICATION_TOKENS_FILE); fsTokens.addAll(Credentials.readTokenStorageFile(jobTokenFile, conf)); LOG.info("jobSubmitDir=" + jobSubmitDir + " jobTokenFile=" + jobTokenFile); @@ -658,7 +657,7 @@ public static void main(String[] args) { Runtime.getRuntime().addShutdownHook( new CompositeServiceShutdownHook(appMaster)); YarnConfiguration conf = new YarnConfiguration(new JobConf()); - conf.addResource(new Path(MRConstants.JOB_CONF_FILE)); + conf.addResource(new Path(MRJobConfig.JOB_CONF_FILE)); conf.set(MRJobConfig.USER_NAME, System.getProperty("user.name")); UserGroupInformation.setConfiguration(conf); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java index 69de493b16b..e822cab80d2 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java @@ -42,6 +42,7 @@ import org.apache.hadoop.mapred.FileOutputCommitter; import org.apache.hadoop.mapred.JobACLsManager; import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.MapReduceChildJVM; import org.apache.hadoop.mapreduce.JobACL; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.MRJobConfig; @@ -64,7 +65,6 @@ import org.apache.hadoop.mapreduce.split.SplitMetaInfoReader; import org.apache.hadoop.mapreduce.task.JobContextImpl; import 
org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; -import org.apache.hadoop.mapreduce.v2.MRConstants; import org.apache.hadoop.mapreduce.v2.api.records.Counter; import org.apache.hadoop.mapreduce.v2.api.records.CounterGroup; import org.apache.hadoop.mapreduce.v2.api.records.Counters; @@ -1007,7 +1007,7 @@ protected void setup(JobImpl job) throws IOException { FileSystem.get(job.conf).makeQualified( new Path(path, oldJobIDString)); job.remoteJobConfFile = - new Path(job.remoteJobSubmitDir, MRConstants.JOB_CONF_FILE); + new Path(job.remoteJobSubmitDir, MRJobConfig.JOB_CONF_FILE); // Prepare the TaskAttemptListener server for authentication of Containers // TaskAttemptListener gets the information via jobTokenSecretManager. @@ -1033,7 +1033,7 @@ protected void setup(JobImpl job) throws IOException { Path remoteJobTokenFile = new Path(job.remoteJobSubmitDir, - MRConstants.APPLICATION_TOKENS_FILE); + MRJobConfig.APPLICATION_TOKENS_FILE); tokenStorage.writeTokenStorageFile(remoteJobTokenFile, job.conf); LOG.info("Writing back the job-token file on the remote file system:" + remoteJobTokenFile.toString()); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java index 95c344bfbea..495d00e22c9 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java @@ -21,7 +21,6 @@ import java.io.File; import java.io.IOException; import java.net.InetSocketAddress; -import java.net.URI; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collection; @@ -62,7 +61,6 @@ import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptUnsuccessfulCompletionEvent; import org.apache.hadoop.mapreduce.security.TokenCache; import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier; -import org.apache.hadoop.mapreduce.v2.MRConstants; import org.apache.hadoop.mapreduce.v2.api.records.Counter; import org.apache.hadoop.mapreduce.v2.api.records.Counters; import org.apache.hadoop.mapreduce.v2.api.records.Phase; @@ -103,6 +101,7 @@ import org.apache.hadoop.yarn.Clock; import org.apache.hadoop.yarn.YarnException; import org.apache.hadoop.yarn.api.ApplicationConstants; +import org.apache.hadoop.yarn.api.ApplicationConstants.Environment; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.ContainerToken; @@ -117,7 +116,6 @@ import org.apache.hadoop.yarn.state.SingleArcTransition; import org.apache.hadoop.yarn.state.StateMachine; import org.apache.hadoop.yarn.state.StateMachineFactory; -import org.apache.hadoop.yarn.util.BuilderUtils; import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.RackResolver; @@ -518,8 +516,8 @@ private String getInitialClasspath() throws IOException { return initialClasspath; } Map env = new HashMap(); - MRApps.setInitialClasspath(env); - initialClasspath = env.get(MRApps.CLASSPATH); + MRApps.setClasspath(env); + initialClasspath = env.get(Environment.CLASSPATH.name()); initialClasspathFlag.set(true); 
return initialClasspath; } @@ -531,16 +529,18 @@ private String getInitialClasspath() throws IOException { */ private ContainerLaunchContext createContainerLaunchContext() { - ContainerLaunchContext container = - recordFactory.newRecordInstance(ContainerLaunchContext.class); - // Application resources Map localResources = new HashMap(); // Application environment Map environment = new HashMap(); - + + // Service data + Map serviceData = new HashMap(); + + // Tokens + ByteBuffer tokens = ByteBuffer.wrap(new byte[]{}); try { FileSystem remoteFS = FileSystem.get(conf); @@ -550,7 +550,7 @@ private ContainerLaunchContext createContainerLaunchContext() { MRJobConfig.JAR))).makeQualified(remoteFS.getUri(), remoteFS.getWorkingDirectory()); localResources.put( - MRConstants.JOB_JAR, + MRJobConfig.JOB_JAR, createLocalResource(remoteFS, recordFactory, remoteJobJar, LocalResourceType.FILE, LocalResourceVisibility.APPLICATION)); LOG.info("The job-jar file on the remote FS is " @@ -570,9 +570,9 @@ private ContainerLaunchContext createContainerLaunchContext() { Path remoteJobSubmitDir = new Path(path, oldJobId.toString()); Path remoteJobConfPath = - new Path(remoteJobSubmitDir, MRConstants.JOB_CONF_FILE); + new Path(remoteJobSubmitDir, MRJobConfig.JOB_CONF_FILE); localResources.put( - MRConstants.JOB_CONF_FILE, + MRJobConfig.JOB_CONF_FILE, createLocalResource(remoteFS, recordFactory, remoteJobConfPath, LocalResourceType.FILE, LocalResourceVisibility.APPLICATION)); LOG.info("The job-conf file on the remote FS is " @@ -580,12 +580,8 @@ private ContainerLaunchContext createContainerLaunchContext() { // //////////// End of JobConf setup // Setup DistributedCache - MRApps.setupDistributedCache(conf, localResources, environment); + MRApps.setupDistributedCache(conf, localResources); - // Set local-resources and environment - container.setLocalResources(localResources); - container.setEnvironment(environment); - // Set up tokens Credentials taskCredentials = new Credentials(); @@ -606,52 +602,43 @@ private ContainerLaunchContext createContainerLaunchContext() { LOG.info("Size of containertokens_dob is " + taskCredentials.numberOfTokens()); taskCredentials.writeTokenStorageToStream(containerTokens_dob); - container.setContainerTokens( + tokens = ByteBuffer.wrap( containerTokens_dob.getData(), 0, - containerTokens_dob.getLength())); + containerTokens_dob.getLength()); // Add shuffle token LOG.info("Putting shuffle token in serviceData"); - Map serviceData = new HashMap(); serviceData.put(ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID, ShuffleHandler.serializeServiceData(jobToken)); - container.setServiceData(serviceData); - MRApps.addToClassPath(container.getEnvironment(), getInitialClasspath()); + MRApps.addToEnvironment( + environment, + Environment.CLASSPATH.name(), + getInitialClasspath()); } catch (IOException e) { throw new YarnException(e); } + + // Setup environment + MapReduceChildJVM.setVMEnv(environment, remoteTask); + + // Set up the launch command + List commands = MapReduceChildJVM.getVMCommand( + taskAttemptListener.getAddress(), remoteTask, + jvmID); - container.setContainerId(containerID); - container.setUser(conf.get(MRJobConfig.USER_NAME)); // TODO: Fix - - File workDir = new File("$PWD"); // Will be expanded by the shell. - String containerLogDir = - new File(ApplicationConstants.LOG_DIR_EXPANSION_VAR).toString(); - String childTmpDir = new File(workDir, "tmp").toString(); - String javaHome = "${JAVA_HOME}"; // Will be expanded by the shell. - String nmLdLibraryPath = "{LD_LIBRARY_PATH}"; // Expanded by the shell? - List classPaths = new ArrayList(); - - String localizedApplicationTokensFile = - new File(workDir, MRConstants.APPLICATION_TOKENS_FILE).toString(); - classPaths.add(MRConstants.JOB_JAR); - classPaths.add(MRConstants.YARN_MAPREDUCE_APP_JAR_PATH); - classPaths.add(workDir.toString()); // TODO - - // Construct the actual Container - container.setCommands(MapReduceChildJVM.getVMCommand( - taskAttemptListener.getAddress(), remoteTask, javaHome, - workDir.toString(), containerLogDir, childTmpDir, jvmID)); - - MapReduceChildJVM.setVMEnv(container.getEnvironment(), classPaths, - workDir.toString(), containerLogDir, nmLdLibraryPath, remoteTask, - localizedApplicationTokensFile); - - // Construct the actual Container + ContainerLaunchContext container = + recordFactory.newRecordInstance(ContainerLaunchContext.class); container.setContainerId(containerID); container.setUser(conf.get(MRJobConfig.USER_NAME)); container.setResource(assignedCapability); + container.setLocalResources(localResources); + container.setEnvironment(environment); + container.setCommands(commands); + container.setServiceData(serviceData); + container.setContainerTokens(tokens); + return container; } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/DefaultSpeculator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/DefaultSpeculator.java index feb019fe162..ab7d23ef9dc 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/DefaultSpeculator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/DefaultSpeculator.java @@ -35,7 +35,6 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapreduce.MRJobConfig; -import org.apache.hadoop.mapreduce.v2.MRConstants; import org.apache.hadoop.mapreduce.v2.api.records.JobId; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState; @@ -87,7 +86,7 @@ public class DefaultSpeculator extends AbstractService implements private final ConcurrentMap reduceContainerNeeds = new ConcurrentHashMap(); - private final Set mayHaveSpeculated = new HashSet(); + private final Set<TaskId> mayHaveSpeculated = new HashSet<TaskId>(); private final Configuration conf; private AppContext context; diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/MRConstants.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/MRConstants.java deleted file mode 100644 index af7194620de..00000000000 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/MRConstants.java +++ /dev/null @@ -1,50 +0,0 @@ -/** -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership.
The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ - -package org.apache.hadoop.mapreduce.v2; - - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; - -@InterfaceAudience.Private -@InterfaceStability.Evolving -public interface MRConstants { - // This should be the directory where splits file gets localized on the node - // running ApplicationMaster. - public static final String JOB_SUBMIT_DIR = "jobSubmitDir"; - - // This should be the name of the localized job-configuration file on the node - // running ApplicationMaster and Task - public static final String JOB_CONF_FILE = "job.xml"; - // This should be the name of the localized job-jar file on the node running - // individual containers/tasks. - public static final String JOB_JAR = "job.jar"; - - public static final String HADOOP_MAPREDUCE_CLIENT_APP_JAR_NAME = - "hadoop-mapreduce-client-app-0.23.0-SNAPSHOT.jar"; - - public static final String YARN_MAPREDUCE_APP_JAR_PATH = - "$YARN_HOME/modules/" + HADOOP_MAPREDUCE_CLIENT_APP_JAR_NAME; - - // The token file for the application. Should contain tokens for access to - // remote file system and may optionally contain application specific tokens. - // For now, generated by the AppManagers and used by NodeManagers and the - // Containers. 
- public static final String APPLICATION_TOKENS_FILE = "appTokens"; -} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java index 68499497ac3..9094da39ba3 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java @@ -39,14 +39,14 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.filecache.DistributedCache; -import org.apache.hadoop.mapreduce.v2.MRConstants; import org.apache.hadoop.mapreduce.v2.api.records.JobId; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState; import org.apache.hadoop.mapreduce.v2.api.records.TaskId; import org.apache.hadoop.mapreduce.v2.api.records.TaskType; -import org.apache.hadoop.util.Shell.ShellCommandExecutor; import org.apache.hadoop.yarn.YarnException; +import org.apache.hadoop.yarn.api.ApplicationConstants.Environment; +import org.apache.hadoop.yarn.api.ApplicationConstants; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.LocalResourceType; @@ -167,7 +167,7 @@ public static TaskAttemptStateUI taskAttemptState(String attemptStateStr) { return TaskAttemptStateUI.valueOf(attemptStateStr); } - public static void setInitialClasspath( + private static void setMRFrameworkClasspath( Map environment) throws IOException { InputStream classpathFileStream = null; BufferedReader reader = null; @@ -182,30 +182,17 @@ public static void setInitialClasspath( reader = new BufferedReader(new InputStreamReader(classpathFileStream)); String cp = reader.readLine(); if (cp != null) { - addToClassPath(environment, cp.trim()); + addToEnvironment(environment, Environment.CLASSPATH.name(), cp.trim()); } // Put the file itself on classpath for tasks. - addToClassPath(environment, + addToEnvironment( + environment, + Environment.CLASSPATH.name(), thisClassLoader.getResource(mrAppGeneratedClasspathFile).getFile()); - // If runtime env is different. - if (System.getenv().get("YARN_HOME") != null) { - ShellCommandExecutor exec = - new ShellCommandExecutor(new String[] { - System.getenv().get("YARN_HOME") + "/bin/yarn", - "classpath" }); - exec.execute(); - addToClassPath(environment, exec.getOutput().trim()); - } - - // Get yarn mapreduce-app classpath - if (System.getenv().get("HADOOP_MAPRED_HOME")!= null) { - ShellCommandExecutor exec = - new ShellCommandExecutor(new String[] { - System.getenv().get("HADOOP_MAPRED_HOME") + "/bin/mapred", - "classpath" }); - exec.execute(); - addToClassPath(environment, exec.getOutput().trim()); + // Add standard Hadoop classes + for (String c : ApplicationConstants.APPLICATION_CLASSPATH) { + addToEnvironment(environment, Environment.CLASSPATH.name(), c); } } finally { if (classpathFileStream != null) { @@ -217,20 +204,35 @@ public static void setInitialClasspath( } // TODO: Remove duplicates. 
} + + private static final String SYSTEM_PATH_SEPARATOR = + System.getProperty("path.separator"); - public static void addToClassPath( - Map environment, String fileName) { - String classpath = environment.get(CLASSPATH); - if (classpath == null) { - classpath = fileName; + public static void addToEnvironment( + Map environment, + String variable, String value) { + String val = environment.get(variable); + if (val == null) { + val = value; } else { - classpath = classpath + ":" + fileName; + val = val + SYSTEM_PATH_SEPARATOR + value; } - environment.put(CLASSPATH, classpath); + environment.put(variable, val); } - public static final String CLASSPATH = "CLASSPATH"; - + public static void setClasspath(Map environment) + throws IOException { + MRApps.addToEnvironment( + environment, + Environment.CLASSPATH.name(), + MRJobConfig.JOB_JAR); + MRApps.addToEnvironment( + environment, + Environment.CLASSPATH.name(), + Environment.PWD.$() + Path.SEPARATOR + "*"); + MRApps.setMRFrameworkClasspath(environment); + } + private static final String STAGING_CONSTANT = ".staging"; public static Path getStagingAreaDir(Configuration conf, String user) { return new Path( @@ -241,7 +243,7 @@ public static Path getStagingAreaDir(Configuration conf, String user) { public static String getJobFile(Configuration conf, String user, org.apache.hadoop.mapreduce.JobID jobId) { Path jobFile = new Path(MRApps.getStagingAreaDir(conf, user), - jobId.toString() + Path.SEPARATOR + MRConstants.JOB_CONF_FILE); + jobId.toString() + Path.SEPARATOR + MRJobConfig.JOB_CONF_FILE); return jobFile.toString(); } @@ -260,12 +262,11 @@ private static long[] parseTimeStamps(String[] strs) { public static void setupDistributedCache( Configuration conf, - Map localResources, - Map env) + Map localResources) throws IOException { // Cache archives - parseDistributedCacheArtifacts(conf, localResources, env, + parseDistributedCacheArtifacts(conf, localResources, LocalResourceType.ARCHIVE, DistributedCache.getCacheArchives(conf), parseTimeStamps(DistributedCache.getArchiveTimestamps(conf)), @@ -275,7 +276,7 @@ public static void setupDistributedCache( // Cache files parseDistributedCacheArtifacts(conf, - localResources, env, + localResources, LocalResourceType.FILE, DistributedCache.getCacheFiles(conf), parseTimeStamps(DistributedCache.getFileTimestamps(conf)), @@ -290,7 +291,6 @@ public static void setupDistributedCache( private static void parseDistributedCacheArtifacts( Configuration conf, Map localResources, - Map env, LocalResourceType type, URI[] uris, long[] timestamps, long[] sizes, boolean visibilities[], Path[] pathsToPutOnClasspath) throws IOException { @@ -339,9 +339,6 @@ private static void parseDistributedCacheArtifacts( : LocalResourceVisibility.PRIVATE, sizes[i], timestamps[i]) ); - if (classPaths.containsKey(u.getPath())) { - MRApps.addToClassPath(env, linkName); - } } } } @@ -358,6 +355,42 @@ private static long[] getFileSizes(Configuration conf, String key) { } return result; } + + public static void setEnvFromInputString(Map env, + String envString) { + if (envString != null && envString.length() > 0) { + String childEnvs[] = envString.split(","); + for (String cEnv : childEnvs) { + String[] parts = cEnv.split("="); // split on '=' + String value = env.get(parts[0]); + + if (value != null) { + // Replace $env with the child's env value constructed so far + // For example: LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/tmp + value = parts[1].replace("$" + parts[0], value); + } else { + // example PATH=$PATH:/tmp + value = System.getenv(parts[0]); + if (value != null) { + // the env key is present in the NM's env + value = parts[1].replace("$" + parts[0], value); + } else { + // check for simple variable substitution + // e.g. ROOT=$HOME + String envValue = System.getenv(parts[1].substring(1)); + if (envValue != null) { + value = envValue; + } else { + // the env key is not present anywhere; simply set it + // example X=$X:/tmp or X=/tmp + value = parts[1].replace("$" + parts[0], ""); + } + } + } + addToEnvironment(env, parts[0], value); + } + } + } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/util/TestMRApps.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/util/TestMRApps.java index 7a2ee00a92d..11589980625 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/util/TestMRApps.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/util/TestMRApps.java @@ -25,7 +25,6 @@ import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; import org.apache.hadoop.mapreduce.v2.api.records.TaskId; import org.apache.hadoop.mapreduce.v2.api.records.TaskType; -import org.apache.hadoop.mapreduce.v2.MRConstants; import org.apache.hadoop.mapreduce.v2.util.MRApps; import org.apache.hadoop.yarn.YarnException; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -115,7 +114,8 @@ public class TestMRApps { @Test public void testGetJobFileWithUser() { Configuration conf = new Configuration(); conf.set(MRJobConfig.MR_AM_STAGING_DIR, "/my/path/to/staging"); - String jobFile = MRApps.getJobFile(conf, "dummy-user", new JobID("dummy-job", 12345)); + String jobFile = MRApps.getJobFile(conf, "dummy-user", + new JobID("dummy-job", 12345)); assertNotNull("getJobFile results in null.", jobFile); assertEquals("jobFile with specified user is not as expected.", "/my/path/to/staging/dummy-user/.staging/job_dummy-job_12345/job.xml", jobFile); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/BackupStore.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/BackupStore.java index 026793c5374..f409d2298eb 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/BackupStore.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/BackupStore.java @@ -41,6 +41,7 @@ import org.apache.hadoop.mapred.IFile.Writer; import org.apache.hadoop.mapred.Merger.Segment; import org.apache.hadoop.mapreduce.MRConfig; +import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.TaskAttemptID; /** @@ -560,7 +561,7 @@ void createInDiskSegment() throws IOException { private Writer createSpillFile() throws IOException { Path tmp = - new Path(Constants.OUTPUT + "/backup_" + tid.getId() + "_" + new Path(MRJobConfig.OUTPUT + "/backup_" + tid.getId() + "_" + (spillNumber++) + ".out"); LOG.info("Created file: " + tmp); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Constants.java
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Constants.java deleted file mode 100644 index e8a202ed44b..00000000000 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Constants.java +++ /dev/null @@ -1,27 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.mapred; - -public class Constants { - static final String OUTPUT = "output"; - public static final String HADOOP_WORK_DIR = "HADOOP_WORK_DIR"; - public static final String JOBFILE = "job.xml"; - public static final String STDOUT_LOGFILE_ENV = "STDOUT_LOGFILE_ENV"; - public static final String STDERR_LOGFILE_ENV = "STDERR_LOGFILE_ENV"; -} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobConf.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobConf.java index 49d12d764d5..b489d41b17c 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobConf.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobConf.java @@ -348,6 +348,7 @@ public class JobConf extends Configuration { */ public static final Level DEFAULT_LOG_LEVEL = Level.INFO; + /** * Construct a map/reduce job configuration. 
*/ diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MRConstants.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MRConstants.java index e2c16fbfac1..806b8566521 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MRConstants.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MRConstants.java @@ -17,11 +17,16 @@ */ package org.apache.hadoop.mapred; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; + /******************************* * Some handy constants * *******************************/ -interface MRConstants { +@Private +@Unstable +public class MRConstants { // // Timeouts, constants // @@ -52,6 +57,4 @@ interface MRConstants { * The reduce task number for which this map output is being transferred */ public static final String FOR_REDUCE_TASK = "for-reduce-task"; - - public static final String WORKDIR = "work"; } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MROutputFiles.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MROutputFiles.java index e81e11d3fb6..a9e25f287d4 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MROutputFiles.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MROutputFiles.java @@ -27,6 +27,7 @@ import org.apache.hadoop.fs.LocalDirAllocator; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.MRConfig; +import org.apache.hadoop.mapreduce.MRJobConfig; /** * Manipulate the working area for the transient store for maps and reduces. 
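The hunks below replace every path MROutputFiles used to build from the deleted Constants.OUTPUT with the equivalent MRJobConfig.OUTPUT constant. As a minimal sketch of the relative paths involved, assuming MROutputFiles' MAP_OUTPUT_FILENAME_STRING is "file.out" and a spill number of 3 (both illustrative, not taken from the patch):

    // Sketch only: relative paths rooted at MRJobConfig.OUTPUT ("output"),
    // which LocalDirAllocator later resolves against the container's local dirs.
    String mapOut     = MRJobConfig.OUTPUT + Path.SEPARATOR + MAP_OUTPUT_FILENAME_STRING; // "output/file.out"
    String spillOut   = MRJobConfig.OUTPUT + "/spill" + 3 + ".out";                       // "output/spill3.out"
    String spillIndex = MRJobConfig.OUTPUT + "/spill" + 3 + ".out.index";                 // "output/spill3.out.index"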
@@ -54,7 +55,7 @@ public MROutputFiles() { @Override public Path getOutputFile() throws IOException { - return lDirAlloc.getLocalPathToRead(Constants.OUTPUT + Path.SEPARATOR + return lDirAlloc.getLocalPathToRead(MRJobConfig.OUTPUT + Path.SEPARATOR + MAP_OUTPUT_FILENAME_STRING, getConf()); } @@ -68,7 +69,7 @@ public Path getOutputFile() @Override public Path getOutputFileForWrite(long size) throws IOException { - return lDirAlloc.getLocalPathForWrite(Constants.OUTPUT + Path.SEPARATOR + return lDirAlloc.getLocalPathForWrite(MRJobConfig.OUTPUT + Path.SEPARATOR + MAP_OUTPUT_FILENAME_STRING, size, getConf()); } @@ -89,7 +90,7 @@ public Path getOutputFileForWriteInVolume(Path existing) { @Override public Path getOutputIndexFile() throws IOException { - return lDirAlloc.getLocalPathToRead(Constants.OUTPUT + Path.SEPARATOR + return lDirAlloc.getLocalPathToRead(MRJobConfig.OUTPUT + Path.SEPARATOR + MAP_OUTPUT_FILENAME_STRING + MAP_OUTPUT_INDEX_SUFFIX_STRING, getConf()); } @@ -104,7 +105,7 @@ public Path getOutputIndexFile() @Override public Path getOutputIndexFileForWrite(long size) throws IOException { - return lDirAlloc.getLocalPathForWrite(Constants.OUTPUT + Path.SEPARATOR + return lDirAlloc.getLocalPathForWrite(MRJobConfig.OUTPUT + Path.SEPARATOR + MAP_OUTPUT_FILENAME_STRING + MAP_OUTPUT_INDEX_SUFFIX_STRING, size, getConf()); } @@ -128,7 +129,7 @@ public Path getOutputIndexFileForWriteInVolume(Path existing) { @Override public Path getSpillFile(int spillNumber) throws IOException { - return lDirAlloc.getLocalPathToRead(Constants.OUTPUT + "/spill" + return lDirAlloc.getLocalPathToRead(MRJobConfig.OUTPUT + "/spill" + spillNumber + ".out", getConf()); } @@ -143,7 +144,7 @@ public Path getSpillFile(int spillNumber) @Override public Path getSpillFileForWrite(int spillNumber, long size) throws IOException { - return lDirAlloc.getLocalPathForWrite(Constants.OUTPUT + "/spill" + return lDirAlloc.getLocalPathForWrite(MRJobConfig.OUTPUT + "/spill" + spillNumber + ".out", size, getConf()); } @@ -157,7 +158,7 @@ public Path getSpillFileForWrite(int spillNumber, long size) @Override public Path getSpillIndexFile(int spillNumber) throws IOException { - return lDirAlloc.getLocalPathToRead(Constants.OUTPUT + "/spill" + return lDirAlloc.getLocalPathToRead(MRJobConfig.OUTPUT + "/spill" + spillNumber + ".out.index", getConf()); } @@ -172,7 +173,7 @@ public Path getSpillIndexFile(int spillNumber) @Override public Path getSpillIndexFileForWrite(int spillNumber, long size) throws IOException { - return lDirAlloc.getLocalPathForWrite(Constants.OUTPUT + "/spill" + return lDirAlloc.getLocalPathForWrite(MRJobConfig.OUTPUT + "/spill" + spillNumber + ".out.index", size, getConf()); } @@ -187,7 +188,7 @@ public Path getSpillIndexFileForWrite(int spillNumber, long size) public Path getInputFile(int mapId) throws IOException { return lDirAlloc.getLocalPathToRead(String.format( - REDUCE_INPUT_FILE_FORMAT_STRING, Constants.OUTPUT, Integer + REDUCE_INPUT_FILE_FORMAT_STRING, MRJobConfig.OUTPUT, Integer .valueOf(mapId)), getConf()); } @@ -204,7 +205,7 @@ public Path getInputFileForWrite(org.apache.hadoop.mapreduce.TaskID mapId, long size) throws IOException { return lDirAlloc.getLocalPathForWrite(String.format( - REDUCE_INPUT_FILE_FORMAT_STRING, Constants.OUTPUT, mapId.getId()), + REDUCE_INPUT_FILE_FORMAT_STRING, MRJobConfig.OUTPUT, mapId.getId()), size, getConf()); } @@ -212,7 +213,7 @@ public Path getInputFileForWrite(org.apache.hadoop.mapreduce.TaskID mapId, @Override public void removeAll() throws IOException { - 
((JobConf)getConf()).deleteLocalFiles(Constants.OUTPUT); + ((JobConf)getConf()).deleteLocalFiles(MRJobConfig.OUTPUT); } @Override diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java index 33884bb82e9..a493ed1cf7c 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java @@ -210,6 +210,8 @@ public interface MRJobConfig { public static final String REDUCE_LOG_LEVEL = "mapreduce.reduce.log.level"; + public static final String DEFAULT_LOG_LEVEL = "INFO"; + public static final String REDUCE_MERGE_INMEM_THRESHOLD = "mapreduce.reduce.merge.inmem.threshold"; public static final String REDUCE_INPUT_BUFFER_PERCENT = "mapreduce.reduce.input.buffer.percent"; @@ -400,4 +402,64 @@ public interface MRJobConfig { */ public static final String MR_AM_CREATE_JH_INTERMEDIATE_BASE_DIR = MR_AM_PREFIX + "create-intermediate-jh-base-dir"; + + public static final String MAPRED_MAP_ADMIN_JAVA_OPTS = + "mapreduce.admin.map.child.java.opts"; + + public static final String MAPRED_REDUCE_ADMIN_JAVA_OPTS = + "mapreduce.admin.reduce.child.java.opts"; + + public static final String DEFAULT_MAPRED_ADMIN_JAVA_OPTS = + "-Djava.net.preferIPv4Stack=true " + + "-Dhadoop.metrics.log.level=WARN "; + + public static final String MAPRED_ADMIN_USER_SHELL = + "mapreduce.admin.user.shell"; + + public static final String DEFAULT_SHELL = "/bin/bash"; + + public static final String MAPRED_ADMIN_USER_ENV = + "mapreduce.admin.user.env"; + + public static final String DEFAULT_MAPRED_ADMIN_USER_ENV = + "LD_LIBRARY_PATH=$HADOOP_COMMON_HOME/lib"; + + public static final String WORKDIR = "work"; + + public static final String OUTPUT = "output"; + + public static final String HADOOP_WORK_DIR = "HADOOP_WORK_DIR"; + + public static final String STDOUT_LOGFILE_ENV = "STDOUT_LOGFILE_ENV"; + + public static final String STDERR_LOGFILE_ENV = "STDERR_LOGFILE_ENV"; + + // This should be the directory where splits file gets localized on the node + // running ApplicationMaster. + public static final String JOB_SUBMIT_DIR = "jobSubmitDir"; + + // This should be the name of the localized job-configuration file on the node + // running ApplicationMaster and Task + public static final String JOB_CONF_FILE = "job.xml"; + + // This should be the name of the localized job-jar file on the node running + // individual containers/tasks. + public static final String JOB_JAR = "job.jar"; + + public static final String JOB_SPLIT = "job.split"; + + public static final String JOB_SPLIT_METAINFO = "job.splitmetainfo"; + + public static final String APPLICATION_MASTER_CLASS = + "org.apache.hadoop.mapreduce.v2.app.MRAppMaster"; + + // The token file for the application. Should contain tokens for access to + // remote file system and may optionally contain application specific tokens. + // For now, generated by the AppManagers and used by NodeManagers and the + // Containers. 
+ public static final String APPLICATION_TOKENS_FILE = "appTokens"; + + public static final String MAPREDUCE_V2_CHILD_CLASS = + "org.apache.hadoop.mapred.YarnChild"; + } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java index 8e8081abe4d..be5b8621006 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java @@ -32,12 +32,12 @@ import org.apache.hadoop.mapreduce.ClusterMetrics; import org.apache.hadoop.mapreduce.JobID; import org.apache.hadoop.mapreduce.JobStatus; +import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.QueueAclsInfo; import org.apache.hadoop.mapreduce.QueueInfo; import org.apache.hadoop.mapreduce.TaskTrackerInfo; import org.apache.hadoop.mapreduce.TypeConverter; import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier; -import org.apache.hadoop.mapreduce.v2.MRConstants; import org.apache.hadoop.mapreduce.v2.util.MRApps; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.SecurityInfo; @@ -254,7 +254,7 @@ public String getStagingAreaDir() throws IOException, InterruptedException { public String getSystemDir() throws IOException, InterruptedException { - Path sysDir = new Path(MRConstants.JOB_SUBMIT_DIR); + Path sysDir = new Path(MRJobConfig.JOB_SUBMIT_DIR); //FileContext.getFileContext(conf).delete(sysDir, true); return sysDir.toString(); } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java index 82134c7520f..a8853b928ee 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java @@ -51,7 +51,6 @@ import org.apache.hadoop.mapreduce.TypeConverter; import org.apache.hadoop.mapreduce.protocol.ClientProtocol; import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier; -import org.apache.hadoop.mapreduce.v2.MRConstants; import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils; import org.apache.hadoop.mapreduce.v2.util.MRApps; import org.apache.hadoop.security.Credentials; @@ -60,6 +59,7 @@ import org.apache.hadoop.security.token.Token; import org.apache.hadoop.yarn.YarnException; import org.apache.hadoop.yarn.api.ApplicationConstants; +import org.apache.hadoop.yarn.api.ApplicationConstants.Environment; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.ApplicationState; @@ -210,7 +210,7 @@ public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts) // Upload only in security mode: TODO Path applicationTokensFile = - new Path(jobSubmitDir, 
MRConstants.APPLICATION_TOKENS_FILE); + new Path(jobSubmitDir, MRJobConfig.APPLICATION_TOKENS_FILE); try { ts.writeTokenStorageFile(applicationTokensFile, conf); } catch (IOException e) { @@ -226,7 +228,9 @@ public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts) ApplicationReport appMaster = resMgrDelegate .getApplicationReport(applicationId); - String diagnostics = (appMaster == null ? "application report is null" : appMaster.getDiagnostics()); + String diagnostics = + (appMaster == null ? + "application report is null" : appMaster.getDiagnostics()); if (appMaster == null || appMaster.getState() == ApplicationState.FAILED || appMaster.getState() == ApplicationState.KILLED) { throw new IOException("Failed to run job : " + @@ -263,7 +265,7 @@ public ApplicationSubmissionContext createApplicationSubmissionContext( Map localResources = new HashMap(); - Path jobConfPath = new Path(jobSubmitDir, MRConstants.JOB_CONF_FILE); + Path jobConfPath = new Path(jobSubmitDir, MRJobConfig.JOB_CONF_FILE); URL yarnUrlForJobSubmitDir = ConverterUtils .getYarnUrlFromPath(defaultFileContext.getDefaultFileSystem() @@ -272,13 +274,13 @@ public ApplicationSubmissionContext createApplicationSubmissionContext( LOG.debug("Creating setup context, jobSubmitDir url is " + yarnUrlForJobSubmitDir); - localResources.put(MRConstants.JOB_CONF_FILE, + localResources.put(MRJobConfig.JOB_CONF_FILE, createApplicationResource(defaultFileContext, jobConfPath)); if (jobConf.get(MRJobConfig.JAR) != null) { - localResources.put(MRConstants.JOB_JAR, + localResources.put(MRJobConfig.JOB_JAR, createApplicationResource(defaultFileContext, - new Path(jobSubmitDir, MRConstants.JOB_JAR))); + new Path(jobSubmitDir, MRJobConfig.JOB_JAR))); } else { // Job jar may be null. For example, for pipes, the job jar is the hadoop // mapreduce jar itself which is already on the classpath.
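The remaining hunks in this file switch the AM submission path over to the same classpath helper the task containers use. Roughly, MRApps.setClasspath (added above in MRApps.java) appends, in order: job.jar, the container's working directory ($PWD/*), the contents of the generated classpath file, and the ApplicationConstants.APPLICATION_CLASSPATH entries. A hedged sketch of what a caller observes (variable names are illustrative; java.util imports assumed):

    // Sketch: CLASSPATH as assembled by the patch's MRApps.setClasspath().
    Map<String, String> environment = new HashMap<String, String>();
    MRApps.setClasspath(environment);
    // environment.get("CLASSPATH") is roughly:
    //   job.jar:$PWD/*:<mrapp-generated-classpath contents>:$HADOOP_CONF_DIR:
    //   $HADOOP_COMMON_HOME/share/hadoop/common/*:...:$YARN_HOME/lib/*
    // The $VARs are left for the shell to expand when the container launches.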
@@ -287,10 +289,12 @@ public ApplicationSubmissionContext createApplicationSubmissionContext( } // TODO gross hack - for (String s : new String[] { "job.split", "job.splitmetainfo", - MRConstants.APPLICATION_TOKENS_FILE }) { + for (String s : new String[] { + MRJobConfig.JOB_SPLIT, + MRJobConfig.JOB_SPLIT_METAINFO, + MRJobConfig.APPLICATION_TOKENS_FILE }) { localResources.put( - MRConstants.JOB_SUBMIT_DIR + "/" + s, + MRJobConfig.JOB_SUBMIT_DIR + "/" + s, createApplicationResource(defaultFileContext, new Path(jobSubmitDir, s))); } @@ -304,9 +308,8 @@ public ApplicationSubmissionContext createApplicationSubmissionContext( } // Setup the command to run the AM - String javaHome = "$JAVA_HOME"; Vector vargs = new Vector(8); - vargs.add(javaHome + "/bin/java"); + vargs.add(Environment.JAVA_HOME.$() + "/bin/java"); vargs.add("-Dhadoop.root.logger=" + conf.get(MRJobConfig.MR_AM_LOG_OPTS, MRJobConfig.DEFAULT_MR_AM_LOG_OPTS) + ",console"); @@ -314,12 +317,15 @@ public ApplicationSubmissionContext createApplicationSubmissionContext( vargs.add(conf.get(MRJobConfig.MR_AM_COMMAND_OPTS, MRJobConfig.DEFAULT_MR_AM_COMMAND_OPTS)); - vargs.add("org.apache.hadoop.mapreduce.v2.app.MRAppMaster"); + vargs.add(MRJobConfig.APPLICATION_MASTER_CLASS); vargs.add(String.valueOf(applicationId.getClusterTimestamp())); vargs.add(String.valueOf(applicationId.getId())); vargs.add(ApplicationConstants.AM_FAIL_COUNT_STRING); - vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout"); - vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr"); + vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + + Path.SEPARATOR + ApplicationConstants.STDOUT); + vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + + Path.SEPARATOR + ApplicationConstants.STDERR); + Vector vargsFinal = new Vector(8); // Final command StringBuilder mergedCommand = new StringBuilder(); @@ -332,15 +338,13 @@ public ApplicationSubmissionContext createApplicationSubmissionContext( LOG.info("Command to launch container for ApplicationMaster is : " + mergedCommand); - // Setup the environment - Add { job jar, MR app jar } to classpath. + // Setup the CLASSPATH in environment + // i.e. add { job jar, CWD, Hadoop jars } to classpath.
@@ -332,15 +338,13 @@ public ApplicationSubmissionContext createApplicationSubmissionContext(
     LOG.info("Command to launch container for ApplicationMaster is : "
         + mergedCommand);
 
-    // Setup the environment - Add { job jar, MR app jar } to classpath.
+    // Setup the CLASSPATH in environment
+    // i.e. add { job jar, CWD, Hadoop jars} to classpath.
     Map<String, String> environment = new HashMap<String, String>();
-    MRApps.setInitialClasspath(environment);
-    MRApps.addToClassPath(environment, MRConstants.JOB_JAR);
-    MRApps.addToClassPath(environment,
-        MRConstants.YARN_MAPREDUCE_APP_JAR_PATH);
-
+    MRApps.setClasspath(environment);
+
     // Parse distributed cache
-    MRApps.setupDistributedCache(jobConf, localResources, environment);
+    MRApps.setupDistributedCache(jobConf, localResources);
 
     // Setup ContainerLaunchContext for AM container
     ContainerLaunchContext amContainer =
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java
index fcb2a79fafb..a99ae231b85 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java
@@ -43,9 +43,15 @@
  */
 public class MiniMRYarnCluster extends MiniYARNCluster {
 
+  public static final String HADOOP_MAPREDUCE_CLIENT_APP_JAR_NAME =
+      "hadoop-mapreduce-client-app-0.23.0-SNAPSHOT.jar";
+
+  public static final String YARN_MAPREDUCE_APP_JAR_PATH =
+      "$YARN_HOME/modules/" + HADOOP_MAPREDUCE_CLIENT_APP_JAR_NAME;
+
   public static final String APPJAR = "../hadoop-mapreduce-client-app/target/"
-      + MRConstants.HADOOP_MAPREDUCE_CLIENT_APP_JAR_NAME;
+      + HADOOP_MAPREDUCE_CLIENT_APP_JAR_NAME;
   private static final Log LOG = LogFactory.getLog(MiniMRYarnCluster.class);
 
   private JobHistoryServer historyServer;
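
MRApps.setClasspath() replaces the three separate calls removed above, but its
body is not included in this patch. A plausible reading of what the single call
folds together, reusing the MRApps.addToEnvironment() helper that appears
elsewhere in this patch (treat this as a sketch under stated assumptions, not
the actual MRApps implementation):

  public static void setClasspath(Map<String, String> environment) {
    // Job jar and the container's working directory go first so user classes
    // win over framework classes.
    MRApps.addToEnvironment(environment, Environment.CLASSPATH.name(),
        MRJobConfig.JOB_JAR);
    MRApps.addToEnvironment(environment, Environment.CLASSPATH.name(),
        Environment.PWD.$());
    // Then the standard Hadoop jars advertised by YARN.
    for (String c : ApplicationConstants.APPLICATION_CLASSPATH) {
      MRApps.addToEnvironment(environment, Environment.CLASSPATH.name(), c);
    }
  }
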
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java
index 0a1943c013b..aa832aa1cc2 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java
@@ -402,7 +402,7 @@ public void setup(Context context) throws IOException {
       // both should be reachable via the class loader.
       Assert.assertNotNull(cl.getResource("distributed.jar.inside2"));
       Assert.assertNotNull(cl.getResource("distributed.jar.inside3"));
-      Assert.assertNull(cl.getResource("distributed.jar.inside4"));
+      Assert.assertNotNull(cl.getResource("distributed.jar.inside4"));
       // Check that the symlink for the renaming was created in the cwd;
       File symlinkFile = new File("distributed.first.symlink");
diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationConstants.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationConstants.java
index 212ca671c89..591035b046e 100644
--- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationConstants.java
+++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationConstants.java
@@ -46,4 +46,117 @@ public interface ApplicationConstants {
   public static final String LOCAL_DIR_ENV = "YARN_LOCAL_DIRS";
 
   public static final String LOG_DIR_EXPANSION_VAR = "<LOG_DIR>";
+
+  public static final String STDERR = "stderr";
+
+  public static final String STDOUT = "stdout";
+
+  /**
+   * Classpath for typical applications.
+   */
+  public static final String[] APPLICATION_CLASSPATH =
+      new String[] {
+        "$HADOOP_CONF_DIR",
+        "$HADOOP_COMMON_HOME/share/hadoop/common/*",
+        "$HADOOP_COMMON_HOME/share/hadoop/common/lib/*",
+        "$HADOOP_HDFS_HOME/share/hadoop/hdfs/*",
+        "$HADOOP_HDFS_HOME/share/hadoop/hdfs/lib/*",
+        "$YARN_HOME/modules/*",
+        "$YARN_HOME/lib/*"
+      };
+
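+    // The entries above are deliberately unexpanded "$VAR" strings: they are
+    // carried through the launch context as-is and only resolved by the shell
+    // the NodeManager starts, so each node's local install locations apply.
+    // A hypothetical non-MapReduce client could seed its container's
+    // classpath like this (the ":" separator assumes a Unix NodeManager):
+    //
+    //   Map<String, String> environment = new HashMap<String, String>();
+    //   StringBuilder classpath = new StringBuilder(Environment.CLASSPATH.$());
+    //   for (String c : ApplicationConstants.APPLICATION_CLASSPATH) {
+    //     classpath.append(":").append(c); // still literal "$HADOOP_CONF_DIR"
+    //   }
+    //   environment.put(Environment.CLASSPATH.name(), classpath.toString());
+    //   // The value is expanded on the node, not here on the client.
+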
+  /**
+   * Environment for Applications.
+   *
+   * Some of the environment variables for applications are final
+   * i.e. they cannot be modified by the applications.
+   */
+  public enum Environment {
+    /**
+     * $USER
+     * Final, non-modifiable.
+     */
+    USER("USER"),
+
+    /**
+     * $LOGNAME
+     * Final, non-modifiable.
+     */
+    LOGNAME("LOGNAME"),
+
+    /**
+     * $HOME
+     * Final, non-modifiable.
+     */
+    HOME("HOME"),
+
+    /**
+     * $PWD
+     * Final, non-modifiable.
+     */
+    PWD("PWD"),
+
+    /**
+     * $PATH
+     */
+    PATH("PATH"),
+
+    /**
+     * $SHELL
+     */
+    SHELL("SHELL"),
+
+    /**
+     * $JAVA_HOME
+     */
+    JAVA_HOME("JAVA_HOME"),
+
+    /**
+     * $CLASSPATH
+     */
+    CLASSPATH("CLASSPATH"),
+
+    /**
+     * $LD_LIBRARY_PATH
+     */
+    LD_LIBRARY_PATH("LD_LIBRARY_PATH"),
+
+    /**
+     * $HADOOP_CONF_DIR
+     * Final, non-modifiable.
+     */
+    HADOOP_CONF_DIR("HADOOP_CONF_DIR"),
+
+    /**
+     * $HADOOP_COMMON_HOME
+     */
+    HADOOP_COMMON_HOME("HADOOP_COMMON_HOME"),
+
+    /**
+     * $HADOOP_HDFS_HOME
+     */
+    HADOOP_HDFS_HOME("HADOOP_HDFS_HOME"),
+
+    /**
+     * $YARN_HOME
+     */
+    YARN_HOME("YARN_HOME");
+
+    private final String variable;
+    private Environment(String variable) {
+      this.variable = variable;
+    }
+
+    public String key() {
+      return variable;
+    }
+
+    public String toString() {
+      return variable;
+    }
+
+    public String $() {
+      return "$" + variable;
+    }
+  }
 }
diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index ba23134170f..f34830c6050 100644
--- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -357,6 +357,12 @@ public class YarnConfiguration extends Configuration {
   public static final String NM_AUX_SERVICE_FMT =
       NM_PREFIX + "aux-services.%s.class";
 
+  public static final String NM_USER_HOME_DIR =
+      NM_PREFIX + "user-home-dir";
+
+  public static final String DEFAULT_NM_USER_HOME_DIR= "/home/";
+
+
   public static final int INVALID_CONTAINER_EXIT_STATUS = -1000;
   public static final int ABORTED_CONTAINER_EXIT_STATUS = -100;
diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java
index a7e82a2d41a..83872876797 100644
--- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java
+++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java
@@ -133,8 +133,10 @@ public int launchContainer(Container container,
       String[] command = new String[] { "bash", "-c",
           launchDst.toUri().getPath().toString() };
       LOG.info("launchContainer: " + Arrays.toString(command));
-      shExec = new ShellCommandExecutor(command,
-          new File(containerWorkDir.toUri().getPath()));
+      shExec = new ShellCommandExecutor(
+          command,
+          new File(containerWorkDir.toUri().getPath()),
+          container.getLaunchContext().getEnvironment());      // sanitized env
       launchCommandObjs.put(containerId, shExec);
       shExec.execute();
     } catch (IOException e) {
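
Handing the container's environment map to the three-argument
ShellCommandExecutor constructor is what actually exports the sanitized env:
Shell overlays the map onto the environment the child inherits, so sanitized
values override same-named variables of the NodeManager process. A standalone
example of the same constructor (paths and values here are illustrative only):

  // Assumes org.apache.hadoop.util.Shell.ShellCommandExecutor is available
  // and that the caller handles the IOException from execute().
  Map<String, String> env = new HashMap<String, String>();
  env.put("GREETING", "hello");  // stands in for the sanitized container env
  ShellCommandExecutor shExec = new ShellCommandExecutor(
      new String[] { "bash", "-c", "echo $GREETING" },
      new File("/tmp"),  // working directory, like containerWorkDir above
      env);              // overrides inherited variables of the same name
  shExec.execute();
  System.out.println(shExec.getOutput());  // prints "hello"
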
diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java
index 97721f72a36..0779d3b1581 100644
--- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java
+++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java
@@ -161,7 +161,11 @@ public int launchContainer(Container container,
             nmPrivateCotainerScriptPath.toUri().getPath().toString(),
             nmPrivateTokensPath.toUri().getPath().toString()));
     String[] commandArray = command.toArray(new String[command.size()]);
-    ShellCommandExecutor shExec = new ShellCommandExecutor(commandArray);
+    ShellCommandExecutor shExec =
+        new ShellCommandExecutor(
+            commandArray,
+            null, // NM's cwd
+            container.getLaunchContext().getEnvironment()); // sanitized env
     launchCommandObjs.put(containerId, shExec); // DEBUG
     LOG.info("launchContainer: " + Arrays.toString(commandArray));
 
diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java
index 497460d3e7d..43afa4cb85e 100644
--- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java
+++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java
@@ -44,6 +44,7 @@
 import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.Dispatcher;
@@ -89,7 +90,6 @@ public Integer call() {
     final Map<Path,String> localResources = container.getLocalizedResources();
     String containerIdStr = ConverterUtils.toString(container.getContainerID());
     final String user = launchContext.getUser();
-    final Map<String,String> env = launchContext.getEnvironment();
     final List<String> command = launchContext.getCommands();
     int ret = -1;
 
@@ -109,16 +109,16 @@ public Integer call() {
       }
       launchContext.setCommands(newCmds);
 
-      Map<String, String> envs = launchContext.getEnvironment();
-      Map<String, String> newEnvs = new HashMap<String, String>(envs.size());
-      for (Entry<String, String> entry : envs.entrySet()) {
-        newEnvs.put(
-            entry.getKey(),
-            entry.getValue().replace(
+      Map<String, String> environment = launchContext.getEnvironment();
+      // Make a copy of env to iterate & do variable expansion
+      for (Entry<String, String> entry : environment.entrySet()) {
+        String value = entry.getValue();
+        entry.setValue(
+            value.replace(
                 ApplicationConstants.LOG_DIR_EXPANSION_VAR,
-                containerLogDir.toUri().getPath()));
+                containerLogDir.toUri().getPath())
+            );
       }
-      launchContext.setEnvironment(newEnvs);
       // /////////////////////////// End of variable expansion
 
       FileContext lfs = FileContext.getLocalFSFileContext();
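
The rewritten loop expands <LOG_DIR> in place instead of building and swapping
in a replacement map: Entry.setValue() changes a value without structurally
modifying the map, so it is legal during iteration, and the mutation stays
visible through launchContext. A self-contained run of the same idiom; the log
directory value is a made-up example, not a path the NodeManager guarantees:

  Map<String, String> environment = new HashMap<String, String>();
  environment.put("MY_LOG_FILE",
      ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/app.log");
  String containerLogDir = "/var/log/yarn/container_01";  // example value
  for (Map.Entry<String, String> entry : environment.entrySet()) {
    entry.setValue(entry.getValue().replace(
        ApplicationConstants.LOG_DIR_EXPANSION_VAR, containerLogDir));
  }
  // environment.get("MY_LOG_FILE") -> "/var/log/yarn/container_01/app.log"
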
@@ -164,11 +164,18 @@ public Integer call() {
             EnumSet.of(CREATE, OVERWRITE));
 
         // Set the token location too.
-        env.put(ApplicationConstants.CONTAINER_TOKEN_FILE_ENV_NAME, new Path(
-            containerWorkDir, FINAL_CONTAINER_TOKENS_FILE).toUri().getPath());
+        environment.put(
+            ApplicationConstants.CONTAINER_TOKEN_FILE_ENV_NAME,
+            new Path(containerWorkDir,
+                FINAL_CONTAINER_TOKENS_FILE).toUri().getPath());
 
-        writeLaunchEnv(containerScriptOutStream, env, localResources,
-            launchContext.getCommands(), appDirs);
+        // Sanitize the container's environment
+        sanitizeEnv(environment, containerWorkDir, appDirs);
+
+        // Write out the environment
+        writeLaunchEnv(containerScriptOutStream, environment, localResources,
+            launchContext.getCommands());
+
         // /////////// End of writing out container-script
 
         // /////////// Write out the container-tokens in the nmPrivate space.
@@ -275,19 +282,71 @@ public String toString() {
     }
+
+  private static void putEnvIfNotNull(
+      Map<String, String> environment, String variable, String value) {
+    if (value != null) {
+      environment.put(variable, value);
+    }
+  }
+
+  private static void putEnvIfAbsent(
+      Map<String, String> environment, String variable) {
+    if (environment.get(variable) == null) {
+      putEnvIfNotNull(environment, variable, System.getenv(variable));
+    }
+  }
+
+  public void sanitizeEnv(Map<String, String> environment,
+      Path pwd, List<Path> appDirs) {
+    /**
+     * Non-modifiable environment variables
+     */
+
+    putEnvIfNotNull(environment, Environment.USER.name(), container.getUser());
+
+    putEnvIfNotNull(environment,
+        Environment.LOGNAME.name(),container.getUser());
+
+    putEnvIfNotNull(environment,
+        Environment.HOME.name(),
+        conf.get(
+            YarnConfiguration.NM_USER_HOME_DIR,
+            YarnConfiguration.DEFAULT_NM_USER_HOME_DIR
+            )
+        );
+
+    putEnvIfNotNull(environment, Environment.PWD.name(), pwd.toString());
+
+    putEnvIfNotNull(environment,
+        Environment.HADOOP_CONF_DIR.name(),
+        System.getenv(Environment.HADOOP_CONF_DIR.name())
+        );
+
+    putEnvIfNotNull(environment,
+        ApplicationConstants.LOCAL_DIR_ENV,
+        StringUtils.join(",", appDirs)
+        );
+
+    if (!Shell.WINDOWS) {
+      environment.put("JVM_PID", "$$");
+    }
+
+    /**
+     * Modifiable environment variables
+     */
+
+    putEnvIfAbsent(environment, Environment.JAVA_HOME.name());
+    putEnvIfAbsent(environment, Environment.HADOOP_COMMON_HOME.name());
+    putEnvIfAbsent(environment, Environment.HADOOP_HDFS_HOME.name());
+    putEnvIfAbsent(environment, Environment.YARN_HOME.name());
+
+  }
+
   private static void writeLaunchEnv(OutputStream out,
       Map<String,String> environment, Map<Path,String> resources,
-      List<String> command, List<Path> appDirs)
+      List<String> command)
       throws IOException {
     ShellScriptBuilder sb = new ShellScriptBuilder();
-    if (System.getenv("YARN_HOME") != null) {
-      // TODO: Get from whitelist.
-      sb.env("YARN_HOME", System.getenv("YARN_HOME"));
-    }
-    sb.env(ApplicationConstants.LOCAL_DIR_ENV, StringUtils.join(",", appDirs));
-    if (!Shell.WINDOWS) {
-      sb.env("JVM_PID", "$$");
-    }
     if (environment != null) {
       for (Map.Entry<String,String> env : environment.entrySet()) {
         sb.env(env.getKey().toString(), env.getValue().toString());