MAPREDUCE-2880. Improved classpath-construction for mapreduce AM and containers. Contributed by Arun C Murthy.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1173783 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2011-09-21 18:28:23 +00:00
parent 83e4b2b469
commit d00b3c49f6
25 changed files with 557 additions and 358 deletions

View File

@ -321,6 +321,9 @@ Release 0.23.0 - Unreleased
MAPREDUCE-901. Efficient framework counters. (llu via acmurthy)
MAPREDUCE-2880. Improve classpath-construction for mapreduce AM and
containers. (Arun C Murthy via vinodkv)
BUG FIXES
MAPREDUCE-2603. Disable High-Ram emulation in system tests.

View File

@ -18,27 +18,25 @@
package org.apache.hadoop.mapred;
import java.io.File;
import java.net.InetSocketAddress;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Vector;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.TaskLog.LogName;
import org.apache.hadoop.mapreduce.ID;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
public class MapReduceChildJVM {
private static final String SYSTEM_PATH_SEPARATOR =
System.getProperty("path.separator");
private static final Log LOG = LogFactory.getLog(MapReduceChildJVM.class);
private static File getTaskLogFile(String logDir, LogName filter) {
return new File(logDir, filter.toString());
private static String getTaskLogFile(LogName filter) {
return ApplicationConstants.LOG_DIR_EXPANSION_VAR + Path.SEPARATOR +
filter.toString();
}
private static String getChildEnv(JobConf jobConf, boolean isMap) {
@ -50,32 +48,53 @@ private static String getChildEnv(JobConf jobConf, boolean isMap) {
jobConf.get(jobConf.MAPRED_TASK_ENV));
}
public static void setVMEnv(Map<String, String> env,
List<String> classPaths, String pwd, String containerLogDir,
String nmLdLibraryPath, Task task, CharSequence applicationTokensFile) {
private static String getChildLogLevel(JobConf conf, boolean isMap) {
if (isMap) {
return conf.get(
MRJobConfig.MAP_LOG_LEVEL,
JobConf.DEFAULT_LOG_LEVEL.toString()
);
} else {
return conf.get(
MRJobConfig.REDUCE_LOG_LEVEL,
JobConf.DEFAULT_LOG_LEVEL.toString()
);
}
}
public static void setVMEnv(Map<String, String> environment,
Task task) {
JobConf conf = task.conf;
// Add classpath.
CharSequence cp = env.get("CLASSPATH");
String classpath = StringUtils.join(SYSTEM_PATH_SEPARATOR, classPaths);
if (null == cp) {
env.put("CLASSPATH", classpath);
} else {
env.put("CLASSPATH", classpath + SYSTEM_PATH_SEPARATOR + cp);
}
// Shell
environment.put(
Environment.SHELL.name(),
conf.get(
MRJobConfig.MAPRED_ADMIN_USER_SHELL,
MRJobConfig.DEFAULT_SHELL)
);
// Add pwd to LD_LIBRARY_PATH, add this before adding anything else
MRApps.addToEnvironment(
environment,
Environment.LD_LIBRARY_PATH.name(),
Environment.PWD.$());
/////// Environmental variable LD_LIBRARY_PATH
StringBuilder ldLibraryPath = new StringBuilder();
// Add the env variables passed by the user & admin
String mapredChildEnv = getChildEnv(conf, task.isMapTask());
MRApps.setEnvFromInputString(environment, mapredChildEnv);
MRApps.setEnvFromInputString(
environment,
conf.get(
MRJobConfig.MAPRED_ADMIN_USER_ENV,
MRJobConfig.DEFAULT_MAPRED_ADMIN_USER_ENV)
);
ldLibraryPath.append(nmLdLibraryPath);
ldLibraryPath.append(SYSTEM_PATH_SEPARATOR);
ldLibraryPath.append(pwd);
env.put("LD_LIBRARY_PATH", ldLibraryPath.toString());
/////// Environmental variable LD_LIBRARY_PATH
// for the child of task jvm, set hadoop.root.logger
env.put("HADOOP_ROOT_LOGGER", "DEBUG,CLA"); // TODO: Debug
// Set logging level
environment.put(
"HADOOP_ROOT_LOGGER",
getChildLogLevel(conf, task.isMapTask()) + ",CLA");
// TODO: The following is useful for instance in streaming tasks. Should be
// set in ApplicationMaster's env by the RM.
@ -89,76 +108,69 @@ public static void setVMEnv(Map<String, String> env,
// properties.
long logSize = TaskLog.getTaskLogLength(conf);
Vector<String> logProps = new Vector<String>(4);
setupLog4jProperties(logProps, logSize, containerLogDir);
setupLog4jProperties(logProps, logSize);
Iterator<String> it = logProps.iterator();
StringBuffer buffer = new StringBuffer();
while (it.hasNext()) {
buffer.append(" " + it.next());
}
hadoopClientOpts = hadoopClientOpts + buffer.toString();
env.put("HADOOP_CLIENT_OPTS", hadoopClientOpts);
environment.put("HADOOP_CLIENT_OPTS", hadoopClientOpts);
// add the env variables passed by the user
String mapredChildEnv = getChildEnv(conf, task.isMapTask());
if (mapredChildEnv != null && mapredChildEnv.length() > 0) {
String childEnvs[] = mapredChildEnv.split(",");
for (String cEnv : childEnvs) {
String[] parts = cEnv.split("="); // split on '='
String value = (String) env.get(parts[0]);
if (value != null) {
// replace $env with the child's env constructed by tt's
// example LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/tmp
value = parts[1].replace("$" + parts[0], value);
} else {
// this key is not configured by the tt for the child .. get it
// from the tt's env
// example PATH=$PATH:/tmp
value = System.getenv(parts[0]); // Get from NM?
if (value != null) {
// the env key is present in the tt's env
value = parts[1].replace("$" + parts[0], value);
} else {
// the env key is note present anywhere .. simply set it
// example X=$X:/tmp or X=/tmp
value = parts[1].replace("$" + parts[0], "");
}
}
env.put(parts[0], value);
}
}
//This should not be set here (If an OS check is requied. moved to ContainerLuanch)
// env.put("JVM_PID", "`echo $$`");
env.put(Constants.STDOUT_LOGFILE_ENV,
getTaskLogFile(containerLogDir, TaskLog.LogName.STDOUT).toString());
env.put(Constants.STDERR_LOGFILE_ENV,
getTaskLogFile(containerLogDir, TaskLog.LogName.STDERR).toString());
// Add stdout/stderr env
environment.put(
MRJobConfig.STDOUT_LOGFILE_ENV,
getTaskLogFile(TaskLog.LogName.STDOUT)
);
environment.put(
MRJobConfig.STDERR_LOGFILE_ENV,
getTaskLogFile(TaskLog.LogName.STDERR)
);
}
private static String getChildJavaOpts(JobConf jobConf, boolean isMapTask) {
String userClasspath = "";
String adminClasspath = "";
if (isMapTask) {
return jobConf.get(JobConf.MAPRED_MAP_TASK_JAVA_OPTS, jobConf.get(
JobConf.MAPRED_TASK_JAVA_OPTS,
JobConf.DEFAULT_MAPRED_TASK_JAVA_OPTS));
userClasspath =
jobConf.get(
JobConf.MAPRED_MAP_TASK_JAVA_OPTS,
jobConf.get(
JobConf.MAPRED_TASK_JAVA_OPTS,
JobConf.DEFAULT_MAPRED_TASK_JAVA_OPTS)
);
adminClasspath =
jobConf.get(
MRJobConfig.MAPRED_MAP_ADMIN_JAVA_OPTS,
MRJobConfig.DEFAULT_MAPRED_ADMIN_JAVA_OPTS);
} else {
userClasspath =
jobConf.get(
JobConf.MAPRED_REDUCE_TASK_JAVA_OPTS,
jobConf.get(
JobConf.MAPRED_TASK_JAVA_OPTS,
JobConf.DEFAULT_MAPRED_TASK_JAVA_OPTS)
);
adminClasspath =
jobConf.get(
MRJobConfig.MAPRED_REDUCE_ADMIN_JAVA_OPTS,
MRJobConfig.DEFAULT_MAPRED_ADMIN_JAVA_OPTS);
}
return jobConf
.get(JobConf.MAPRED_REDUCE_TASK_JAVA_OPTS, jobConf.get(
JobConf.MAPRED_TASK_JAVA_OPTS,
JobConf.DEFAULT_MAPRED_TASK_JAVA_OPTS));
// Add admin classpath first so it can be overridden by user.
return adminClasspath + " " + userClasspath;
}
private static void setupLog4jProperties(Vector<String> vargs,
long logSize, String containerLogDir) {
long logSize) {
vargs.add("-Dlog4j.configuration=container-log4j.properties");
vargs.add("-Dhadoop.yarn.mr.containerLogDir=" + containerLogDir);
vargs.add("-Dhadoop.yarn.mr.containerLogDir=" + ApplicationConstants.LOG_DIR_EXPANSION_VAR);
vargs.add("-Dhadoop.yarn.mr.totalLogFileSize=" + logSize);
}
public static List<String> getVMCommand(
InetSocketAddress taskAttemptListenerAddr, Task task, String javaHome,
String workDir, String logDir, String childTmpDir, ID jvmID) {
InetSocketAddress taskAttemptListenerAddr, Task task,
ID jvmID) {
TaskAttemptID attemptID = task.getTaskID();
JobConf conf = task.conf;
@ -166,7 +178,7 @@ public static List<String> getVMCommand(
Vector<String> vargs = new Vector<String>(8);
vargs.add("exec");
vargs.add(javaHome + "/bin/java");
vargs.add(Environment.JAVA_HOME.$() + "/bin/java");
// Add child (task) java-vm options.
//
@ -199,44 +211,26 @@ public static List<String> getVMCommand(
String javaOpts = getChildJavaOpts(conf, task.isMapTask());
javaOpts = javaOpts.replace("@taskid@", attemptID.toString());
String [] javaOptsSplit = javaOpts.split(" ");
// Add java.library.path; necessary for loading native libraries.
//
// 1. We add the 'cwd' of the task to it's java.library.path to help
// users distribute native libraries via the DistributedCache.
// 2. The user can also specify extra paths to be added to the
// java.library.path via mapred.{map|reduce}.child.java.opts.
//
String libraryPath = workDir;
boolean hasUserLDPath = false;
for(int i=0; i<javaOptsSplit.length ;i++) {
if(javaOptsSplit[i].startsWith("-Djava.library.path=")) {
// TODO: Does the above take care of escaped space chars
javaOptsSplit[i] += SYSTEM_PATH_SEPARATOR + libraryPath;
hasUserLDPath = true;
break;
}
}
if(!hasUserLDPath) {
vargs.add("-Djava.library.path=" + libraryPath);
}
for (int i = 0; i < javaOptsSplit.length; i++) {
vargs.add(javaOptsSplit[i]);
}
if (childTmpDir != null) {
vargs.add("-Djava.io.tmpdir=" + childTmpDir);
}
String childTmpDir = Environment.PWD.$() + Path.SEPARATOR + "tmp";
vargs.add("-Djava.io.tmpdir=" + childTmpDir);
// Setup the log4j prop
long logSize = TaskLog.getTaskLogLength(conf);
setupLog4jProperties(vargs, logSize, logDir);
setupLog4jProperties(vargs, logSize);
if (conf.getProfileEnabled()) {
if (conf.getProfileTaskRange(task.isMapTask()
).isIncluded(task.getPartition())) {
File prof = getTaskLogFile(logDir, TaskLog.LogName.PROFILE);
vargs.add(String.format(conf.getProfileParams(), prof.toString()));
vargs.add(
String.format(
conf.getProfileParams(),
getTaskLogFile(TaskLog.LogName.PROFILE)
)
);
}
}
@ -249,8 +243,8 @@ public static List<String> getVMCommand(
// Finally add the jvmID
vargs.add(String.valueOf(jvmID.getId()));
vargs.add("1>" + getTaskLogFile(logDir, TaskLog.LogName.STDERR));
vargs.add("2>" + getTaskLogFile(logDir, TaskLog.LogName.STDOUT));
vargs.add("1>" + getTaskLogFile(TaskLog.LogName.STDERR));
vargs.add("2>" + getTaskLogFile(TaskLog.LogName.STDOUT));
// Final commmand
StringBuilder mergedCommand = new StringBuilder();

View File

@ -47,7 +47,6 @@
import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
import org.apache.hadoop.mapreduce.v2.MRConstants;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.source.JvmMetrics;
import org.apache.hadoop.security.Credentials;
@ -71,7 +70,7 @@ public static void main(String[] args) throws Throwable {
LOG.debug("Child starting");
final JobConf defaultConf = new JobConf();
defaultConf.addResource(MRConstants.JOB_CONF_FILE);
defaultConf.addResource(MRJobConfig.JOB_CONF_FILE);
UserGroupInformation.setConfiguration(defaultConf);
String host = args[0];
@ -238,7 +237,7 @@ private static void configureLocalDirs(Task task, JobConf job) {
private static JobConf configureTask(Task task, Credentials credentials,
Token<JobTokenIdentifier> jt) throws IOException {
final JobConf job = new JobConf(MRConstants.JOB_CONF_FILE);
final JobConf job = new JobConf(MRJobConfig.JOB_CONF_FILE);
job.setCredentials(credentials);
// set tcp nodelay
job.setBoolean("ipc.client.tcpnodelay", true);
@ -260,7 +259,7 @@ private static JobConf configureTask(Task task, Credentials credentials,
// Overwrite the localized task jobconf which is linked to in the current
// work-dir.
Path localTaskFile = new Path(Constants.JOBFILE);
Path localTaskFile = new Path(MRJobConfig.JOB_CONF_FILE);
writeLocalJobFile(localTaskFile, job);
task.setJobFile(localTaskFile.toString());
task.setConf(job);

View File

@ -39,7 +39,6 @@
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler;
import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
import org.apache.hadoop.mapreduce.v2.MRConstants;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
@ -243,10 +242,10 @@ protected Job createJob(Configuration conf) {
// Read the file-system tokens from the localized tokens-file.
Path jobSubmitDir =
FileContext.getLocalFSFileContext().makeQualified(
new Path(new File(MRConstants.JOB_SUBMIT_DIR)
new Path(new File(MRJobConfig.JOB_SUBMIT_DIR)
.getAbsolutePath()));
Path jobTokenFile =
new Path(jobSubmitDir, MRConstants.APPLICATION_TOKENS_FILE);
new Path(jobSubmitDir, MRJobConfig.APPLICATION_TOKENS_FILE);
fsTokens.addAll(Credentials.readTokenStorageFile(jobTokenFile, conf));
LOG.info("jobSubmitDir=" + jobSubmitDir + " jobTokenFile="
+ jobTokenFile);
@ -658,7 +657,7 @@ public static void main(String[] args) {
Runtime.getRuntime().addShutdownHook(
new CompositeServiceShutdownHook(appMaster));
YarnConfiguration conf = new YarnConfiguration(new JobConf());
conf.addResource(new Path(MRConstants.JOB_CONF_FILE));
conf.addResource(new Path(MRJobConfig.JOB_CONF_FILE));
conf.set(MRJobConfig.USER_NAME,
System.getProperty("user.name"));
UserGroupInformation.setConfiguration(conf);

View File

@ -42,6 +42,7 @@
import org.apache.hadoop.mapred.FileOutputCommitter;
import org.apache.hadoop.mapred.JobACLsManager;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceChildJVM;
import org.apache.hadoop.mapreduce.JobACL;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.MRJobConfig;
@ -64,7 +65,6 @@
import org.apache.hadoop.mapreduce.split.SplitMetaInfoReader;
import org.apache.hadoop.mapreduce.task.JobContextImpl;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
import org.apache.hadoop.mapreduce.v2.MRConstants;
import org.apache.hadoop.mapreduce.v2.api.records.Counter;
import org.apache.hadoop.mapreduce.v2.api.records.CounterGroup;
import org.apache.hadoop.mapreduce.v2.api.records.Counters;
@ -1007,7 +1007,7 @@ protected void setup(JobImpl job) throws IOException {
FileSystem.get(job.conf).makeQualified(
new Path(path, oldJobIDString));
job.remoteJobConfFile =
new Path(job.remoteJobSubmitDir, MRConstants.JOB_CONF_FILE);
new Path(job.remoteJobSubmitDir, MRJobConfig.JOB_CONF_FILE);
// Prepare the TaskAttemptListener server for authentication of Containers
// TaskAttemptListener gets the information via jobTokenSecretManager.
@ -1033,7 +1033,7 @@ protected void setup(JobImpl job) throws IOException {
Path remoteJobTokenFile =
new Path(job.remoteJobSubmitDir,
MRConstants.APPLICATION_TOKENS_FILE);
MRJobConfig.APPLICATION_TOKENS_FILE);
tokenStorage.writeTokenStorageFile(remoteJobTokenFile, job.conf);
LOG.info("Writing back the job-token file on the remote file system:"
+ remoteJobTokenFile.toString());

View File

@ -21,7 +21,6 @@
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
@ -62,7 +61,6 @@
import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptUnsuccessfulCompletionEvent;
import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
import org.apache.hadoop.mapreduce.v2.MRConstants;
import org.apache.hadoop.mapreduce.v2.api.records.Counter;
import org.apache.hadoop.mapreduce.v2.api.records.Counters;
import org.apache.hadoop.mapreduce.v2.api.records.Phase;
@ -103,6 +101,7 @@
import org.apache.hadoop.yarn.Clock;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerToken;
@ -117,7 +116,6 @@
import org.apache.hadoop.yarn.state.SingleArcTransition;
import org.apache.hadoop.yarn.state.StateMachine;
import org.apache.hadoop.yarn.state.StateMachineFactory;
import org.apache.hadoop.yarn.util.BuilderUtils;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.RackResolver;
@ -518,8 +516,8 @@ private String getInitialClasspath() throws IOException {
return initialClasspath;
}
Map<String, String> env = new HashMap<String, String>();
MRApps.setInitialClasspath(env);
initialClasspath = env.get(MRApps.CLASSPATH);
MRApps.setClasspath(env);
initialClasspath = env.get(Environment.CLASSPATH.name());
initialClasspathFlag.set(true);
return initialClasspath;
}
@ -531,16 +529,18 @@ private String getInitialClasspath() throws IOException {
*/
private ContainerLaunchContext createContainerLaunchContext() {
ContainerLaunchContext container =
recordFactory.newRecordInstance(ContainerLaunchContext.class);
// Application resources
Map<String, LocalResource> localResources =
new HashMap<String, LocalResource>();
// Application environment
Map<String, String> environment = new HashMap<String, String>();
// Service data
Map<String, ByteBuffer> serviceData = new HashMap<String, ByteBuffer>();
// Tokens
ByteBuffer tokens = ByteBuffer.wrap(new byte[]{});
try {
FileSystem remoteFS = FileSystem.get(conf);
@ -550,7 +550,7 @@ private ContainerLaunchContext createContainerLaunchContext() {
MRJobConfig.JAR))).makeQualified(remoteFS.getUri(),
remoteFS.getWorkingDirectory());
localResources.put(
MRConstants.JOB_JAR,
MRJobConfig.JOB_JAR,
createLocalResource(remoteFS, recordFactory, remoteJobJar,
LocalResourceType.FILE, LocalResourceVisibility.APPLICATION));
LOG.info("The job-jar file on the remote FS is "
@ -570,9 +570,9 @@ private ContainerLaunchContext createContainerLaunchContext() {
Path remoteJobSubmitDir =
new Path(path, oldJobId.toString());
Path remoteJobConfPath =
new Path(remoteJobSubmitDir, MRConstants.JOB_CONF_FILE);
new Path(remoteJobSubmitDir, MRJobConfig.JOB_CONF_FILE);
localResources.put(
MRConstants.JOB_CONF_FILE,
MRJobConfig.JOB_CONF_FILE,
createLocalResource(remoteFS, recordFactory, remoteJobConfPath,
LocalResourceType.FILE, LocalResourceVisibility.APPLICATION));
LOG.info("The job-conf file on the remote FS is "
@ -580,12 +580,8 @@ private ContainerLaunchContext createContainerLaunchContext() {
// //////////// End of JobConf setup
// Setup DistributedCache
MRApps.setupDistributedCache(conf, localResources, environment);
MRApps.setupDistributedCache(conf, localResources);
// Set local-resources and environment
container.setLocalResources(localResources);
container.setEnvironment(environment);
// Setup up tokens
Credentials taskCredentials = new Credentials();
@ -606,52 +602,43 @@ private ContainerLaunchContext createContainerLaunchContext() {
LOG.info("Size of containertokens_dob is "
+ taskCredentials.numberOfTokens());
taskCredentials.writeTokenStorageToStream(containerTokens_dob);
container.setContainerTokens(
tokens =
ByteBuffer.wrap(containerTokens_dob.getData(), 0,
containerTokens_dob.getLength()));
containerTokens_dob.getLength());
// Add shuffle token
LOG.info("Putting shuffle token in serviceData");
Map<String, ByteBuffer> serviceData = new HashMap<String, ByteBuffer>();
serviceData.put(ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID,
ShuffleHandler.serializeServiceData(jobToken));
container.setServiceData(serviceData);
MRApps.addToClassPath(container.getEnvironment(), getInitialClasspath());
MRApps.addToEnvironment(
environment,
Environment.CLASSPATH.name(),
getInitialClasspath());
} catch (IOException e) {
throw new YarnException(e);
}
// Setup environment
MapReduceChildJVM.setVMEnv(environment, remoteTask);
// Set up the launch command
List<String> commands = MapReduceChildJVM.getVMCommand(
taskAttemptListener.getAddress(), remoteTask,
jvmID);
container.setContainerId(containerID);
container.setUser(conf.get(MRJobConfig.USER_NAME)); // TODO: Fix
File workDir = new File("$PWD"); // Will be expanded by the shell.
String containerLogDir =
new File(ApplicationConstants.LOG_DIR_EXPANSION_VAR).toString();
String childTmpDir = new File(workDir, "tmp").toString();
String javaHome = "${JAVA_HOME}"; // Will be expanded by the shell.
String nmLdLibraryPath = "{LD_LIBRARY_PATH}"; // Expanded by the shell?
List<String> classPaths = new ArrayList<String>();
String localizedApplicationTokensFile =
new File(workDir, MRConstants.APPLICATION_TOKENS_FILE).toString();
classPaths.add(MRConstants.JOB_JAR);
classPaths.add(MRConstants.YARN_MAPREDUCE_APP_JAR_PATH);
classPaths.add(workDir.toString()); // TODO
// Construct the actual Container
container.setCommands(MapReduceChildJVM.getVMCommand(
taskAttemptListener.getAddress(), remoteTask, javaHome,
workDir.toString(), containerLogDir, childTmpDir, jvmID));
MapReduceChildJVM.setVMEnv(container.getEnvironment(), classPaths,
workDir.toString(), containerLogDir, nmLdLibraryPath, remoteTask,
localizedApplicationTokensFile);
// Construct the actual Container
ContainerLaunchContext container =
recordFactory.newRecordInstance(ContainerLaunchContext.class);
container.setContainerId(containerID);
container.setUser(conf.get(MRJobConfig.USER_NAME));
container.setResource(assignedCapability);
container.setLocalResources(localResources);
container.setEnvironment(environment);
container.setCommands(commands);
container.setServiceData(serviceData);
container.setContainerTokens(tokens);
return container;
}

View File

@ -35,7 +35,6 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.v2.MRConstants;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
@ -87,7 +86,7 @@ public class DefaultSpeculator extends AbstractService implements
private final ConcurrentMap<JobId, AtomicInteger> reduceContainerNeeds
= new ConcurrentHashMap<JobId, AtomicInteger>();
private final Set<TaskId> mayHaveSpeculated = new HashSet();
private final Set<TaskId> mayHaveSpeculated = new HashSet<TaskId>();
private final Configuration conf;
private AppContext context;

View File

@ -1,50 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.v2;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
@InterfaceAudience.Private
@InterfaceStability.Evolving
public interface MRConstants {
// This should be the directory where splits file gets localized on the node
// running ApplicationMaster.
public static final String JOB_SUBMIT_DIR = "jobSubmitDir";
// This should be the name of the localized job-configuration file on the node
// running ApplicationMaster and Task
public static final String JOB_CONF_FILE = "job.xml";
// This should be the name of the localized job-jar file on the node running
// individual containers/tasks.
public static final String JOB_JAR = "job.jar";
public static final String HADOOP_MAPREDUCE_CLIENT_APP_JAR_NAME =
"hadoop-mapreduce-client-app-0.24.0-SNAPSHOT.jar";
public static final String YARN_MAPREDUCE_APP_JAR_PATH =
"$YARN_HOME/modules/" + HADOOP_MAPREDUCE_CLIENT_APP_JAR_NAME;
// The token file for the application. Should contain tokens for access to
// remote file system and may optionally contain application specific tokens.
// For now, generated by the AppManagers and used by NodeManagers and the
// Containers.
public static final String APPLICATION_TOKENS_FILE = "appTokens";
}

View File

@ -39,14 +39,14 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.filecache.DistributedCache;
import org.apache.hadoop.mapreduce.v2.MRConstants;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.util.Shell.ShellCommandExecutor;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
@ -167,7 +167,7 @@ public static TaskAttemptStateUI taskAttemptState(String attemptStateStr) {
return TaskAttemptStateUI.valueOf(attemptStateStr);
}
public static void setInitialClasspath(
private static void setMRFrameworkClasspath(
Map<String, String> environment) throws IOException {
InputStream classpathFileStream = null;
BufferedReader reader = null;
@ -182,30 +182,17 @@ public static void setInitialClasspath(
reader = new BufferedReader(new InputStreamReader(classpathFileStream));
String cp = reader.readLine();
if (cp != null) {
addToClassPath(environment, cp.trim());
addToEnvironment(environment, Environment.CLASSPATH.name(), cp.trim());
}
// Put the file itself on classpath for tasks.
addToClassPath(environment,
addToEnvironment(
environment,
Environment.CLASSPATH.name(),
thisClassLoader.getResource(mrAppGeneratedClasspathFile).getFile());
// If runtime env is different.
if (System.getenv().get("YARN_HOME") != null) {
ShellCommandExecutor exec =
new ShellCommandExecutor(new String[] {
System.getenv().get("YARN_HOME") + "/bin/yarn",
"classpath" });
exec.execute();
addToClassPath(environment, exec.getOutput().trim());
}
// Get yarn mapreduce-app classpath
if (System.getenv().get("HADOOP_MAPRED_HOME")!= null) {
ShellCommandExecutor exec =
new ShellCommandExecutor(new String[] {
System.getenv().get("HADOOP_MAPRED_HOME") + "/bin/mapred",
"classpath" });
exec.execute();
addToClassPath(environment, exec.getOutput().trim());
// Add standard Hadoop classes
for (String c : ApplicationConstants.APPLICATION_CLASSPATH) {
addToEnvironment(environment, Environment.CLASSPATH.name(), c);
}
} finally {
if (classpathFileStream != null) {
@ -217,20 +204,35 @@ public static void setInitialClasspath(
}
// TODO: Remove duplicates.
}
private static final String SYSTEM_PATH_SEPARATOR =
System.getProperty("path.separator");
public static void addToClassPath(
Map<String, String> environment, String fileName) {
String classpath = environment.get(CLASSPATH);
if (classpath == null) {
classpath = fileName;
public static void addToEnvironment(
Map<String, String> environment,
String variable, String value) {
String val = environment.get(variable);
if (val == null) {
val = value;
} else {
classpath = classpath + ":" + fileName;
val = val + SYSTEM_PATH_SEPARATOR + value;
}
environment.put(CLASSPATH, classpath);
environment.put(variable, val);
}
public static final String CLASSPATH = "CLASSPATH";
public static void setClasspath(Map<String, String> environment)
throws IOException {
MRApps.addToEnvironment(
environment,
Environment.CLASSPATH.name(),
MRJobConfig.JOB_JAR);
MRApps.addToEnvironment(
environment,
Environment.CLASSPATH.name(),
Environment.PWD.$() + Path.SEPARATOR + "*");
MRApps.setMRFrameworkClasspath(environment);
}
private static final String STAGING_CONSTANT = ".staging";
public static Path getStagingAreaDir(Configuration conf, String user) {
return new Path(
@ -241,7 +243,7 @@ public static Path getStagingAreaDir(Configuration conf, String user) {
public static String getJobFile(Configuration conf, String user,
org.apache.hadoop.mapreduce.JobID jobId) {
Path jobFile = new Path(MRApps.getStagingAreaDir(conf, user),
jobId.toString() + Path.SEPARATOR + MRConstants.JOB_CONF_FILE);
jobId.toString() + Path.SEPARATOR + MRJobConfig.JOB_CONF_FILE);
return jobFile.toString();
}
@ -260,12 +262,11 @@ private static long[] parseTimeStamps(String[] strs) {
public static void setupDistributedCache(
Configuration conf,
Map<String, LocalResource> localResources,
Map<String, String> env)
Map<String, LocalResource> localResources)
throws IOException {
// Cache archives
parseDistributedCacheArtifacts(conf, localResources, env,
parseDistributedCacheArtifacts(conf, localResources,
LocalResourceType.ARCHIVE,
DistributedCache.getCacheArchives(conf),
parseTimeStamps(DistributedCache.getArchiveTimestamps(conf)),
@ -275,7 +276,7 @@ public static void setupDistributedCache(
// Cache files
parseDistributedCacheArtifacts(conf,
localResources, env,
localResources,
LocalResourceType.FILE,
DistributedCache.getCacheFiles(conf),
parseTimeStamps(DistributedCache.getFileTimestamps(conf)),
@ -290,7 +291,6 @@ public static void setupDistributedCache(
private static void parseDistributedCacheArtifacts(
Configuration conf,
Map<String, LocalResource> localResources,
Map<String, String> env,
LocalResourceType type,
URI[] uris, long[] timestamps, long[] sizes, boolean visibilities[],
Path[] pathsToPutOnClasspath) throws IOException {
@ -339,9 +339,6 @@ private static void parseDistributedCacheArtifacts(
: LocalResourceVisibility.PRIVATE,
sizes[i], timestamps[i])
);
if (classPaths.containsKey(u.getPath())) {
MRApps.addToClassPath(env, linkName);
}
}
}
}
@ -358,6 +355,42 @@ private static long[] getFileSizes(Configuration conf, String key) {
}
return result;
}
public static void setEnvFromInputString(Map<String, String> env,
String envString) {
if (envString != null && envString.length() > 0) {
String childEnvs[] = envString.split(",");
for (String cEnv : childEnvs) {
String[] parts = cEnv.split("="); // split on '='
String value = env.get(parts[0]);
if (value != null) {
// Replace $env with the child's env constructed by NM's
// For example: LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/tmp
value = parts[1].replace("$" + parts[0], value);
} else {
// example PATH=$PATH:/tmp
value = System.getenv(parts[0]);
if (value != null) {
// the env key is present in the tt's env
value = parts[1].replace("$" + parts[0], value);
} else {
// check for simple variable substitution
// for e.g. ROOT=$HOME
String envValue = System.getenv(parts[1].substring(1));
if (envValue != null) {
value = envValue;
} else {
// the env key is note present anywhere .. simply set it
// example X=$X:/tmp or X=/tmp
value = parts[1].replace("$" + parts[0], "");
}
}
}
addToEnvironment(env, parts[0], value);
}
}
}

View File

@ -25,7 +25,6 @@
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.MRConstants;
import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@ -115,7 +114,8 @@ public class TestMRApps {
@Test public void testGetJobFileWithUser() {
Configuration conf = new Configuration();
conf.set(MRJobConfig.MR_AM_STAGING_DIR, "/my/path/to/staging");
String jobFile = MRApps.getJobFile(conf, "dummy-user", new JobID("dummy-job", 12345));
String jobFile = MRApps.getJobFile(conf, "dummy-user",
new JobID("dummy-job", 12345));
assertNotNull("getJobFile results in null.", jobFile);
assertEquals("jobFile with specified user is not as expected.",
"/my/path/to/staging/dummy-user/.staging/job_dummy-job_12345/job.xml", jobFile);

View File

@ -41,6 +41,7 @@
import org.apache.hadoop.mapred.IFile.Writer;
import org.apache.hadoop.mapred.Merger.Segment;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TaskAttemptID;
/**
@ -560,7 +561,7 @@ void createInDiskSegment() throws IOException {
private Writer<K,V> createSpillFile() throws IOException {
Path tmp =
new Path(Constants.OUTPUT + "/backup_" + tid.getId() + "_"
new Path(MRJobConfig.OUTPUT + "/backup_" + tid.getId() + "_"
+ (spillNumber++) + ".out");
LOG.info("Created file: " + tmp);

View File

@ -1,27 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred;
public class Constants {
static final String OUTPUT = "output";
public static final String HADOOP_WORK_DIR = "HADOOP_WORK_DIR";
public static final String JOBFILE = "job.xml";
public static final String STDOUT_LOGFILE_ENV = "STDOUT_LOGFILE_ENV";
public static final String STDERR_LOGFILE_ENV = "STDERR_LOGFILE_ENV";
}

View File

@ -348,6 +348,7 @@ public class JobConf extends Configuration {
*/
public static final Level DEFAULT_LOG_LEVEL = Level.INFO;
/**
* Construct a map/reduce job configuration.
*/

View File

@ -17,11 +17,16 @@
*/
package org.apache.hadoop.mapred;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
/*******************************
* Some handy constants
*
*******************************/
interface MRConstants {
@Private
@Unstable
public class MRConstants {
//
// Timeouts, constants
//
@ -52,6 +57,4 @@ interface MRConstants {
* The reduce task number for which this map output is being transferred
*/
public static final String FOR_REDUCE_TASK = "for-reduce-task";
public static final String WORKDIR = "work";
}

View File

@ -27,6 +27,7 @@
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.MRJobConfig;
/**
* Manipulate the working area for the transient store for maps and reduces.
@ -54,7 +55,7 @@ public MROutputFiles() {
@Override
public Path getOutputFile()
throws IOException {
return lDirAlloc.getLocalPathToRead(Constants.OUTPUT + Path.SEPARATOR
return lDirAlloc.getLocalPathToRead(MRJobConfig.OUTPUT + Path.SEPARATOR
+ MAP_OUTPUT_FILENAME_STRING, getConf());
}
@ -68,7 +69,7 @@ public Path getOutputFile()
@Override
public Path getOutputFileForWrite(long size)
throws IOException {
return lDirAlloc.getLocalPathForWrite(Constants.OUTPUT + Path.SEPARATOR
return lDirAlloc.getLocalPathForWrite(MRJobConfig.OUTPUT + Path.SEPARATOR
+ MAP_OUTPUT_FILENAME_STRING, size, getConf());
}
@ -89,7 +90,7 @@ public Path getOutputFileForWriteInVolume(Path existing) {
@Override
public Path getOutputIndexFile()
throws IOException {
return lDirAlloc.getLocalPathToRead(Constants.OUTPUT + Path.SEPARATOR
return lDirAlloc.getLocalPathToRead(MRJobConfig.OUTPUT + Path.SEPARATOR
+ MAP_OUTPUT_FILENAME_STRING + MAP_OUTPUT_INDEX_SUFFIX_STRING,
getConf());
}
@ -104,7 +105,7 @@ public Path getOutputIndexFile()
@Override
public Path getOutputIndexFileForWrite(long size)
throws IOException {
return lDirAlloc.getLocalPathForWrite(Constants.OUTPUT + Path.SEPARATOR
return lDirAlloc.getLocalPathForWrite(MRJobConfig.OUTPUT + Path.SEPARATOR
+ MAP_OUTPUT_FILENAME_STRING + MAP_OUTPUT_INDEX_SUFFIX_STRING,
size, getConf());
}
@ -128,7 +129,7 @@ public Path getOutputIndexFileForWriteInVolume(Path existing) {
@Override
public Path getSpillFile(int spillNumber)
throws IOException {
return lDirAlloc.getLocalPathToRead(Constants.OUTPUT + "/spill"
return lDirAlloc.getLocalPathToRead(MRJobConfig.OUTPUT + "/spill"
+ spillNumber + ".out", getConf());
}
@ -143,7 +144,7 @@ public Path getSpillFile(int spillNumber)
@Override
public Path getSpillFileForWrite(int spillNumber, long size)
throws IOException {
return lDirAlloc.getLocalPathForWrite(Constants.OUTPUT + "/spill"
return lDirAlloc.getLocalPathForWrite(MRJobConfig.OUTPUT + "/spill"
+ spillNumber + ".out", size, getConf());
}
@ -157,7 +158,7 @@ public Path getSpillFileForWrite(int spillNumber, long size)
@Override
public Path getSpillIndexFile(int spillNumber)
throws IOException {
return lDirAlloc.getLocalPathToRead(Constants.OUTPUT + "/spill"
return lDirAlloc.getLocalPathToRead(MRJobConfig.OUTPUT + "/spill"
+ spillNumber + ".out.index", getConf());
}
@ -172,7 +173,7 @@ public Path getSpillIndexFile(int spillNumber)
@Override
public Path getSpillIndexFileForWrite(int spillNumber, long size)
throws IOException {
return lDirAlloc.getLocalPathForWrite(Constants.OUTPUT + "/spill"
return lDirAlloc.getLocalPathForWrite(MRJobConfig.OUTPUT + "/spill"
+ spillNumber + ".out.index", size, getConf());
}
@ -187,7 +188,7 @@ public Path getSpillIndexFileForWrite(int spillNumber, long size)
public Path getInputFile(int mapId)
throws IOException {
return lDirAlloc.getLocalPathToRead(String.format(
REDUCE_INPUT_FILE_FORMAT_STRING, Constants.OUTPUT, Integer
REDUCE_INPUT_FILE_FORMAT_STRING, MRJobConfig.OUTPUT, Integer
.valueOf(mapId)), getConf());
}
@ -204,7 +205,7 @@ public Path getInputFileForWrite(org.apache.hadoop.mapreduce.TaskID mapId,
long size)
throws IOException {
return lDirAlloc.getLocalPathForWrite(String.format(
REDUCE_INPUT_FILE_FORMAT_STRING, Constants.OUTPUT, mapId.getId()),
REDUCE_INPUT_FILE_FORMAT_STRING, MRJobConfig.OUTPUT, mapId.getId()),
size, getConf());
}
@ -212,7 +213,7 @@ public Path getInputFileForWrite(org.apache.hadoop.mapreduce.TaskID mapId,
@Override
public void removeAll()
throws IOException {
((JobConf)getConf()).deleteLocalFiles(Constants.OUTPUT);
((JobConf)getConf()).deleteLocalFiles(MRJobConfig.OUTPUT);
}
@Override

View File

@ -210,6 +210,8 @@ public interface MRJobConfig {
public static final String REDUCE_LOG_LEVEL = "mapreduce.reduce.log.level";
public static final String DEFAULT_LOG_LEVEL = "INFO";
public static final String REDUCE_MERGE_INMEM_THRESHOLD = "mapreduce.reduce.merge.inmem.threshold";
public static final String REDUCE_INPUT_BUFFER_PERCENT = "mapreduce.reduce.input.buffer.percent";
@ -400,4 +402,64 @@ public interface MRJobConfig {
*/
public static final String MR_AM_CREATE_JH_INTERMEDIATE_BASE_DIR =
MR_AM_PREFIX + "create-intermediate-jh-base-dir";
public static final String MAPRED_MAP_ADMIN_JAVA_OPTS =
"mapreduce.admin.map.child.java.opts";
public static final String MAPRED_REDUCE_ADMIN_JAVA_OPTS =
"mapreduce.admin.reduce.child.java.opts";
public static final String DEFAULT_MAPRED_ADMIN_JAVA_OPTS =
"-Djava.net.preferIPv4Stack=true " +
"-Dhadoop.metrics.log.level=WARN ";
public static final String MAPRED_ADMIN_USER_SHELL =
"mapreduce.admin.user.shell";
public static final String DEFAULT_SHELL = "/bin/bash";
public static final String MAPRED_ADMIN_USER_ENV =
"mapreduce.admin.user.env";
public static final String DEFAULT_MAPRED_ADMIN_USER_ENV =
"LD_LIBRARY_PATH=$HADOOP_COMMON_HOME/lib";
public static final String WORKDIR = "work";
public static final String OUTPUT = "output";
public static final String HADOOP_WORK_DIR = "HADOOP_WORK_DIR";
public static final String STDOUT_LOGFILE_ENV = "STDOUT_LOGFILE_ENV";
public static final String STDERR_LOGFILE_ENV = "STDERR_LOGFILE_ENV";
// This should be the directory where splits file gets localized on the node
// running ApplicationMaster.
public static final String JOB_SUBMIT_DIR = "jobSubmitDir";
// This should be the name of the localized job-configuration file on the node
// running ApplicationMaster and Task
public static final String JOB_CONF_FILE = "job.xml";
// This should be the name of the localized job-jar file on the node running
// individual containers/tasks.
public static final String JOB_JAR = "job.jar";
public static final String JOB_SPLIT = "job.split";
public static final String JOB_SPLIT_METAINFO = "job.splitmetainfo";
public static final String APPLICATION_MASTER_CLASS =
"org.apache.hadoop.mapreduce.v2.app.MRAppMaster";
// The token file for the application. Should contain tokens for access to
// remote file system and may optionally contain application specific tokens.
// For now, generated by the AppManagers and used by NodeManagers and the
// Containers.
public static final String APPLICATION_TOKENS_FILE = "appTokens";
public static final String MAPREDUCE_V2_CHILD_CLASS =
"org.apache.hadoop.mapred.YarnChild";
}

View File

@ -32,12 +32,12 @@
import org.apache.hadoop.mapreduce.ClusterMetrics;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.QueueAclsInfo;
import org.apache.hadoop.mapreduce.QueueInfo;
import org.apache.hadoop.mapreduce.TaskTrackerInfo;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.mapreduce.v2.MRConstants;
import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.SecurityInfo;
@ -254,7 +254,7 @@ public String getStagingAreaDir() throws IOException, InterruptedException {
public String getSystemDir() throws IOException, InterruptedException {
Path sysDir = new Path(MRConstants.JOB_SUBMIT_DIR);
Path sysDir = new Path(MRJobConfig.JOB_SUBMIT_DIR);
//FileContext.getFileContext(conf).delete(sysDir, true);
return sysDir.toString();
}

View File

@ -51,7 +51,6 @@
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.mapreduce.v2.MRConstants;
import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.security.Credentials;
@ -60,6 +59,7 @@
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationState;
@ -210,7 +210,7 @@ public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts)
// Upload only in security mode: TODO
Path applicationTokensFile =
new Path(jobSubmitDir, MRConstants.APPLICATION_TOKENS_FILE);
new Path(jobSubmitDir, MRJobConfig.APPLICATION_TOKENS_FILE);
try {
ts.writeTokenStorageFile(applicationTokensFile, conf);
} catch (IOException e) {
@ -226,7 +226,9 @@ public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts)
ApplicationReport appMaster = resMgrDelegate
.getApplicationReport(applicationId);
String diagnostics = (appMaster == null ? "application report is null" : appMaster.getDiagnostics());
String diagnostics =
(appMaster == null ?
"application report is null" : appMaster.getDiagnostics());
if (appMaster == null || appMaster.getState() == ApplicationState.FAILED
|| appMaster.getState() == ApplicationState.KILLED) {
throw new IOException("Failed to run job : " +
@ -263,7 +265,7 @@ public ApplicationSubmissionContext createApplicationSubmissionContext(
Map<String, LocalResource> localResources =
new HashMap<String, LocalResource>();
Path jobConfPath = new Path(jobSubmitDir, MRConstants.JOB_CONF_FILE);
Path jobConfPath = new Path(jobSubmitDir, MRJobConfig.JOB_CONF_FILE);
URL yarnUrlForJobSubmitDir = ConverterUtils
.getYarnUrlFromPath(defaultFileContext.getDefaultFileSystem()
@ -272,13 +274,13 @@ public ApplicationSubmissionContext createApplicationSubmissionContext(
LOG.debug("Creating setup context, jobSubmitDir url is "
+ yarnUrlForJobSubmitDir);
localResources.put(MRConstants.JOB_CONF_FILE,
localResources.put(MRJobConfig.JOB_CONF_FILE,
createApplicationResource(defaultFileContext,
jobConfPath));
if (jobConf.get(MRJobConfig.JAR) != null) {
localResources.put(MRConstants.JOB_JAR,
localResources.put(MRJobConfig.JOB_JAR,
createApplicationResource(defaultFileContext,
new Path(jobSubmitDir, MRConstants.JOB_JAR)));
new Path(jobSubmitDir, MRJobConfig.JOB_JAR)));
} else {
// Job jar may be null. For e.g, for pipes, the job jar is the hadoop
// mapreduce jar itself which is already on the classpath.
@ -287,10 +289,12 @@ public ApplicationSubmissionContext createApplicationSubmissionContext(
}
// TODO gross hack
for (String s : new String[] { "job.split", "job.splitmetainfo",
MRConstants.APPLICATION_TOKENS_FILE }) {
for (String s : new String[] {
MRJobConfig.JOB_SPLIT,
MRJobConfig.JOB_SPLIT_METAINFO,
MRJobConfig.APPLICATION_TOKENS_FILE }) {
localResources.put(
MRConstants.JOB_SUBMIT_DIR + "/" + s,
MRJobConfig.JOB_SUBMIT_DIR + "/" + s,
createApplicationResource(defaultFileContext,
new Path(jobSubmitDir, s)));
}
@ -304,9 +308,8 @@ public ApplicationSubmissionContext createApplicationSubmissionContext(
}
// Setup the command to run the AM
String javaHome = "$JAVA_HOME";
Vector<CharSequence> vargs = new Vector<CharSequence>(8);
vargs.add(javaHome + "/bin/java");
vargs.add(Environment.JAVA_HOME.$() + "/bin/java");
vargs.add("-Dhadoop.root.logger="
+ conf.get(MRJobConfig.MR_AM_LOG_OPTS,
MRJobConfig.DEFAULT_MR_AM_LOG_OPTS) + ",console");
@ -314,12 +317,15 @@ public ApplicationSubmissionContext createApplicationSubmissionContext(
vargs.add(conf.get(MRJobConfig.MR_AM_COMMAND_OPTS,
MRJobConfig.DEFAULT_MR_AM_COMMAND_OPTS));
vargs.add("org.apache.hadoop.mapreduce.v2.app.MRAppMaster");
vargs.add(MRJobConfig.APPLICATION_MASTER_CLASS);
vargs.add(String.valueOf(applicationId.getClusterTimestamp()));
vargs.add(String.valueOf(applicationId.getId()));
vargs.add(ApplicationConstants.AM_FAIL_COUNT_STRING);
vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout");
vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr");
vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR +
Path.SEPARATOR + ApplicationConstants.STDOUT);
vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR +
Path.SEPARATOR + ApplicationConstants.STDERR);
Vector<String> vargsFinal = new Vector<String>(8);
// Final commmand
@ -332,15 +338,13 @@ public ApplicationSubmissionContext createApplicationSubmissionContext(
LOG.info("Command to launch container for ApplicationMaster is : "
+ mergedCommand);
// Setup the environment - Add { job jar, MR app jar } to classpath.
// Setup the CLASSPATH in environment
// i.e. add { job jar, CWD, Hadoop jars} to classpath.
Map<String, String> environment = new HashMap<String, String>();
MRApps.setInitialClasspath(environment);
MRApps.addToClassPath(environment, MRConstants.JOB_JAR);
MRApps.addToClassPath(environment,
MRConstants.YARN_MAPREDUCE_APP_JAR_PATH);
MRApps.setClasspath(environment);
// Parse distributed cache
MRApps.setupDistributedCache(jobConf, localResources, environment);
MRApps.setupDistributedCache(jobConf, localResources);
// Setup ContainerLaunchContext for AM container
ContainerLaunchContext amContainer =

View File

@ -43,9 +43,15 @@
*/
public class MiniMRYarnCluster extends MiniYARNCluster {
public static final String HADOOP_MAPREDUCE_CLIENT_APP_JAR_NAME =
"hadoop-mapreduce-client-app-0.24.0-SNAPSHOT.jar";
public static final String YARN_MAPREDUCE_APP_JAR_PATH =
"$YARN_HOME/modules/" + HADOOP_MAPREDUCE_CLIENT_APP_JAR_NAME;
public static final String APPJAR =
"../hadoop-mapreduce-client-app/target/"
+ MRConstants.HADOOP_MAPREDUCE_CLIENT_APP_JAR_NAME;
+ HADOOP_MAPREDUCE_CLIENT_APP_JAR_NAME;
private static final Log LOG = LogFactory.getLog(MiniMRYarnCluster.class);
private JobHistoryServer historyServer;

View File

@ -402,7 +402,7 @@ public void setup(Context context) throws IOException {
// both should be reachable via the class loader.
Assert.assertNotNull(cl.getResource("distributed.jar.inside2"));
Assert.assertNotNull(cl.getResource("distributed.jar.inside3"));
Assert.assertNull(cl.getResource("distributed.jar.inside4"));
Assert.assertNotNull(cl.getResource("distributed.jar.inside4"));
// Check that the symlink for the renaming was created in the cwd;
File symlinkFile = new File("distributed.first.symlink");

View File

@ -46,4 +46,117 @@ public interface ApplicationConstants {
public static final String LOCAL_DIR_ENV = "YARN_LOCAL_DIRS";
public static final String LOG_DIR_EXPANSION_VAR = "<LOG_DIR>";
public static final String STDERR = "stderr";
public static final String STDOUT = "stdout";
/**
* Classpath for typical applications.
*/
public static final String[] APPLICATION_CLASSPATH =
new String[] {
"$HADOOP_CONF_DIR",
"$HADOOP_COMMON_HOME/share/hadoop/common/*",
"$HADOOP_COMMON_HOME/share/hadoop/common/lib/*",
"$HADOOP_HDFS_HOME/share/hadoop/hdfs/*",
"$HADOOP_HDFS_HOME/share/hadoop/hdfs/lib/*",
"$YARN_HOME/modules/*",
"$YARN_HOME/lib/*"
};
/**
* Environment for Applications.
*
* Some of the environment variables for applications are <em>final</em>
* i.e. they cannot be modified by the applications.
*/
public enum Environment {
/**
* $USER
* Final, non-modifiable.
*/
USER("USER"),
/**
* $LOGNAME
* Final, non-modifiable.
*/
LOGNAME("LOGNAME"),
/**
* $HOME
* Final, non-modifiable.
*/
HOME("HOME"),
/**
* $PWD
* Final, non-modifiable.
*/
PWD("PWD"),
/**
* $PATH
*/
PATH("PATH"),
/**
* $SHELL
*/
SHELL("SHELL"),
/**
* $JAVA_HOME
*/
JAVA_HOME("JAVA_HOME"),
/**
* $CLASSPATH
*/
CLASSPATH("CLASSPATH"),
/**
* $LD_LIBRARY_PATH
*/
LD_LIBRARY_PATH("LD_LIBRARY_PATH"),
/**
* $HADOOP_CONF_DIR
* Final, non-modifiable.
*/
HADOOP_CONF_DIR("HADOOP_CONF_DIR"),
/**
* $HADOOP_COMMON_HOME
*/
HADOOP_COMMON_HOME("HADOOP_COMMON_HOME"),
/**
* $HADOOP_HDFS_HOME
*/
HADOOP_HDFS_HOME("HADOOP_HDFS_HOME"),
/**
* $YARN_HOME
*/
YARN_HOME("YARN_HOME");
private final String variable;
private Environment(String variable) {
this.variable = variable;
}
public String key() {
return variable;
}
public String toString() {
return variable;
}
public String $() {
return "$" + variable;
}
}
}

View File

@ -357,6 +357,12 @@ public class YarnConfiguration extends Configuration {
public static final String NM_AUX_SERVICE_FMT =
NM_PREFIX + "aux-services.%s.class";
public static final String NM_USER_HOME_DIR =
NM_PREFIX + "user-home-dir";
public static final String DEFAULT_NM_USER_HOME_DIR= "/home/";
public static final int INVALID_CONTAINER_EXIT_STATUS = -1000;
public static final int ABORTED_CONTAINER_EXIT_STATUS = -100;

View File

@ -133,8 +133,10 @@ public int launchContainer(Container container,
String[] command =
new String[] { "bash", "-c", launchDst.toUri().getPath().toString() };
LOG.info("launchContainer: " + Arrays.toString(command));
shExec = new ShellCommandExecutor(command,
new File(containerWorkDir.toUri().getPath()));
shExec = new ShellCommandExecutor(
command,
new File(containerWorkDir.toUri().getPath()),
container.getLaunchContext().getEnvironment()); // sanitized env
launchCommandObjs.put(containerId, shExec);
shExec.execute();
} catch (IOException e) {

View File

@ -161,7 +161,11 @@ public int launchContainer(Container container,
nmPrivateCotainerScriptPath.toUri().getPath().toString(),
nmPrivateTokensPath.toUri().getPath().toString()));
String[] commandArray = command.toArray(new String[command.size()]);
ShellCommandExecutor shExec = new ShellCommandExecutor(commandArray);
ShellCommandExecutor shExec =
new ShellCommandExecutor(
commandArray,
null, // NM's cwd
container.getLaunchContext().getEnvironment()); // sanitized env
launchCommandObjs.put(containerId, shExec);
// DEBUG
LOG.info("launchContainer: " + Arrays.toString(commandArray));

View File

@ -44,6 +44,7 @@
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
@ -89,7 +90,6 @@ public Integer call() {
final Map<Path,String> localResources = container.getLocalizedResources();
String containerIdStr = ConverterUtils.toString(container.getContainerID());
final String user = launchContext.getUser();
final Map<String,String> env = launchContext.getEnvironment();
final List<String> command = launchContext.getCommands();
int ret = -1;
@ -109,16 +109,16 @@ public Integer call() {
}
launchContext.setCommands(newCmds);
Map<String, String> envs = launchContext.getEnvironment();
Map<String, String> newEnvs = new HashMap<String, String>(envs.size());
for (Entry<String, String> entry : envs.entrySet()) {
newEnvs.put(
entry.getKey(),
entry.getValue().replace(
Map<String, String> environment = launchContext.getEnvironment();
// Make a copy of env to iterate & do variable expansion
for (Entry<String, String> entry : environment.entrySet()) {
String value = entry.getValue();
entry.setValue(
value.replace(
ApplicationConstants.LOG_DIR_EXPANSION_VAR,
containerLogDir.toUri().getPath()));
containerLogDir.toUri().getPath())
);
}
launchContext.setEnvironment(newEnvs);
// /////////////////////////// End of variable expansion
FileContext lfs = FileContext.getLocalFSFileContext();
@ -164,11 +164,18 @@ public Integer call() {
EnumSet.of(CREATE, OVERWRITE));
// Set the token location too.
env.put(ApplicationConstants.CONTAINER_TOKEN_FILE_ENV_NAME, new Path(
containerWorkDir, FINAL_CONTAINER_TOKENS_FILE).toUri().getPath());
environment.put(
ApplicationConstants.CONTAINER_TOKEN_FILE_ENV_NAME,
new Path(containerWorkDir,
FINAL_CONTAINER_TOKENS_FILE).toUri().getPath());
writeLaunchEnv(containerScriptOutStream, env, localResources,
launchContext.getCommands(), appDirs);
// Sanitize the container's environment
sanitizeEnv(environment, containerWorkDir, appDirs);
// Write out the environment
writeLaunchEnv(containerScriptOutStream, environment, localResources,
launchContext.getCommands());
// /////////// End of writing out container-script
// /////////// Write out the container-tokens in the nmPrivate space.
@ -275,19 +282,71 @@ public String toString() {
}
private static void putEnvIfNotNull(
Map<String, String> environment, String variable, String value) {
if (value != null) {
environment.put(variable, value);
}
}
private static void putEnvIfAbsent(
Map<String, String> environment, String variable) {
if (environment.get(variable) == null) {
putEnvIfNotNull(environment, variable, System.getenv(variable));
}
}
public void sanitizeEnv(Map<String, String> environment,
Path pwd, List<Path> appDirs) {
/**
* Non-modifiable environment variables
*/
putEnvIfNotNull(environment, Environment.USER.name(), container.getUser());
putEnvIfNotNull(environment,
Environment.LOGNAME.name(),container.getUser());
putEnvIfNotNull(environment,
Environment.HOME.name(),
conf.get(
YarnConfiguration.NM_USER_HOME_DIR,
YarnConfiguration.DEFAULT_NM_USER_HOME_DIR
)
);
putEnvIfNotNull(environment, Environment.PWD.name(), pwd.toString());
putEnvIfNotNull(environment,
Environment.HADOOP_CONF_DIR.name(),
System.getenv(Environment.HADOOP_CONF_DIR.name())
);
putEnvIfNotNull(environment,
ApplicationConstants.LOCAL_DIR_ENV,
StringUtils.join(",", appDirs)
);
if (!Shell.WINDOWS) {
environment.put("JVM_PID", "$$");
}
/**
* Modifiable environment variables
*/
putEnvIfAbsent(environment, Environment.JAVA_HOME.name());
putEnvIfAbsent(environment, Environment.HADOOP_COMMON_HOME.name());
putEnvIfAbsent(environment, Environment.HADOOP_HDFS_HOME.name());
putEnvIfAbsent(environment, Environment.YARN_HOME.name());
}
private static void writeLaunchEnv(OutputStream out,
Map<String,String> environment, Map<Path,String> resources,
List<String> command, List<Path> appDirs)
List<String> command)
throws IOException {
ShellScriptBuilder sb = new ShellScriptBuilder();
if (System.getenv("YARN_HOME") != null) {
// TODO: Get from whitelist.
sb.env("YARN_HOME", System.getenv("YARN_HOME"));
}
sb.env(ApplicationConstants.LOCAL_DIR_ENV, StringUtils.join(",", appDirs));
if (!Shell.WINDOWS) {
sb.env("JVM_PID", "$$");
}
if (environment != null) {
for (Map.Entry<String,String> env : environment.entrySet()) {
sb.env(env.getKey().toString(), env.getValue().toString());