MAPREDUCE-6454. Fixed MapReduce to modify HADOOP_CLASSPATH to have distributed cache files so that child processes running hadoop scripts can access these files. Contributed by Junping Du.

(cherry picked from commit 9da81a3955)
This commit is contained in:
Vinod Kumar Vavilapalli 2015-08-20 17:35:24 -07:00
parent 9faee040f9
commit f333e367e1
5 changed files with 98 additions and 30 deletions

View File

@ -271,6 +271,10 @@ Release 2.6.2 - UNRELEASED
BUG FIXES BUG FIXES
MAPREDUCE-6454. Fixed MapReduce to modify HADOOP_CLASSPATH to have distributed
cache files so that child processes running hadoop scripts can access these
files. (Junping Du via vinodkv)
Release 2.6.1 - UNRELEASED Release 2.6.1 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -172,6 +172,7 @@ public abstract class TaskAttemptImpl implements
private static AtomicBoolean initialClasspathFlag = new AtomicBoolean(); private static AtomicBoolean initialClasspathFlag = new AtomicBoolean();
private static String initialClasspath = null; private static String initialClasspath = null;
private static String initialAppClasspath = null; private static String initialAppClasspath = null;
private static String initialHadoopClasspath = null;
private static Object commonContainerSpecLock = new Object(); private static Object commonContainerSpecLock = new Object();
private static ContainerLaunchContext commonContainerSpec = null; private static ContainerLaunchContext commonContainerSpec = null;
private static final Object classpathLock = new Object(); private static final Object classpathLock = new Object();
@ -620,6 +621,7 @@ public abstract class TaskAttemptImpl implements
MRApps.setClasspath(env, conf); MRApps.setClasspath(env, conf);
initialClasspath = env.get(Environment.CLASSPATH.name()); initialClasspath = env.get(Environment.CLASSPATH.name());
initialAppClasspath = env.get(Environment.APP_CLASSPATH.name()); initialAppClasspath = env.get(Environment.APP_CLASSPATH.name());
initialHadoopClasspath = env.get(Environment.HADOOP_CLASSPATH.name());
initialClasspathFlag.set(true); initialClasspathFlag.set(true);
return initialClasspath; return initialClasspath;
} }
@ -753,14 +755,21 @@ public abstract class TaskAttemptImpl implements
} }
MRApps.addToEnvironment( MRApps.addToEnvironment(
environment, environment,
Environment.CLASSPATH.name(), Environment.CLASSPATH.name(),
getInitialClasspath(conf), conf); getInitialClasspath(conf), conf);
if (initialHadoopClasspath != null) {
MRApps.addToEnvironment(
environment,
Environment.HADOOP_CLASSPATH.name(),
initialHadoopClasspath, conf);
}
if (initialAppClasspath != null) { if (initialAppClasspath != null) {
MRApps.addToEnvironment( MRApps.addToEnvironment(
environment, environment,
Environment.APP_CLASSPATH.name(), Environment.APP_CLASSPATH.name(),
initialAppClasspath, conf); initialAppClasspath, conf);
} }
} catch (IOException e) { } catch (IOException e) {

View File

@ -234,7 +234,7 @@ public class MRApps extends Apps {
} }
// TODO: Remove duplicates. // TODO: Remove duplicates.
} }
@SuppressWarnings("deprecation") @SuppressWarnings("deprecation")
public static void setClasspath(Map<String, String> environment, public static void setClasspath(Map<String, String> environment,
Configuration conf) throws IOException { Configuration conf) throws IOException {
@ -245,11 +245,30 @@ public class MRApps extends Apps {
conf.getBoolean(MRJobConfig.MAPREDUCE_JOB_CLASSLOADER, false) conf.getBoolean(MRJobConfig.MAPREDUCE_JOB_CLASSLOADER, false)
? Environment.APP_CLASSPATH.name() : Environment.CLASSPATH.name(); ? Environment.APP_CLASSPATH.name() : Environment.CLASSPATH.name();
String hadoopClasspathEnvVar = Environment.HADOOP_CLASSPATH.name();
MRApps.addToEnvironment(environment, MRApps.addToEnvironment(environment,
classpathEnvVar, crossPlatformifyMREnv(conf, Environment.PWD), conf); classpathEnvVar, crossPlatformifyMREnv(conf, Environment.PWD), conf);
MRApps.addToEnvironment(environment,
hadoopClasspathEnvVar, crossPlatformifyMREnv(conf, Environment.PWD),
conf);
if (!userClassesTakesPrecedence) { if (!userClassesTakesPrecedence) {
MRApps.setMRFrameworkClasspath(environment, conf); MRApps.setMRFrameworkClasspath(environment, conf);
} }
addClasspathToEnv(environment, classpathEnvVar, conf);
addClasspathToEnv(environment, hadoopClasspathEnvVar, conf);
if (userClassesTakesPrecedence) {
MRApps.setMRFrameworkClasspath(environment, conf);
}
}
@SuppressWarnings("deprecation")
public static void addClasspathToEnv(Map<String, String> environment,
String classpathEnvVar, Configuration conf) throws IOException {
MRApps.addToEnvironment( MRApps.addToEnvironment(
environment, environment,
classpathEnvVar, classpathEnvVar,
@ -257,15 +276,21 @@ public class MRApps extends Apps {
MRApps.addToEnvironment( MRApps.addToEnvironment(
environment, environment,
classpathEnvVar, classpathEnvVar,
MRJobConfig.JOB_JAR + Path.SEPARATOR + "classes" + Path.SEPARATOR, conf); MRJobConfig.JOB_JAR + Path.SEPARATOR + "classes" + Path.SEPARATOR,
conf);
MRApps.addToEnvironment( MRApps.addToEnvironment(
environment, environment,
classpathEnvVar, classpathEnvVar,
MRJobConfig.JOB_JAR + Path.SEPARATOR + "lib" + Path.SEPARATOR + "*", conf); MRJobConfig.JOB_JAR + Path.SEPARATOR + "lib" + Path.SEPARATOR + "*",
conf);
MRApps.addToEnvironment( MRApps.addToEnvironment(
environment, environment,
classpathEnvVar, classpathEnvVar,
crossPlatformifyMREnv(conf, Environment.PWD) + Path.SEPARATOR + "*", conf); crossPlatformifyMREnv(conf, Environment.PWD) + Path.SEPARATOR + "*",
conf);
// a * in the classpath will only find a .jar, so we need to filter out // a * in the classpath will only find a .jar, so we need to filter out
// all .jars and add everything else // all .jars and add everything else
addToClasspathIfNotJar(DistributedCache.getFileClassPaths(conf), addToClasspathIfNotJar(DistributedCache.getFileClassPaths(conf),
@ -276,11 +301,8 @@ public class MRApps extends Apps {
DistributedCache.getCacheArchives(conf), DistributedCache.getCacheArchives(conf),
conf, conf,
environment, classpathEnvVar); environment, classpathEnvVar);
if (userClassesTakesPrecedence) {
MRApps.setMRFrameworkClasspath(environment, conf);
}
} }
/** /**
* Add the paths to the classpath if they are not jars * Add the paths to the classpath if they are not jars
* @param paths the paths to add to the classpath * @param paths the paths to add to the classpath
@ -446,6 +468,7 @@ public class MRApps extends Apps {
return startCommitFile; return startCommitFile;
} }
@SuppressWarnings("deprecation")
public static void setupDistributedCache( public static void setupDistributedCache(
Configuration conf, Configuration conf,
Map<String, LocalResource> localResources) Map<String, LocalResource> localResources)
@ -478,6 +501,7 @@ public class MRApps extends Apps {
* @param conf * @param conf
* @throws java.io.IOException * @throws java.io.IOException
*/ */
@SuppressWarnings("deprecation")
public static void setupDistributedCacheLocal(Configuration conf) public static void setupDistributedCacheLocal(Configuration conf)
throws IOException { throws IOException {
@ -545,6 +569,7 @@ public class MRApps extends Apps {
// TODO - Move this to MR! // TODO - Move this to MR!
// Use TaskDistributedCacheManager.CacheFiles.makeCacheFiles(URI[], // Use TaskDistributedCacheManager.CacheFiles.makeCacheFiles(URI[],
// long[], boolean[], Path[], FileType) // long[], boolean[], Path[], FileType)
@SuppressWarnings("deprecation")
private static void parseDistributedCacheArtifacts( private static void parseDistributedCacheArtifacts(
Configuration conf, Configuration conf,
Map<String, LocalResource> localResources, Map<String, LocalResource> localResources,

View File

@ -36,6 +36,8 @@ import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FilterFileSystem; import org.apache.hadoop.fs.FilterFileSystem;
@ -66,7 +68,10 @@ import org.junit.Test;
public class TestMRApps { public class TestMRApps {
private static File testWorkDir = null; private static File testWorkDir = null;
private static final Log LOG =
LogFactory.getLog(TestMRApps.class);
@BeforeClass @BeforeClass
public static void setupTestDirs() throws IOException { public static void setupTestDirs() throws IOException {
testWorkDir = new File("target", TestMRApps.class.getCanonicalName()); testWorkDir = new File("target", TestMRApps.class.getCanonicalName());
@ -74,14 +79,14 @@ public class TestMRApps {
testWorkDir.mkdirs(); testWorkDir.mkdirs();
testWorkDir = testWorkDir.getAbsoluteFile(); testWorkDir = testWorkDir.getAbsoluteFile();
} }
@AfterClass @AfterClass
public static void cleanupTestDirs() throws IOException { public static void cleanupTestDirs() throws IOException {
if (testWorkDir != null) { if (testWorkDir != null) {
delete(testWorkDir); delete(testWorkDir);
} }
} }
private static void delete(File dir) throws IOException { private static void delete(File dir) throws IOException {
Configuration conf = new Configuration(); Configuration conf = new Configuration();
FileSystem fs = FileSystem.getLocal(conf); FileSystem fs = FileSystem.getLocal(conf);
@ -193,9 +198,9 @@ public class TestMRApps {
Job job = Job.getInstance(conf); Job job = Job.getInstance(conf);
Map<String, String> environment = new HashMap<String, String>(); Map<String, String> environment = new HashMap<String, String>();
MRApps.setClasspath(environment, job.getConfiguration()); MRApps.setClasspath(environment, job.getConfiguration());
assertTrue(environment.get("CLASSPATH").startsWith( assertTrue(environment.get(ApplicationConstants.Environment.CLASSPATH.
ApplicationConstants.Environment.PWD.$$() name()).startsWith(ApplicationConstants.Environment.PWD.$$()
+ ApplicationConstants.CLASS_PATH_SEPARATOR)); + ApplicationConstants.CLASS_PATH_SEPARATOR));
String yarnAppClasspath = job.getConfiguration().get( String yarnAppClasspath = job.getConfiguration().get(
YarnConfiguration.YARN_APPLICATION_CLASSPATH, YarnConfiguration.YARN_APPLICATION_CLASSPATH,
StringUtils.join(",", StringUtils.join(",",
@ -205,8 +210,9 @@ public class TestMRApps {
yarnAppClasspath.replaceAll(",\\s*", yarnAppClasspath.replaceAll(",\\s*",
ApplicationConstants.CLASS_PATH_SEPARATOR).trim(); ApplicationConstants.CLASS_PATH_SEPARATOR).trim();
} }
assertTrue(environment.get("CLASSPATH").contains(yarnAppClasspath)); assertTrue(environment.get(ApplicationConstants.Environment.
String mrAppClasspath = CLASSPATH.name()).contains(yarnAppClasspath));
String mrAppClasspath =
job.getConfiguration().get( job.getConfiguration().get(
MRJobConfig.MAPREDUCE_APPLICATION_CLASSPATH, MRJobConfig.MAPREDUCE_APPLICATION_CLASSPATH,
MRJobConfig.DEFAULT_MAPREDUCE_CROSS_PLATFORM_APPLICATION_CLASSPATH); MRJobConfig.DEFAULT_MAPREDUCE_CROSS_PLATFORM_APPLICATION_CLASSPATH);
@ -215,7 +221,8 @@ public class TestMRApps {
mrAppClasspath.replaceAll(",\\s*", mrAppClasspath.replaceAll(",\\s*",
ApplicationConstants.CLASS_PATH_SEPARATOR).trim(); ApplicationConstants.CLASS_PATH_SEPARATOR).trim();
} }
assertTrue(environment.get("CLASSPATH").contains(mrAppClasspath)); assertTrue(environment.get(ApplicationConstants.Environment.CLASSPATH.
name()).contains(mrAppClasspath));
} }
@Test (timeout = 120000) @Test (timeout = 120000)
@ -234,8 +241,12 @@ public class TestMRApps {
conf.set(MRJobConfig.CACHE_ARCHIVES, testTGZQualifiedPath + "#testTGZ"); conf.set(MRJobConfig.CACHE_ARCHIVES, testTGZQualifiedPath + "#testTGZ");
Map<String, String> environment = new HashMap<String, String>(); Map<String, String> environment = new HashMap<String, String>();
MRApps.setClasspath(environment, conf); MRApps.setClasspath(environment, conf);
assertTrue(environment.get("CLASSPATH").startsWith( assertTrue(environment.get(ApplicationConstants.Environment.CLASSPATH.name()).startsWith(
ApplicationConstants.Environment.PWD.$$() + ApplicationConstants.CLASS_PATH_SEPARATOR)); ApplicationConstants.Environment.PWD.$$() + ApplicationConstants.CLASS_PATH_SEPARATOR));
assertTrue(environment.get(ApplicationConstants.Environment.
HADOOP_CLASSPATH.name()).startsWith(
ApplicationConstants.Environment.PWD.$$() +
ApplicationConstants.CLASS_PATH_SEPARATOR));
String confClasspath = job.getConfiguration().get( String confClasspath = job.getConfiguration().get(
YarnConfiguration.YARN_APPLICATION_CLASSPATH, YarnConfiguration.YARN_APPLICATION_CLASSPATH,
StringUtils.join(",", StringUtils.join(",",
@ -244,8 +255,19 @@ public class TestMRApps {
confClasspath = confClasspath.replaceAll(",\\s*", ApplicationConstants.CLASS_PATH_SEPARATOR) confClasspath = confClasspath.replaceAll(",\\s*", ApplicationConstants.CLASS_PATH_SEPARATOR)
.trim(); .trim();
} }
assertTrue(environment.get("CLASSPATH").contains(confClasspath)); LOG.info("CLASSPATH: "+ environment.get(
assertTrue(environment.get("CLASSPATH").contains("testTGZ")); ApplicationConstants.Environment.CLASSPATH.name()));
LOG.info("confClasspath: " + confClasspath);
assertTrue(environment.get(
ApplicationConstants.Environment.CLASSPATH.name()).contains(
confClasspath));
LOG.info("HADOOP_CLASSPATH: " + environment.get(
ApplicationConstants.Environment.HADOOP_CLASSPATH.name()));
assertTrue(environment.get(
ApplicationConstants.Environment.CLASSPATH.name()).contains("testTGZ"));
assertTrue(environment.get(
ApplicationConstants.Environment.HADOOP_CLASSPATH.name()).
contains("testTGZ"));
} }
@Test (timeout = 120000) @Test (timeout = 120000)
@ -259,7 +281,7 @@ public class TestMRApps {
} catch (Exception e) { } catch (Exception e) {
fail("Got exception while setting classpath"); fail("Got exception while setting classpath");
} }
String env_str = env.get("CLASSPATH"); String env_str = env.get(ApplicationConstants.Environment.CLASSPATH.name());
String expectedClasspath = StringUtils.join(ApplicationConstants.CLASS_PATH_SEPARATOR, String expectedClasspath = StringUtils.join(ApplicationConstants.CLASS_PATH_SEPARATOR,
Arrays.asList(ApplicationConstants.Environment.PWD.$$(), "job.jar/job.jar", Arrays.asList(ApplicationConstants.Environment.PWD.$$(), "job.jar/job.jar",
"job.jar/classes/", "job.jar/lib/*", "job.jar/classes/", "job.jar/lib/*",
@ -279,7 +301,7 @@ public class TestMRApps {
} catch (Exception e) { } catch (Exception e) {
fail("Got exception while setting classpath"); fail("Got exception while setting classpath");
} }
String env_str = env.get("CLASSPATH"); String env_str = env.get(ApplicationConstants.Environment.CLASSPATH.name());
String expectedClasspath = StringUtils.join(ApplicationConstants.CLASS_PATH_SEPARATOR, String expectedClasspath = StringUtils.join(ApplicationConstants.CLASS_PATH_SEPARATOR,
Arrays.asList("job.jar/job.jar", "job.jar/classes/", "job.jar/lib/*", Arrays.asList("job.jar/job.jar", "job.jar/classes/", "job.jar/lib/*",
ApplicationConstants.Environment.PWD.$$() + "/*")); ApplicationConstants.Environment.PWD.$$() + "/*"));
@ -296,8 +318,9 @@ public class TestMRApps {
conf.setBoolean(MRJobConfig.MAPREDUCE_JOB_CLASSLOADER, true); conf.setBoolean(MRJobConfig.MAPREDUCE_JOB_CLASSLOADER, true);
Map<String, String> env = new HashMap<String, String>(); Map<String, String> env = new HashMap<String, String>();
MRApps.setClasspath(env, conf); MRApps.setClasspath(env, conf);
String cp = env.get("CLASSPATH"); String cp = env.get(ApplicationConstants.Environment.CLASSPATH.name());
String appCp = env.get("APP_CLASSPATH"); String appCp = env.get(ApplicationConstants.Environment.
APP_CLASSPATH.name());
assertFalse("MAPREDUCE_JOB_CLASSLOADER true, but job.jar is in the" assertFalse("MAPREDUCE_JOB_CLASSLOADER true, but job.jar is in the"
+ " classpath!", cp.contains("jar" + ApplicationConstants.CLASS_PATH_SEPARATOR + "job")); + " classpath!", cp.contains("jar" + ApplicationConstants.CLASS_PATH_SEPARATOR + "job"));
assertFalse("MAPREDUCE_JOB_CLASSLOADER true, but PWD is in the classpath!", assertFalse("MAPREDUCE_JOB_CLASSLOADER true, but PWD is in the classpath!",
@ -338,7 +361,8 @@ public class TestMRApps {
Arrays.asList(ApplicationConstants.Environment.PWD.$$(), Arrays.asList(ApplicationConstants.Environment.PWD.$$(),
FRAMEWORK_CLASSPATH, stdClasspath)); FRAMEWORK_CLASSPATH, stdClasspath));
assertEquals("Incorrect classpath with framework and no user precedence", assertEquals("Incorrect classpath with framework and no user precedence",
expectedClasspath, env.get("CLASSPATH")); expectedClasspath, env.get(ApplicationConstants.Environment.
CLASSPATH.name()));
env.clear(); env.clear();
conf.setBoolean(MRJobConfig.MAPREDUCE_JOB_USER_CLASSPATH_FIRST, true); conf.setBoolean(MRJobConfig.MAPREDUCE_JOB_USER_CLASSPATH_FIRST, true);
@ -347,7 +371,8 @@ public class TestMRApps {
Arrays.asList(ApplicationConstants.Environment.PWD.$$(), Arrays.asList(ApplicationConstants.Environment.PWD.$$(),
stdClasspath, FRAMEWORK_CLASSPATH)); stdClasspath, FRAMEWORK_CLASSPATH));
assertEquals("Incorrect classpath with framework and user precedence", assertEquals("Incorrect classpath with framework and user precedence",
expectedClasspath, env.get("CLASSPATH")); expectedClasspath, env.get(ApplicationConstants.Environment.CLASSPATH.
name()));
} }
@Test (timeout = 30000) @Test (timeout = 30000)
@ -358,7 +383,7 @@ public class TestMRApps {
assertTrue("Empty Config did not produce an empty list of resources", assertTrue("Empty Config did not produce an empty list of resources",
localResources.isEmpty()); localResources.isEmpty());
} }
@SuppressWarnings("deprecation") @SuppressWarnings("deprecation")
public void testSetupDistributedCacheConflicts() throws Exception { public void testSetupDistributedCacheConflicts() throws Exception {
Configuration conf = new Configuration(); Configuration conf = new Configuration();

View File

@ -160,6 +160,11 @@ public interface ApplicationConstants {
*/ */
APP_CLASSPATH("APP_CLASSPATH"), APP_CLASSPATH("APP_CLASSPATH"),
/**
* $HADOOP_CLASSPATH.
*/
HADOOP_CLASSPATH("HADOOP_CLASSPATH"),
/** /**
* $LD_LIBRARY_PATH * $LD_LIBRARY_PATH
*/ */