From 9da81a395584cb1dbfdee012222fd4de53522681 Mon Sep 17 00:00:00 2001 From: Vinod Kumar Vavilapalli Date: Thu, 20 Aug 2015 17:35:24 -0700 Subject: [PATCH] MAPREDUCE-6454. Fixed MapReduce to modify HADOOP_CLASSPATH to have distributed cache files so that child processes running hadoop scripts can access these files. Contributed by Junping Du. --- hadoop-mapreduce-project/CHANGES.txt | 4 ++ .../v2/app/job/impl/TaskAttemptImpl.java | 15 ++++- .../hadoop/mapreduce/v2/util/MRApps.java | 41 +++++++++--- .../hadoop/mapreduce/v2/util/TestMRApps.java | 63 +++++++++++++------ .../hadoop/yarn/api/ApplicationConstants.java | 5 ++ 5 files changed, 98 insertions(+), 30 deletions(-) diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 1a9bdf12cb0..ba5a6f1c565 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -557,6 +557,10 @@ Release 2.6.2 - UNRELEASED BUG FIXES + MAPREDUCE-6454. Fixed MapReduce to modify HADOOP_CLASSPATH to have distributed + cache files so that child processes running hadoop scripts can access these + files. (Junping Du via vinodkv) + Release 2.6.1 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java index 0261b7f607b..f32c23a2372 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java @@ -173,6 +173,7 @@ public abstract class TaskAttemptImpl implements private static AtomicBoolean initialClasspathFlag = new AtomicBoolean(); private static String initialClasspath = null; private static String initialAppClasspath = null; + private static String initialHadoopClasspath = null; private static Object commonContainerSpecLock = new Object(); private static ContainerLaunchContext commonContainerSpec = null; private static final Object classpathLock = new Object(); @@ -742,6 +743,7 @@ private static String getInitialClasspath(Configuration conf) throws IOException MRApps.setClasspath(env, conf); initialClasspath = env.get(Environment.CLASSPATH.name()); initialAppClasspath = env.get(Environment.APP_CLASSPATH.name()); + initialHadoopClasspath = env.get(Environment.HADOOP_CLASSPATH.name()); initialClasspathFlag.set(true); return initialClasspath; } @@ -875,14 +877,21 @@ private static ContainerLaunchContext createCommonContainerLaunchContext( } MRApps.addToEnvironment( - environment, - Environment.CLASSPATH.name(), + environment, + Environment.CLASSPATH.name(), getInitialClasspath(conf), conf); + if (initialHadoopClasspath != null) { + MRApps.addToEnvironment( + environment, + Environment.HADOOP_CLASSPATH.name(), + initialHadoopClasspath, conf); + } + if (initialAppClasspath != null) { MRApps.addToEnvironment( environment, - Environment.APP_CLASSPATH.name(), + Environment.APP_CLASSPATH.name(), initialAppClasspath, conf); } } catch (IOException e) { diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java index 40ba613c52e..c6454654ee0 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java @@ -234,7 +234,7 @@ private static void setMRFrameworkClasspath( } // TODO: Remove duplicates. } - + @SuppressWarnings("deprecation") public static void setClasspath(Map environment, Configuration conf) throws IOException { @@ -245,11 +245,30 @@ public static void setClasspath(Map environment, conf.getBoolean(MRJobConfig.MAPREDUCE_JOB_CLASSLOADER, false) ? Environment.APP_CLASSPATH.name() : Environment.CLASSPATH.name(); + String hadoopClasspathEnvVar = Environment.HADOOP_CLASSPATH.name(); + MRApps.addToEnvironment(environment, classpathEnvVar, crossPlatformifyMREnv(conf, Environment.PWD), conf); + + MRApps.addToEnvironment(environment, + hadoopClasspathEnvVar, crossPlatformifyMREnv(conf, Environment.PWD), + conf); + if (!userClassesTakesPrecedence) { MRApps.setMRFrameworkClasspath(environment, conf); } + + addClasspathToEnv(environment, classpathEnvVar, conf); + addClasspathToEnv(environment, hadoopClasspathEnvVar, conf); + + if (userClassesTakesPrecedence) { + MRApps.setMRFrameworkClasspath(environment, conf); + } + } + + @SuppressWarnings("deprecation") + public static void addClasspathToEnv(Map environment, + String classpathEnvVar, Configuration conf) throws IOException { MRApps.addToEnvironment( environment, classpathEnvVar, @@ -257,15 +276,21 @@ public static void setClasspath(Map environment, MRApps.addToEnvironment( environment, classpathEnvVar, - MRJobConfig.JOB_JAR + Path.SEPARATOR + "classes" + Path.SEPARATOR, conf); + MRJobConfig.JOB_JAR + Path.SEPARATOR + "classes" + Path.SEPARATOR, + conf); + MRApps.addToEnvironment( environment, classpathEnvVar, - MRJobConfig.JOB_JAR + Path.SEPARATOR + "lib" + Path.SEPARATOR + "*", conf); + MRJobConfig.JOB_JAR + Path.SEPARATOR + "lib" + Path.SEPARATOR + "*", + conf); + MRApps.addToEnvironment( environment, classpathEnvVar, - crossPlatformifyMREnv(conf, Environment.PWD) + Path.SEPARATOR + "*", conf); + crossPlatformifyMREnv(conf, Environment.PWD) + Path.SEPARATOR + "*", + conf); + // a * in the classpath will only find a .jar, so we need to filter out // all .jars and add everything else addToClasspathIfNotJar(DistributedCache.getFileClassPaths(conf), @@ -276,11 +301,8 @@ public static void setClasspath(Map environment, DistributedCache.getCacheArchives(conf), conf, environment, classpathEnvVar); - if (userClassesTakesPrecedence) { - MRApps.setMRFrameworkClasspath(environment, conf); - } } - + /** * Add the paths to the classpath if they are not jars * @param paths the paths to add to the classpath @@ -446,6 +468,7 @@ public static Path getStartJobCommitFile(Configuration conf, String user, return startCommitFile; } + @SuppressWarnings("deprecation") public static void setupDistributedCache( Configuration conf, Map localResources) @@ -478,6 +501,7 @@ public static void setupDistributedCache( * @param conf * @throws java.io.IOException */ + @SuppressWarnings("deprecation") public static void setupDistributedCacheLocal(Configuration conf) throws IOException { @@ -545,6 +569,7 @@ private static String toString(org.apache.hadoop.yarn.api.records.URL url) { // TODO - Move this to MR! // Use TaskDistributedCacheManager.CacheFiles.makeCacheFiles(URI[], // long[], boolean[], Path[], FileType) + @SuppressWarnings("deprecation") private static void parseDistributedCacheArtifacts( Configuration conf, Map localResources, diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/util/TestMRApps.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/util/TestMRApps.java index b233c841d88..3a417a0485e 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/util/TestMRApps.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/util/TestMRApps.java @@ -36,6 +36,8 @@ import java.util.List; import java.util.Map; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FilterFileSystem; @@ -66,7 +68,10 @@ public class TestMRApps { private static File testWorkDir = null; - + + private static final Log LOG = + LogFactory.getLog(TestMRApps.class); + @BeforeClass public static void setupTestDirs() throws IOException { testWorkDir = new File("target", TestMRApps.class.getCanonicalName()); @@ -74,14 +79,14 @@ public static void setupTestDirs() throws IOException { testWorkDir.mkdirs(); testWorkDir = testWorkDir.getAbsoluteFile(); } - + @AfterClass public static void cleanupTestDirs() throws IOException { if (testWorkDir != null) { delete(testWorkDir); } } - + private static void delete(File dir) throws IOException { Configuration conf = new Configuration(); FileSystem fs = FileSystem.getLocal(conf); @@ -193,9 +198,9 @@ public void testSetClasspath() throws IOException { Job job = Job.getInstance(conf); Map environment = new HashMap(); MRApps.setClasspath(environment, job.getConfiguration()); - assertTrue(environment.get("CLASSPATH").startsWith( - ApplicationConstants.Environment.PWD.$$() - + ApplicationConstants.CLASS_PATH_SEPARATOR)); + assertTrue(environment.get(ApplicationConstants.Environment.CLASSPATH. + name()).startsWith(ApplicationConstants.Environment.PWD.$$() + + ApplicationConstants.CLASS_PATH_SEPARATOR)); String yarnAppClasspath = job.getConfiguration().get( YarnConfiguration.YARN_APPLICATION_CLASSPATH, StringUtils.join(",", @@ -205,8 +210,9 @@ public void testSetClasspath() throws IOException { yarnAppClasspath.replaceAll(",\\s*", ApplicationConstants.CLASS_PATH_SEPARATOR).trim(); } - assertTrue(environment.get("CLASSPATH").contains(yarnAppClasspath)); - String mrAppClasspath = + assertTrue(environment.get(ApplicationConstants.Environment. + CLASSPATH.name()).contains(yarnAppClasspath)); + String mrAppClasspath = job.getConfiguration().get( MRJobConfig.MAPREDUCE_APPLICATION_CLASSPATH, MRJobConfig.DEFAULT_MAPREDUCE_CROSS_PLATFORM_APPLICATION_CLASSPATH); @@ -215,7 +221,8 @@ public void testSetClasspath() throws IOException { mrAppClasspath.replaceAll(",\\s*", ApplicationConstants.CLASS_PATH_SEPARATOR).trim(); } - assertTrue(environment.get("CLASSPATH").contains(mrAppClasspath)); + assertTrue(environment.get(ApplicationConstants.Environment.CLASSPATH. + name()).contains(mrAppClasspath)); } @Test (timeout = 120000) @@ -234,8 +241,12 @@ public void testSetClasspathWithArchives () throws IOException { conf.set(MRJobConfig.CACHE_ARCHIVES, testTGZQualifiedPath + "#testTGZ"); Map environment = new HashMap(); MRApps.setClasspath(environment, conf); - assertTrue(environment.get("CLASSPATH").startsWith( + assertTrue(environment.get(ApplicationConstants.Environment.CLASSPATH.name()).startsWith( ApplicationConstants.Environment.PWD.$$() + ApplicationConstants.CLASS_PATH_SEPARATOR)); + assertTrue(environment.get(ApplicationConstants.Environment. + HADOOP_CLASSPATH.name()).startsWith( + ApplicationConstants.Environment.PWD.$$() + + ApplicationConstants.CLASS_PATH_SEPARATOR)); String confClasspath = job.getConfiguration().get( YarnConfiguration.YARN_APPLICATION_CLASSPATH, StringUtils.join(",", @@ -244,8 +255,19 @@ public void testSetClasspathWithArchives () throws IOException { confClasspath = confClasspath.replaceAll(",\\s*", ApplicationConstants.CLASS_PATH_SEPARATOR) .trim(); } - assertTrue(environment.get("CLASSPATH").contains(confClasspath)); - assertTrue(environment.get("CLASSPATH").contains("testTGZ")); + LOG.info("CLASSPATH: "+ environment.get( + ApplicationConstants.Environment.CLASSPATH.name())); + LOG.info("confClasspath: " + confClasspath); + assertTrue(environment.get( + ApplicationConstants.Environment.CLASSPATH.name()).contains( + confClasspath)); + LOG.info("HADOOP_CLASSPATH: " + environment.get( + ApplicationConstants.Environment.HADOOP_CLASSPATH.name())); + assertTrue(environment.get( + ApplicationConstants.Environment.CLASSPATH.name()).contains("testTGZ")); + assertTrue(environment.get( + ApplicationConstants.Environment.HADOOP_CLASSPATH.name()). + contains("testTGZ")); } @Test (timeout = 120000) @@ -259,7 +281,7 @@ public void testSetClasspathWithUserPrecendence() { } catch (Exception e) { fail("Got exception while setting classpath"); } - String env_str = env.get("CLASSPATH"); + String env_str = env.get(ApplicationConstants.Environment.CLASSPATH.name()); String expectedClasspath = StringUtils.join(ApplicationConstants.CLASS_PATH_SEPARATOR, Arrays.asList(ApplicationConstants.Environment.PWD.$$(), "job.jar/job.jar", "job.jar/classes/", "job.jar/lib/*", @@ -279,7 +301,7 @@ public void testSetClasspathWithNoUserPrecendence() { } catch (Exception e) { fail("Got exception while setting classpath"); } - String env_str = env.get("CLASSPATH"); + String env_str = env.get(ApplicationConstants.Environment.CLASSPATH.name()); String expectedClasspath = StringUtils.join(ApplicationConstants.CLASS_PATH_SEPARATOR, Arrays.asList("job.jar/job.jar", "job.jar/classes/", "job.jar/lib/*", ApplicationConstants.Environment.PWD.$$() + "/*")); @@ -296,8 +318,9 @@ public void testSetClasspathWithJobClassloader() throws IOException { conf.setBoolean(MRJobConfig.MAPREDUCE_JOB_CLASSLOADER, true); Map env = new HashMap(); MRApps.setClasspath(env, conf); - String cp = env.get("CLASSPATH"); - String appCp = env.get("APP_CLASSPATH"); + String cp = env.get(ApplicationConstants.Environment.CLASSPATH.name()); + String appCp = env.get(ApplicationConstants.Environment. + APP_CLASSPATH.name()); assertFalse("MAPREDUCE_JOB_CLASSLOADER true, but job.jar is in the" + " classpath!", cp.contains("jar" + ApplicationConstants.CLASS_PATH_SEPARATOR + "job")); assertFalse("MAPREDUCE_JOB_CLASSLOADER true, but PWD is in the classpath!", @@ -338,7 +361,8 @@ public void testSetClasspathWithFramework() throws IOException { Arrays.asList(ApplicationConstants.Environment.PWD.$$(), FRAMEWORK_CLASSPATH, stdClasspath)); assertEquals("Incorrect classpath with framework and no user precedence", - expectedClasspath, env.get("CLASSPATH")); + expectedClasspath, env.get(ApplicationConstants.Environment. + CLASSPATH.name())); env.clear(); conf.setBoolean(MRJobConfig.MAPREDUCE_JOB_USER_CLASSPATH_FIRST, true); @@ -347,7 +371,8 @@ public void testSetClasspathWithFramework() throws IOException { Arrays.asList(ApplicationConstants.Environment.PWD.$$(), stdClasspath, FRAMEWORK_CLASSPATH)); assertEquals("Incorrect classpath with framework and user precedence", - expectedClasspath, env.get("CLASSPATH")); + expectedClasspath, env.get(ApplicationConstants.Environment.CLASSPATH. + name())); } @Test (timeout = 30000) @@ -358,7 +383,7 @@ public void testSetupDistributedCacheEmpty() throws IOException { assertTrue("Empty Config did not produce an empty list of resources", localResources.isEmpty()); } - + @SuppressWarnings("deprecation") public void testSetupDistributedCacheConflicts() throws Exception { Configuration conf = new Configuration(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationConstants.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationConstants.java index 42464da1a43..b2d765ae308 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationConstants.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationConstants.java @@ -160,6 +160,11 @@ public enum Environment { */ APP_CLASSPATH("APP_CLASSPATH"), + /** + * $HADOOP_CLASSPATH. + */ + HADOOP_CLASSPATH("HADOOP_CLASSPATH"), + /** * $LD_LIBRARY_PATH */