MAPREDUCE-1700. User supplied dependencies may conflict with MapReduce system JARs.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1430929 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Thomas White 2013-01-09 16:12:58 +00:00
parent eae2a30462
commit 0ba7078ef4
8 changed files with 130 additions and 10 deletions

View File

@ -247,6 +247,9 @@ Release 2.0.3-alpha - Unreleased
MAPREDUCE-4278. Cannot run two local jobs in parallel from the same MAPREDUCE-4278. Cannot run two local jobs in parallel from the same
gateway. (Sandy Ryza via tomwhite) gateway. (Sandy Ryza via tomwhite)
MAPREDUCE-1700. User supplied dependencies may conflict with MapReduce
system JARs. (tomwhite)
Release 2.0.2-alpha - 2012-09-07 Release 2.0.2-alpha - 2012-09-07
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -47,6 +47,7 @@
import org.apache.hadoop.mapreduce.security.TokenCache; import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier; import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager; import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.source.JvmMetrics; import org.apache.hadoop.metrics2.source.JvmMetrics;
import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NetUtils;
@ -254,6 +255,9 @@ private static JobConf configureTask(Task task, Credentials credentials,
final JobConf job = new JobConf(MRJobConfig.JOB_CONF_FILE); final JobConf job = new JobConf(MRJobConfig.JOB_CONF_FILE);
job.setCredentials(credentials); job.setCredentials(credentials);
// set job classloader if configured
MRApps.setJobClassLoader(job);
String appAttemptIdEnv = System String appAttemptIdEnv = System
.getenv(MRJobConfig.APPLICATION_ATTEMPT_ID_ENV); .getenv(MRJobConfig.APPLICATION_ATTEMPT_ID_ENV);
LOG.debug("APPLICATION_ATTEMPT_ID: " + appAttemptIdEnv); LOG.debug("APPLICATION_ATTEMPT_ID: " + appAttemptIdEnv);

View File

@ -1223,6 +1223,8 @@ public static void main(String[] args) {
// SIGTERM I have a chance to write out the job history. I'll be closing // SIGTERM I have a chance to write out the job history. I'll be closing
// the objects myself. // the objects myself.
conf.setBoolean("fs.automatic.close", false); conf.setBoolean("fs.automatic.close", false);
// set job classloader if configured
MRApps.setJobClassLoader(conf);
initAndStartAppMaster(appMaster, conf, jobUserName); initAndStartAppMaster(appMaster, conf, jobUserName);
} catch (Throwable t) { } catch (Throwable t) {
LOG.fatal("Error starting MRAppMaster", t); LOG.fatal("Error starting MRAppMaster", t);

View File

@ -165,6 +165,7 @@ public abstract class TaskAttemptImpl implements
private Token<JobTokenIdentifier> jobToken; private Token<JobTokenIdentifier> jobToken;
private static AtomicBoolean initialClasspathFlag = new AtomicBoolean(); private static AtomicBoolean initialClasspathFlag = new AtomicBoolean();
private static String initialClasspath = null; private static String initialClasspath = null;
private static String initialAppClasspath = null;
private static Object commonContainerSpecLock = new Object(); private static Object commonContainerSpecLock = new Object();
private static ContainerLaunchContext commonContainerSpec = null; private static ContainerLaunchContext commonContainerSpec = null;
private static final Object classpathLock = new Object(); private static final Object classpathLock = new Object();
@ -599,6 +600,7 @@ private static String getInitialClasspath(Configuration conf) throws IOException
Map<String, String> env = new HashMap<String, String>(); Map<String, String> env = new HashMap<String, String>();
MRApps.setClasspath(env, conf); MRApps.setClasspath(env, conf);
initialClasspath = env.get(Environment.CLASSPATH.name()); initialClasspath = env.get(Environment.CLASSPATH.name());
initialAppClasspath = env.get(Environment.APP_CLASSPATH.name());
initialClasspathFlag.set(true); initialClasspathFlag.set(true);
return initialClasspath; return initialClasspath;
} }
@ -697,6 +699,13 @@ private static ContainerLaunchContext createCommonContainerLaunchContext(
environment, environment,
Environment.CLASSPATH.name(), Environment.CLASSPATH.name(),
getInitialClasspath(conf)); getInitialClasspath(conf));
if (initialAppClasspath != null) {
Apps.addToEnvironment(
environment,
Environment.APP_CLASSPATH.name(),
initialAppClasspath);
}
} catch (IOException e) { } catch (IOException e) {
throw new YarnException(e); throw new YarnException(e);
} }

View File

@ -19,12 +19,18 @@
package org.apache.hadoop.mapreduce.v2.util; package org.apache.hadoop.mapreduce.v2.util;
import java.io.IOException; import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URI; import java.net.URI;
import java.security.AccessController;
import java.security.PrivilegedActionException;
import java.security.PrivilegedExceptionAction;
import java.util.Arrays; import java.util.Arrays;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
@ -50,6 +56,7 @@
import org.apache.hadoop.yarn.api.records.LocalResourceType; import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.util.ApplicationClassLoader;
import org.apache.hadoop.yarn.util.Apps; import org.apache.hadoop.yarn.util.Apps;
import org.apache.hadoop.yarn.util.BuilderUtils; import org.apache.hadoop.yarn.util.BuilderUtils;
import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.ConverterUtils;
@ -62,6 +69,8 @@
@Private @Private
@Unstable @Unstable
public class MRApps extends Apps { public class MRApps extends Apps {
public static final Log LOG = LogFactory.getLog(MRApps.class);
public static String toString(JobId jid) { public static String toString(JobId jid) {
return jid.toString(); return jid.toString();
} }
@ -157,38 +166,42 @@ public static void setClasspath(Map<String, String> environment,
boolean userClassesTakesPrecedence = boolean userClassesTakesPrecedence =
conf.getBoolean(MRJobConfig.MAPREDUCE_JOB_USER_CLASSPATH_FIRST, false); conf.getBoolean(MRJobConfig.MAPREDUCE_JOB_USER_CLASSPATH_FIRST, false);
String classpathEnvVar =
conf.getBoolean(MRJobConfig.MAPREDUCE_JOB_CLASSLOADER, false)
? Environment.APP_CLASSPATH.name() : Environment.CLASSPATH.name();
Apps.addToEnvironment(environment, Apps.addToEnvironment(environment,
Environment.CLASSPATH.name(), classpathEnvVar,
Environment.PWD.$()); Environment.PWD.$());
if (!userClassesTakesPrecedence) { if (!userClassesTakesPrecedence) {
MRApps.setMRFrameworkClasspath(environment, conf); MRApps.setMRFrameworkClasspath(environment, conf);
} }
Apps.addToEnvironment( Apps.addToEnvironment(
environment, environment,
Environment.CLASSPATH.name(), classpathEnvVar,
MRJobConfig.JOB_JAR + Path.SEPARATOR + MRJobConfig.JOB_JAR); MRJobConfig.JOB_JAR + Path.SEPARATOR + MRJobConfig.JOB_JAR);
Apps.addToEnvironment( Apps.addToEnvironment(
environment, environment,
Environment.CLASSPATH.name(), classpathEnvVar,
MRJobConfig.JOB_JAR + Path.SEPARATOR + "classes" + Path.SEPARATOR); MRJobConfig.JOB_JAR + Path.SEPARATOR + "classes" + Path.SEPARATOR);
Apps.addToEnvironment( Apps.addToEnvironment(
environment, environment,
Environment.CLASSPATH.name(), classpathEnvVar,
MRJobConfig.JOB_JAR + Path.SEPARATOR + "lib" + Path.SEPARATOR + "*"); MRJobConfig.JOB_JAR + Path.SEPARATOR + "lib" + Path.SEPARATOR + "*");
Apps.addToEnvironment( Apps.addToEnvironment(
environment, environment,
Environment.CLASSPATH.name(), classpathEnvVar,
Environment.PWD.$() + Path.SEPARATOR + "*"); Environment.PWD.$() + Path.SEPARATOR + "*");
// a * in the classpath will only find a .jar, so we need to filter out // a * in the classpath will only find a .jar, so we need to filter out
// all .jars and add everything else // all .jars and add everything else
addToClasspathIfNotJar(DistributedCache.getFileClassPaths(conf), addToClasspathIfNotJar(DistributedCache.getFileClassPaths(conf),
DistributedCache.getCacheFiles(conf), DistributedCache.getCacheFiles(conf),
conf, conf,
environment); environment, classpathEnvVar);
addToClasspathIfNotJar(DistributedCache.getArchiveClassPaths(conf), addToClasspathIfNotJar(DistributedCache.getArchiveClassPaths(conf),
DistributedCache.getCacheArchives(conf), DistributedCache.getCacheArchives(conf),
conf, conf,
environment); environment, classpathEnvVar);
if (userClassesTakesPrecedence) { if (userClassesTakesPrecedence) {
MRApps.setMRFrameworkClasspath(environment, conf); MRApps.setMRFrameworkClasspath(environment, conf);
} }
@ -204,7 +217,8 @@ public static void setClasspath(Map<String, String> environment,
*/ */
private static void addToClasspathIfNotJar(Path[] paths, private static void addToClasspathIfNotJar(Path[] paths,
URI[] withLinks, Configuration conf, URI[] withLinks, Configuration conf,
Map<String, String> environment) throws IOException { Map<String, String> environment,
String classpathEnvVar) throws IOException {
if (paths != null) { if (paths != null) {
HashMap<Path, String> linkLookup = new HashMap<Path, String>(); HashMap<Path, String> linkLookup = new HashMap<Path, String>();
if (withLinks != null) { if (withLinks != null) {
@ -232,13 +246,64 @@ private static void addToClasspathIfNotJar(Path[] paths,
if(!name.toLowerCase().endsWith(".jar")) { if(!name.toLowerCase().endsWith(".jar")) {
Apps.addToEnvironment( Apps.addToEnvironment(
environment, environment,
Environment.CLASSPATH.name(), classpathEnvVar,
Environment.PWD.$() + Path.SEPARATOR + name); Environment.PWD.$() + Path.SEPARATOR + name);
} }
} }
} }
} }
/**
* Sets a {@link ApplicationClassLoader} on the given configuration and as
* the context classloader, if
* {@link MRJobConfig#MAPREDUCE_JOB_CLASSLOADER} is set to true, and
* the APP_CLASSPATH environment variable is set.
* @param conf
* @throws IOException
*/
public static void setJobClassLoader(Configuration conf)
throws IOException {
if (conf.getBoolean(MRJobConfig.MAPREDUCE_JOB_CLASSLOADER, false)) {
String appClasspath = System.getenv(Environment.APP_CLASSPATH.key());
if (appClasspath == null) {
LOG.warn("Not using job classloader since APP_CLASSPATH is not set.");
} else {
LOG.info("Using job classloader");
if (LOG.isDebugEnabled()) {
LOG.debug("APP_CLASSPATH=" + appClasspath);
}
String[] systemClasses = conf.getStrings(
MRJobConfig.MAPREDUCE_JOB_CLASSLOADER_SYSTEM_CLASSES);
ClassLoader jobClassLoader = createJobClassLoader(appClasspath,
systemClasses);
if (jobClassLoader != null) {
conf.setClassLoader(jobClassLoader);
Thread.currentThread().setContextClassLoader(jobClassLoader);
}
}
}
}
private static ClassLoader createJobClassLoader(final String appClasspath,
final String[] systemClasses) throws IOException {
try {
return AccessController.doPrivileged(
new PrivilegedExceptionAction<ClassLoader>() {
@Override
public ClassLoader run() throws MalformedURLException {
return new ApplicationClassLoader(appClasspath,
MRApps.class.getClassLoader(), Arrays.asList(systemClasses));
}
});
} catch (PrivilegedActionException e) {
Throwable t = e.getCause();
if (t instanceof MalformedURLException) {
throw (MalformedURLException) t;
}
throw new IOException(e);
}
}
private static final String STAGING_CONSTANT = ".staging"; private static final String STAGING_CONSTANT = ".staging";
public static Path getStagingAreaDir(Configuration conf, String user) { public static Path getStagingAreaDir(Configuration conf, String user) {
return new Path(conf.get(MRJobConfig.MR_AM_STAGING_DIR, return new Path(conf.get(MRJobConfig.MR_AM_STAGING_DIR,

View File

@ -235,6 +235,22 @@ private static void delete(File dir) throws IOException {
index, 0); index, 0);
} }
@Test public void testSetClasspathWithJobClassloader() throws IOException {
Configuration conf = new Configuration();
conf.setBoolean(MRJobConfig.MAPREDUCE_JOB_CLASSLOADER, true);
Map<String, String> env = new HashMap<String, String>();
MRApps.setClasspath(env, conf);
String cp = env.get("CLASSPATH");
String appCp = env.get("APP_CLASSPATH");
assertSame("MAPREDUCE_JOB_CLASSLOADER true, but job.jar is"
+ " in the classpath!", cp.indexOf("jar:job"), -1);
assertSame("MAPREDUCE_JOB_CLASSLOADER true, but PWD is"
+ " in the classpath!", cp.indexOf("PWD"), -1);
assertEquals("MAPREDUCE_JOB_CLASSLOADER true, but job.jar is not"
+ " in the app classpath!",
"$PWD:job.jar/job.jar:job.jar/classes/:job.jar/lib/*:$PWD/*", appCp);
}
@Test @Test
public void testSetupDistributedCacheEmpty() throws IOException { public void testSetupDistributedCacheEmpty() throws IOException {
Configuration conf = new Configuration(); Configuration conf = new Configuration();

View File

@ -127,6 +127,10 @@ public interface MRJobConfig {
public static final String MAPREDUCE_JOB_USER_CLASSPATH_FIRST = "mapreduce.job.user.classpath.first"; public static final String MAPREDUCE_JOB_USER_CLASSPATH_FIRST = "mapreduce.job.user.classpath.first";
public static final String MAPREDUCE_JOB_CLASSLOADER = "mapreduce.job.classloader";
public static final String MAPREDUCE_JOB_CLASSLOADER_SYSTEM_CLASSES = "mapreduce.job.classloader.system.classes";
public static final String IO_SORT_FACTOR = "mapreduce.task.io.sort.factor"; public static final String IO_SORT_FACTOR = "mapreduce.task.io.sort.factor";
public static final String IO_SORT_MB = "mapreduce.task.io.sort.mb"; public static final String IO_SORT_MB = "mapreduce.task.io.sort.mb";

View File

@ -961,6 +961,23 @@
<value>$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/*,$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/lib/*</value> <value>$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/*,$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/lib/*</value>
</property> </property>
<property>
<name>mapreduce.job.classloader</name>
<value>false</value>
<description>Whether to use a separate (isolated) classloader for
user classes in the task JVM.</description>
</property>
<property>
<name>mapreduce.job.classloader.system.classes</name>
<value>java.,javax.,org.apache.commons.logging.,org.apache.log4j.,org.apache.hadoop.</value>
<description>A comma-separated list of classes that should be loaded from the
system classpath, not the user-supplied JARs, when mapreduce.job.classloader
is enabled. Names ending in '.' (period) are treated as package names,
and names starting with a '-' are treated as negative matches.
</description>
</property>
<!-- jobhistory properties --> <!-- jobhistory properties -->
<property> <property>