MAPREDUCE-6454. Fixed MapReduce to add distributed cache files to HADOOP_CLASSPATH so that child processes running hadoop scripts can access these files. Contributed by Junping Du.
(cherry picked from commit 9da81a395584cb1dbfdee012222fd4de53522681) (cherry picked from commit d4b86e6f403edc6cdd687f978e3abff8752faa34)
This commit is contained in:
parent
b99a85139e
commit
ab4c2ce248
@ -12,6 +12,10 @@ Release 2.6.2 - UNRELEASED
|
||||
|
||||
BUG FIXES
|
||||
|
||||
MAPREDUCE-6454. Fixed MapReduce to modify HADOOP_CLASSPATH to have distributed
|
||||
cache files so that child processes running hadoop scripts can access these
|
||||
files. (Junping Du via vinodkv)
|
||||
|
||||
Release 2.6.1 - 2015-09-23
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
@ -172,6 +172,7 @@ public abstract class TaskAttemptImpl implements
|
||||
private static AtomicBoolean initialClasspathFlag = new AtomicBoolean();
|
||||
private static String initialClasspath = null;
|
||||
private static String initialAppClasspath = null;
|
||||
private static String initialHadoopClasspath = null;
|
||||
private static Object commonContainerSpecLock = new Object();
|
||||
private static ContainerLaunchContext commonContainerSpec = null;
|
||||
private static final Object classpathLock = new Object();
|
||||
@ -620,6 +621,7 @@ private static String getInitialClasspath(Configuration conf) throws IOException
|
||||
MRApps.setClasspath(env, conf);
|
||||
initialClasspath = env.get(Environment.CLASSPATH.name());
|
||||
initialAppClasspath = env.get(Environment.APP_CLASSPATH.name());
|
||||
initialHadoopClasspath = env.get(Environment.HADOOP_CLASSPATH.name());
|
||||
initialClasspathFlag.set(true);
|
||||
return initialClasspath;
|
||||
}
|
||||
@ -753,14 +755,21 @@ private static ContainerLaunchContext createCommonContainerLaunchContext(
|
||||
}
|
||||
|
||||
MRApps.addToEnvironment(
|
||||
environment,
|
||||
Environment.CLASSPATH.name(),
|
||||
environment,
|
||||
Environment.CLASSPATH.name(),
|
||||
getInitialClasspath(conf), conf);
|
||||
|
||||
if (initialHadoopClasspath != null) {
|
||||
MRApps.addToEnvironment(
|
||||
environment,
|
||||
Environment.HADOOP_CLASSPATH.name(),
|
||||
initialHadoopClasspath, conf);
|
||||
}
|
||||
|
||||
if (initialAppClasspath != null) {
|
||||
MRApps.addToEnvironment(
|
||||
environment,
|
||||
Environment.APP_CLASSPATH.name(),
|
||||
Environment.APP_CLASSPATH.name(),
|
||||
initialAppClasspath, conf);
|
||||
}
|
||||
} catch (IOException e) {
|
||||
|
@ -231,7 +231,7 @@ private static void setMRFrameworkClasspath(
|
||||
}
|
||||
// TODO: Remove duplicates.
|
||||
}
|
||||
|
||||
|
||||
@SuppressWarnings("deprecation")
|
||||
public static void setClasspath(Map<String, String> environment,
|
||||
Configuration conf) throws IOException {
|
||||
@ -242,11 +242,30 @@ public static void setClasspath(Map<String, String> environment,
|
||||
conf.getBoolean(MRJobConfig.MAPREDUCE_JOB_CLASSLOADER, false)
|
||||
? Environment.APP_CLASSPATH.name() : Environment.CLASSPATH.name();
|
||||
|
||||
String hadoopClasspathEnvVar = Environment.HADOOP_CLASSPATH.name();
|
||||
|
||||
MRApps.addToEnvironment(environment,
|
||||
classpathEnvVar, crossPlatformifyMREnv(conf, Environment.PWD), conf);
|
||||
|
||||
MRApps.addToEnvironment(environment,
|
||||
hadoopClasspathEnvVar, crossPlatformifyMREnv(conf, Environment.PWD),
|
||||
conf);
|
||||
|
||||
if (!userClassesTakesPrecedence) {
|
||||
MRApps.setMRFrameworkClasspath(environment, conf);
|
||||
}
|
||||
|
||||
addClasspathToEnv(environment, classpathEnvVar, conf);
|
||||
addClasspathToEnv(environment, hadoopClasspathEnvVar, conf);
|
||||
|
||||
if (userClassesTakesPrecedence) {
|
||||
MRApps.setMRFrameworkClasspath(environment, conf);
|
||||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("deprecation")
|
||||
public static void addClasspathToEnv(Map<String, String> environment,
|
||||
String classpathEnvVar, Configuration conf) throws IOException {
|
||||
MRApps.addToEnvironment(
|
||||
environment,
|
||||
classpathEnvVar,
|
||||
@ -254,15 +273,21 @@ public static void setClasspath(Map<String, String> environment,
|
||||
MRApps.addToEnvironment(
|
||||
environment,
|
||||
classpathEnvVar,
|
||||
MRJobConfig.JOB_JAR + Path.SEPARATOR + "classes" + Path.SEPARATOR, conf);
|
||||
MRJobConfig.JOB_JAR + Path.SEPARATOR + "classes" + Path.SEPARATOR,
|
||||
conf);
|
||||
|
||||
MRApps.addToEnvironment(
|
||||
environment,
|
||||
classpathEnvVar,
|
||||
MRJobConfig.JOB_JAR + Path.SEPARATOR + "lib" + Path.SEPARATOR + "*", conf);
|
||||
MRJobConfig.JOB_JAR + Path.SEPARATOR + "lib" + Path.SEPARATOR + "*",
|
||||
conf);
|
||||
|
||||
MRApps.addToEnvironment(
|
||||
environment,
|
||||
classpathEnvVar,
|
||||
crossPlatformifyMREnv(conf, Environment.PWD) + Path.SEPARATOR + "*", conf);
|
||||
crossPlatformifyMREnv(conf, Environment.PWD) + Path.SEPARATOR + "*",
|
||||
conf);
|
||||
|
||||
// a * in the classpath will only find a .jar, so we need to filter out
|
||||
// all .jars and add everything else
|
||||
addToClasspathIfNotJar(DistributedCache.getFileClassPaths(conf),
|
||||
@ -273,11 +298,8 @@ public static void setClasspath(Map<String, String> environment,
|
||||
DistributedCache.getCacheArchives(conf),
|
||||
conf,
|
||||
environment, classpathEnvVar);
|
||||
if (userClassesTakesPrecedence) {
|
||||
MRApps.setMRFrameworkClasspath(environment, conf);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Add the paths to the classpath if they are not jars
|
||||
* @param paths the paths to add to the classpath
|
||||
@ -443,6 +465,7 @@ public static Path getStartJobCommitFile(Configuration conf, String user,
|
||||
return startCommitFile;
|
||||
}
|
||||
|
||||
@SuppressWarnings("deprecation")
|
||||
public static void setupDistributedCache(
|
||||
Configuration conf,
|
||||
Map<String, LocalResource> localResources)
|
||||
@ -475,6 +498,7 @@ public static void setupDistributedCache(
|
||||
* @param conf
|
||||
* @throws java.io.IOException
|
||||
*/
|
||||
@SuppressWarnings("deprecation")
|
||||
public static void setupDistributedCacheLocal(Configuration conf)
|
||||
throws IOException {
|
||||
|
||||
@ -542,6 +566,7 @@ private static String toString(org.apache.hadoop.yarn.api.records.URL url) {
|
||||
// TODO - Move this to MR!
|
||||
// Use TaskDistributedCacheManager.CacheFiles.makeCacheFiles(URI[],
|
||||
// long[], boolean[], Path[], FileType)
|
||||
@SuppressWarnings("deprecation")
|
||||
private static void parseDistributedCacheArtifacts(
|
||||
Configuration conf,
|
||||
Map<String, LocalResource> localResources,
|
||||
|
@ -36,6 +36,8 @@
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.FilterFileSystem;
|
||||
@ -66,7 +68,10 @@
|
||||
|
||||
public class TestMRApps {
|
||||
private static File testWorkDir = null;
|
||||
|
||||
|
||||
private static final Log LOG =
|
||||
LogFactory.getLog(TestMRApps.class);
|
||||
|
||||
@BeforeClass
|
||||
public static void setupTestDirs() throws IOException {
|
||||
testWorkDir = new File("target", TestMRApps.class.getCanonicalName());
|
||||
@ -74,14 +79,14 @@ public static void setupTestDirs() throws IOException {
|
||||
testWorkDir.mkdirs();
|
||||
testWorkDir = testWorkDir.getAbsoluteFile();
|
||||
}
|
||||
|
||||
|
||||
@AfterClass
|
||||
public static void cleanupTestDirs() throws IOException {
|
||||
if (testWorkDir != null) {
|
||||
delete(testWorkDir);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private static void delete(File dir) throws IOException {
|
||||
Configuration conf = new Configuration();
|
||||
FileSystem fs = FileSystem.getLocal(conf);
|
||||
@ -193,9 +198,9 @@ public void testSetClasspath() throws IOException {
|
||||
Job job = Job.getInstance(conf);
|
||||
Map<String, String> environment = new HashMap<String, String>();
|
||||
MRApps.setClasspath(environment, job.getConfiguration());
|
||||
assertTrue(environment.get("CLASSPATH").startsWith(
|
||||
ApplicationConstants.Environment.PWD.$$()
|
||||
+ ApplicationConstants.CLASS_PATH_SEPARATOR));
|
||||
assertTrue(environment.get(ApplicationConstants.Environment.CLASSPATH.
|
||||
name()).startsWith(ApplicationConstants.Environment.PWD.$$()
|
||||
+ ApplicationConstants.CLASS_PATH_SEPARATOR));
|
||||
String yarnAppClasspath = job.getConfiguration().get(
|
||||
YarnConfiguration.YARN_APPLICATION_CLASSPATH,
|
||||
StringUtils.join(",",
|
||||
@ -205,8 +210,9 @@ public void testSetClasspath() throws IOException {
|
||||
yarnAppClasspath.replaceAll(",\\s*",
|
||||
ApplicationConstants.CLASS_PATH_SEPARATOR).trim();
|
||||
}
|
||||
assertTrue(environment.get("CLASSPATH").contains(yarnAppClasspath));
|
||||
String mrAppClasspath =
|
||||
assertTrue(environment.get(ApplicationConstants.Environment.
|
||||
CLASSPATH.name()).contains(yarnAppClasspath));
|
||||
String mrAppClasspath =
|
||||
job.getConfiguration().get(
|
||||
MRJobConfig.MAPREDUCE_APPLICATION_CLASSPATH,
|
||||
MRJobConfig.DEFAULT_MAPREDUCE_CROSS_PLATFORM_APPLICATION_CLASSPATH);
|
||||
@ -215,7 +221,8 @@ public void testSetClasspath() throws IOException {
|
||||
mrAppClasspath.replaceAll(",\\s*",
|
||||
ApplicationConstants.CLASS_PATH_SEPARATOR).trim();
|
||||
}
|
||||
assertTrue(environment.get("CLASSPATH").contains(mrAppClasspath));
|
||||
assertTrue(environment.get(ApplicationConstants.Environment.CLASSPATH.
|
||||
name()).contains(mrAppClasspath));
|
||||
}
|
||||
|
||||
@Test (timeout = 120000)
|
||||
@ -234,8 +241,12 @@ public void testSetClasspathWithArchives () throws IOException {
|
||||
conf.set(MRJobConfig.CACHE_ARCHIVES, testTGZQualifiedPath + "#testTGZ");
|
||||
Map<String, String> environment = new HashMap<String, String>();
|
||||
MRApps.setClasspath(environment, conf);
|
||||
assertTrue(environment.get("CLASSPATH").startsWith(
|
||||
assertTrue(environment.get(ApplicationConstants.Environment.CLASSPATH.name()).startsWith(
|
||||
ApplicationConstants.Environment.PWD.$$() + ApplicationConstants.CLASS_PATH_SEPARATOR));
|
||||
assertTrue(environment.get(ApplicationConstants.Environment.
|
||||
HADOOP_CLASSPATH.name()).startsWith(
|
||||
ApplicationConstants.Environment.PWD.$$() +
|
||||
ApplicationConstants.CLASS_PATH_SEPARATOR));
|
||||
String confClasspath = job.getConfiguration().get(
|
||||
YarnConfiguration.YARN_APPLICATION_CLASSPATH,
|
||||
StringUtils.join(",",
|
||||
@ -244,8 +255,19 @@ public void testSetClasspathWithArchives () throws IOException {
|
||||
confClasspath = confClasspath.replaceAll(",\\s*", ApplicationConstants.CLASS_PATH_SEPARATOR)
|
||||
.trim();
|
||||
}
|
||||
assertTrue(environment.get("CLASSPATH").contains(confClasspath));
|
||||
assertTrue(environment.get("CLASSPATH").contains("testTGZ"));
|
||||
LOG.info("CLASSPATH: "+ environment.get(
|
||||
ApplicationConstants.Environment.CLASSPATH.name()));
|
||||
LOG.info("confClasspath: " + confClasspath);
|
||||
assertTrue(environment.get(
|
||||
ApplicationConstants.Environment.CLASSPATH.name()).contains(
|
||||
confClasspath));
|
||||
LOG.info("HADOOP_CLASSPATH: " + environment.get(
|
||||
ApplicationConstants.Environment.HADOOP_CLASSPATH.name()));
|
||||
assertTrue(environment.get(
|
||||
ApplicationConstants.Environment.CLASSPATH.name()).contains("testTGZ"));
|
||||
assertTrue(environment.get(
|
||||
ApplicationConstants.Environment.HADOOP_CLASSPATH.name()).
|
||||
contains("testTGZ"));
|
||||
}
|
||||
|
||||
@Test (timeout = 120000)
|
||||
@ -259,7 +281,7 @@ public void testSetClasspathWithUserPrecendence() {
|
||||
} catch (Exception e) {
|
||||
fail("Got exception while setting classpath");
|
||||
}
|
||||
String env_str = env.get("CLASSPATH");
|
||||
String env_str = env.get(ApplicationConstants.Environment.CLASSPATH.name());
|
||||
String expectedClasspath = StringUtils.join(ApplicationConstants.CLASS_PATH_SEPARATOR,
|
||||
Arrays.asList(ApplicationConstants.Environment.PWD.$$(), "job.jar/job.jar",
|
||||
"job.jar/classes/", "job.jar/lib/*",
|
||||
@ -279,7 +301,7 @@ public void testSetClasspathWithNoUserPrecendence() {
|
||||
} catch (Exception e) {
|
||||
fail("Got exception while setting classpath");
|
||||
}
|
||||
String env_str = env.get("CLASSPATH");
|
||||
String env_str = env.get(ApplicationConstants.Environment.CLASSPATH.name());
|
||||
String expectedClasspath = StringUtils.join(ApplicationConstants.CLASS_PATH_SEPARATOR,
|
||||
Arrays.asList("job.jar/job.jar", "job.jar/classes/", "job.jar/lib/*",
|
||||
ApplicationConstants.Environment.PWD.$$() + "/*"));
|
||||
@ -296,8 +318,9 @@ public void testSetClasspathWithJobClassloader() throws IOException {
|
||||
conf.setBoolean(MRJobConfig.MAPREDUCE_JOB_CLASSLOADER, true);
|
||||
Map<String, String> env = new HashMap<String, String>();
|
||||
MRApps.setClasspath(env, conf);
|
||||
String cp = env.get("CLASSPATH");
|
||||
String appCp = env.get("APP_CLASSPATH");
|
||||
String cp = env.get(ApplicationConstants.Environment.CLASSPATH.name());
|
||||
String appCp = env.get(ApplicationConstants.Environment.
|
||||
APP_CLASSPATH.name());
|
||||
assertFalse("MAPREDUCE_JOB_CLASSLOADER true, but job.jar is in the"
|
||||
+ " classpath!", cp.contains("jar" + ApplicationConstants.CLASS_PATH_SEPARATOR + "job"));
|
||||
assertFalse("MAPREDUCE_JOB_CLASSLOADER true, but PWD is in the classpath!",
|
||||
@ -338,7 +361,8 @@ public void testSetClasspathWithFramework() throws IOException {
|
||||
Arrays.asList(ApplicationConstants.Environment.PWD.$$(),
|
||||
FRAMEWORK_CLASSPATH, stdClasspath));
|
||||
assertEquals("Incorrect classpath with framework and no user precedence",
|
||||
expectedClasspath, env.get("CLASSPATH"));
|
||||
expectedClasspath, env.get(ApplicationConstants.Environment.
|
||||
CLASSPATH.name()));
|
||||
|
||||
env.clear();
|
||||
conf.setBoolean(MRJobConfig.MAPREDUCE_JOB_USER_CLASSPATH_FIRST, true);
|
||||
@ -347,7 +371,8 @@ public void testSetClasspathWithFramework() throws IOException {
|
||||
Arrays.asList(ApplicationConstants.Environment.PWD.$$(),
|
||||
stdClasspath, FRAMEWORK_CLASSPATH));
|
||||
assertEquals("Incorrect classpath with framework and user precedence",
|
||||
expectedClasspath, env.get("CLASSPATH"));
|
||||
expectedClasspath, env.get(ApplicationConstants.Environment.CLASSPATH.
|
||||
name()));
|
||||
}
|
||||
|
||||
@Test (timeout = 30000)
|
||||
@ -358,7 +383,7 @@ public void testSetupDistributedCacheEmpty() throws IOException {
|
||||
assertTrue("Empty Config did not produce an empty list of resources",
|
||||
localResources.isEmpty());
|
||||
}
|
||||
|
||||
|
||||
@SuppressWarnings("deprecation")
|
||||
public void testSetupDistributedCacheConflicts() throws Exception {
|
||||
Configuration conf = new Configuration();
|
||||
|
@ -159,6 +159,11 @@ public enum Environment {
|
||||
*/
|
||||
APP_CLASSPATH("APP_CLASSPATH"),
|
||||
|
||||
/**
|
||||
* $HADOOP_CLASSPATH.
|
||||
*/
|
||||
HADOOP_CLASSPATH("HADOOP_CLASSPATH"),
|
||||
|
||||
/**
|
||||
* $LD_LIBRARY_PATH
|
||||
*/
|
||||
|
Loading…
x
Reference in New Issue
Block a user