MAPREDUCE-6454. Fixed MapReduce to modify HADOOP_CLASSPATH to have distributed cache files so that child processes running hadoop scripts can access these files. Contributed by Junping Du.

This commit is contained in:
Vinod Kumar Vavilapalli 2015-08-20 17:35:24 -07:00
parent 45be1240d9
commit 9da81a3955
5 changed files with 98 additions and 30 deletions

View File

@ -557,6 +557,10 @@ Release 2.6.2 - UNRELEASED
BUG FIXES
MAPREDUCE-6454. Fixed MapReduce to modify HADOOP_CLASSPATH to have distributed
cache files so that child processes running hadoop scripts can access these
files. (Junping Du via vinodkv)
Release 2.6.1 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -173,6 +173,7 @@ public abstract class TaskAttemptImpl implements
private static AtomicBoolean initialClasspathFlag = new AtomicBoolean();
private static String initialClasspath = null;
private static String initialAppClasspath = null;
private static String initialHadoopClasspath = null;
private static Object commonContainerSpecLock = new Object();
private static ContainerLaunchContext commonContainerSpec = null;
private static final Object classpathLock = new Object();
@ -742,6 +743,7 @@ private static String getInitialClasspath(Configuration conf) throws IOException
MRApps.setClasspath(env, conf);
initialClasspath = env.get(Environment.CLASSPATH.name());
initialAppClasspath = env.get(Environment.APP_CLASSPATH.name());
initialHadoopClasspath = env.get(Environment.HADOOP_CLASSPATH.name());
initialClasspathFlag.set(true);
return initialClasspath;
}
@ -875,14 +877,21 @@ private static ContainerLaunchContext createCommonContainerLaunchContext(
}
MRApps.addToEnvironment(
environment,
Environment.CLASSPATH.name(),
environment,
Environment.CLASSPATH.name(),
getInitialClasspath(conf), conf);
if (initialHadoopClasspath != null) {
MRApps.addToEnvironment(
environment,
Environment.HADOOP_CLASSPATH.name(),
initialHadoopClasspath, conf);
}
if (initialAppClasspath != null) {
MRApps.addToEnvironment(
environment,
Environment.APP_CLASSPATH.name(),
Environment.APP_CLASSPATH.name(),
initialAppClasspath, conf);
}
} catch (IOException e) {

View File

@ -234,7 +234,7 @@ private static void setMRFrameworkClasspath(
}
// TODO: Remove duplicates.
}
@SuppressWarnings("deprecation")
public static void setClasspath(Map<String, String> environment,
Configuration conf) throws IOException {
@ -245,11 +245,30 @@ public static void setClasspath(Map<String, String> environment,
conf.getBoolean(MRJobConfig.MAPREDUCE_JOB_CLASSLOADER, false)
? Environment.APP_CLASSPATH.name() : Environment.CLASSPATH.name();
String hadoopClasspathEnvVar = Environment.HADOOP_CLASSPATH.name();
MRApps.addToEnvironment(environment,
classpathEnvVar, crossPlatformifyMREnv(conf, Environment.PWD), conf);
MRApps.addToEnvironment(environment,
hadoopClasspathEnvVar, crossPlatformifyMREnv(conf, Environment.PWD),
conf);
if (!userClassesTakesPrecedence) {
MRApps.setMRFrameworkClasspath(environment, conf);
}
addClasspathToEnv(environment, classpathEnvVar, conf);
addClasspathToEnv(environment, hadoopClasspathEnvVar, conf);
if (userClassesTakesPrecedence) {
MRApps.setMRFrameworkClasspath(environment, conf);
}
}
@SuppressWarnings("deprecation")
public static void addClasspathToEnv(Map<String, String> environment,
String classpathEnvVar, Configuration conf) throws IOException {
MRApps.addToEnvironment(
environment,
classpathEnvVar,
@ -257,15 +276,21 @@ public static void setClasspath(Map<String, String> environment,
MRApps.addToEnvironment(
environment,
classpathEnvVar,
MRJobConfig.JOB_JAR + Path.SEPARATOR + "classes" + Path.SEPARATOR, conf);
MRJobConfig.JOB_JAR + Path.SEPARATOR + "classes" + Path.SEPARATOR,
conf);
MRApps.addToEnvironment(
environment,
classpathEnvVar,
MRJobConfig.JOB_JAR + Path.SEPARATOR + "lib" + Path.SEPARATOR + "*", conf);
MRJobConfig.JOB_JAR + Path.SEPARATOR + "lib" + Path.SEPARATOR + "*",
conf);
MRApps.addToEnvironment(
environment,
classpathEnvVar,
crossPlatformifyMREnv(conf, Environment.PWD) + Path.SEPARATOR + "*", conf);
crossPlatformifyMREnv(conf, Environment.PWD) + Path.SEPARATOR + "*",
conf);
// a * in the classpath will only find a .jar, so we need to filter out
// all .jars and add everything else
addToClasspathIfNotJar(DistributedCache.getFileClassPaths(conf),
@ -276,11 +301,8 @@ public static void setClasspath(Map<String, String> environment,
DistributedCache.getCacheArchives(conf),
conf,
environment, classpathEnvVar);
if (userClassesTakesPrecedence) {
MRApps.setMRFrameworkClasspath(environment, conf);
}
}
/**
* Add the paths to the classpath if they are not jars
* @param paths the paths to add to the classpath
@ -446,6 +468,7 @@ public static Path getStartJobCommitFile(Configuration conf, String user,
return startCommitFile;
}
@SuppressWarnings("deprecation")
public static void setupDistributedCache(
Configuration conf,
Map<String, LocalResource> localResources)
@ -478,6 +501,7 @@ public static void setupDistributedCache(
* @param conf
* @throws java.io.IOException
*/
@SuppressWarnings("deprecation")
public static void setupDistributedCacheLocal(Configuration conf)
throws IOException {
@ -545,6 +569,7 @@ private static String toString(org.apache.hadoop.yarn.api.records.URL url) {
// TODO - Move this to MR!
// Use TaskDistributedCacheManager.CacheFiles.makeCacheFiles(URI[],
// long[], boolean[], Path[], FileType)
@SuppressWarnings("deprecation")
private static void parseDistributedCacheArtifacts(
Configuration conf,
Map<String, LocalResource> localResources,

View File

@ -36,6 +36,8 @@
import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FilterFileSystem;
@ -66,7 +68,10 @@
public class TestMRApps {
private static File testWorkDir = null;
private static final Log LOG =
LogFactory.getLog(TestMRApps.class);
@BeforeClass
public static void setupTestDirs() throws IOException {
testWorkDir = new File("target", TestMRApps.class.getCanonicalName());
@ -74,14 +79,14 @@ public static void setupTestDirs() throws IOException {
testWorkDir.mkdirs();
testWorkDir = testWorkDir.getAbsoluteFile();
}
/**
 * Class-level teardown: removes the test working directory, if one was set.
 *
 * @throws IOException if {@code delete(File)} fails
 */
@AfterClass
public static void cleanupTestDirs() throws IOException {
  // Nothing to clean up when the work directory was never assigned.
  if (testWorkDir == null) {
    return;
  }
  delete(testWorkDir);
}
private static void delete(File dir) throws IOException {
Configuration conf = new Configuration();
FileSystem fs = FileSystem.getLocal(conf);
@ -193,9 +198,9 @@ public void testSetClasspath() throws IOException {
Job job = Job.getInstance(conf);
Map<String, String> environment = new HashMap<String, String>();
MRApps.setClasspath(environment, job.getConfiguration());
assertTrue(environment.get("CLASSPATH").startsWith(
ApplicationConstants.Environment.PWD.$$()
+ ApplicationConstants.CLASS_PATH_SEPARATOR));
assertTrue(environment.get(ApplicationConstants.Environment.CLASSPATH.
name()).startsWith(ApplicationConstants.Environment.PWD.$$()
+ ApplicationConstants.CLASS_PATH_SEPARATOR));
String yarnAppClasspath = job.getConfiguration().get(
YarnConfiguration.YARN_APPLICATION_CLASSPATH,
StringUtils.join(",",
@ -205,8 +210,9 @@ public void testSetClasspath() throws IOException {
yarnAppClasspath.replaceAll(",\\s*",
ApplicationConstants.CLASS_PATH_SEPARATOR).trim();
}
assertTrue(environment.get("CLASSPATH").contains(yarnAppClasspath));
String mrAppClasspath =
assertTrue(environment.get(ApplicationConstants.Environment.
CLASSPATH.name()).contains(yarnAppClasspath));
String mrAppClasspath =
job.getConfiguration().get(
MRJobConfig.MAPREDUCE_APPLICATION_CLASSPATH,
MRJobConfig.DEFAULT_MAPREDUCE_CROSS_PLATFORM_APPLICATION_CLASSPATH);
@ -215,7 +221,8 @@ public void testSetClasspath() throws IOException {
mrAppClasspath.replaceAll(",\\s*",
ApplicationConstants.CLASS_PATH_SEPARATOR).trim();
}
assertTrue(environment.get("CLASSPATH").contains(mrAppClasspath));
assertTrue(environment.get(ApplicationConstants.Environment.CLASSPATH.
name()).contains(mrAppClasspath));
}
@Test (timeout = 120000)
@ -234,8 +241,12 @@ public void testSetClasspathWithArchives () throws IOException {
conf.set(MRJobConfig.CACHE_ARCHIVES, testTGZQualifiedPath + "#testTGZ");
Map<String, String> environment = new HashMap<String, String>();
MRApps.setClasspath(environment, conf);
assertTrue(environment.get("CLASSPATH").startsWith(
assertTrue(environment.get(ApplicationConstants.Environment.CLASSPATH.name()).startsWith(
ApplicationConstants.Environment.PWD.$$() + ApplicationConstants.CLASS_PATH_SEPARATOR));
assertTrue(environment.get(ApplicationConstants.Environment.
HADOOP_CLASSPATH.name()).startsWith(
ApplicationConstants.Environment.PWD.$$() +
ApplicationConstants.CLASS_PATH_SEPARATOR));
String confClasspath = job.getConfiguration().get(
YarnConfiguration.YARN_APPLICATION_CLASSPATH,
StringUtils.join(",",
@ -244,8 +255,19 @@ public void testSetClasspathWithArchives () throws IOException {
confClasspath = confClasspath.replaceAll(",\\s*", ApplicationConstants.CLASS_PATH_SEPARATOR)
.trim();
}
assertTrue(environment.get("CLASSPATH").contains(confClasspath));
assertTrue(environment.get("CLASSPATH").contains("testTGZ"));
LOG.info("CLASSPATH: "+ environment.get(
ApplicationConstants.Environment.CLASSPATH.name()));
LOG.info("confClasspath: " + confClasspath);
assertTrue(environment.get(
ApplicationConstants.Environment.CLASSPATH.name()).contains(
confClasspath));
LOG.info("HADOOP_CLASSPATH: " + environment.get(
ApplicationConstants.Environment.HADOOP_CLASSPATH.name()));
assertTrue(environment.get(
ApplicationConstants.Environment.CLASSPATH.name()).contains("testTGZ"));
assertTrue(environment.get(
ApplicationConstants.Environment.HADOOP_CLASSPATH.name()).
contains("testTGZ"));
}
@Test (timeout = 120000)
@ -259,7 +281,7 @@ public void testSetClasspathWithUserPrecendence() {
} catch (Exception e) {
fail("Got exception while setting classpath");
}
String env_str = env.get("CLASSPATH");
String env_str = env.get(ApplicationConstants.Environment.CLASSPATH.name());
String expectedClasspath = StringUtils.join(ApplicationConstants.CLASS_PATH_SEPARATOR,
Arrays.asList(ApplicationConstants.Environment.PWD.$$(), "job.jar/job.jar",
"job.jar/classes/", "job.jar/lib/*",
@ -279,7 +301,7 @@ public void testSetClasspathWithNoUserPrecendence() {
} catch (Exception e) {
fail("Got exception while setting classpath");
}
String env_str = env.get("CLASSPATH");
String env_str = env.get(ApplicationConstants.Environment.CLASSPATH.name());
String expectedClasspath = StringUtils.join(ApplicationConstants.CLASS_PATH_SEPARATOR,
Arrays.asList("job.jar/job.jar", "job.jar/classes/", "job.jar/lib/*",
ApplicationConstants.Environment.PWD.$$() + "/*"));
@ -296,8 +318,9 @@ public void testSetClasspathWithJobClassloader() throws IOException {
conf.setBoolean(MRJobConfig.MAPREDUCE_JOB_CLASSLOADER, true);
Map<String, String> env = new HashMap<String, String>();
MRApps.setClasspath(env, conf);
String cp = env.get("CLASSPATH");
String appCp = env.get("APP_CLASSPATH");
String cp = env.get(ApplicationConstants.Environment.CLASSPATH.name());
String appCp = env.get(ApplicationConstants.Environment.
APP_CLASSPATH.name());
assertFalse("MAPREDUCE_JOB_CLASSLOADER true, but job.jar is in the"
+ " classpath!", cp.contains("jar" + ApplicationConstants.CLASS_PATH_SEPARATOR + "job"));
assertFalse("MAPREDUCE_JOB_CLASSLOADER true, but PWD is in the classpath!",
@ -338,7 +361,8 @@ public void testSetClasspathWithFramework() throws IOException {
Arrays.asList(ApplicationConstants.Environment.PWD.$$(),
FRAMEWORK_CLASSPATH, stdClasspath));
assertEquals("Incorrect classpath with framework and no user precedence",
expectedClasspath, env.get("CLASSPATH"));
expectedClasspath, env.get(ApplicationConstants.Environment.
CLASSPATH.name()));
env.clear();
conf.setBoolean(MRJobConfig.MAPREDUCE_JOB_USER_CLASSPATH_FIRST, true);
@ -347,7 +371,8 @@ public void testSetClasspathWithFramework() throws IOException {
Arrays.asList(ApplicationConstants.Environment.PWD.$$(),
stdClasspath, FRAMEWORK_CLASSPATH));
assertEquals("Incorrect classpath with framework and user precedence",
expectedClasspath, env.get("CLASSPATH"));
expectedClasspath, env.get(ApplicationConstants.Environment.CLASSPATH.
name()));
}
@Test (timeout = 30000)
@ -358,7 +383,7 @@ public void testSetupDistributedCacheEmpty() throws IOException {
assertTrue("Empty Config did not produce an empty list of resources",
localResources.isEmpty());
}
@SuppressWarnings("deprecation")
public void testSetupDistributedCacheConflicts() throws Exception {
Configuration conf = new Configuration();

View File

@ -160,6 +160,11 @@ public enum Environment {
*/
APP_CLASSPATH("APP_CLASSPATH"),
/**
* $HADOOP_CLASSPATH
*/
HADOOP_CLASSPATH("HADOOP_CLASSPATH"),
/**
* $LD_LIBRARY_PATH
*/