From dc33a0765cd27255021911c4abb435b5850387aa Mon Sep 17 00:00:00 2001 From: Alejandro Abdelnur Date: Wed, 22 Aug 2012 21:18:34 +0000 Subject: [PATCH] MAPREDUCE-4068. Jars in lib subdirectory of the submittable JAR are not added to the classpath (rkanter via tucu) git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1376253 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-mapreduce-project/CHANGES.txt | 3 + .../v2/app/job/impl/TaskAttemptImpl.java | 2 +- .../hadoop/mapreduce/v2/util/MRApps.java | 10 ++- .../hadoop/mapreduce/v2/util/TestMRApps.java | 8 +- .../org/apache/hadoop/mapred/YARNRunner.java | 11 +-- .../hadoop/mapreduce/v2/TestMRJobs.java | 75 +++++++++++++++++-- 6 files changed, 95 insertions(+), 14 deletions(-) diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 4e1cecb15c0..8377c0ae006 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -173,6 +173,9 @@ Branch-2 ( Unreleased changes ) for compatibility reasons is creating incorrect counter name. (Jarek Jarcec Cecho via tomwhite) + MAPREDUCE-4068. Jars in lib subdirectory of the submittable JAR are not added to the + classpath (rkanter via tucu) + Release 2.1.0-alpha - Unreleased INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java index 25f14e58e30..f47e03a9701 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java @@ -608,7 +608,7 @@ public abstract class TaskAttemptImpl implements localResources.put( MRJobConfig.JOB_JAR, createLocalResource(remoteFS, remoteJobJar, - LocalResourceType.FILE, LocalResourceVisibility.APPLICATION)); + LocalResourceType.ARCHIVE, LocalResourceVisibility.APPLICATION)); LOG.info("The job-jar file on the remote FS is " + remoteJobJar.toUri().toASCIIString()); } else { diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java index 32817380ac9..63edd026ebe 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java @@ -204,7 +204,15 @@ public class MRApps extends Apps { Apps.addToEnvironment( environment, Environment.CLASSPATH.name(), - MRJobConfig.JOB_JAR); + MRJobConfig.JOB_JAR + Path.SEPARATOR); + Apps.addToEnvironment( + environment, + Environment.CLASSPATH.name(), + MRJobConfig.JOB_JAR + Path.SEPARATOR + "classes" + Path.SEPARATOR); + Apps.addToEnvironment( + environment, + Environment.CLASSPATH.name(), + MRJobConfig.JOB_JAR + Path.SEPARATOR + "lib" + Path.SEPARATOR + "*"); Apps.addToEnvironment( environment, Environment.CLASSPATH.name(), diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/util/TestMRApps.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/util/TestMRApps.java index 8772508c992..865be737c94 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/util/TestMRApps.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/util/TestMRApps.java @@ -158,7 +158,7 @@ public class TestMRApps { } String env_str = env.get("CLASSPATH"); assertSame("MAPREDUCE_JOB_USER_CLASSPATH_FIRST set, but not taking effect!", - env_str.indexOf("$PWD:job.jar"), 0); + env_str.indexOf("$PWD:job.jar/:job.jar/classes/:job.jar/lib/*:$PWD/*"), 0); } @Test public void testSetClasspathWithNoUserPrecendence() { @@ -171,8 +171,12 @@ public class TestMRApps { fail("Got exception while setting classpath"); } String env_str = env.get("CLASSPATH"); + int index = + env_str.indexOf("job.jar/:job.jar/classes/:job.jar/lib/*:$PWD/*"); + assertNotSame("MAPREDUCE_JOB_USER_CLASSPATH_FIRST false, and job.jar is not" + + " in the classpath!", index, -1); assertNotSame("MAPREDUCE_JOB_USER_CLASSPATH_FIRST false, but taking effect!", - env_str.indexOf("$PWD:job.jar"), 0); + index, 0); } @Test diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java index e6358de35de..4555f86e888 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java @@ -304,7 +304,7 @@ public class YARNRunner implements ClientProtocol { return clientCache.getClient(jobId).getJobStatus(jobId); } - private LocalResource createApplicationResource(FileContext fs, Path p) + private LocalResource createApplicationResource(FileContext fs, Path p, LocalResourceType type) throws IOException { LocalResource rsrc = recordFactory.newRecordInstance(LocalResource.class); FileStatus rsrcStat = fs.getFileStatus(p); @@ -312,7 +312,7 @@ public class YARNRunner implements ClientProtocol { .getDefaultFileSystem().resolvePath(rsrcStat.getPath()))); rsrc.setSize(rsrcStat.getLen()); rsrc.setTimestamp(rsrcStat.getModificationTime()); - rsrc.setType(LocalResourceType.FILE); + rsrc.setType(type); rsrc.setVisibility(LocalResourceVisibility.APPLICATION); return rsrc; } @@ -343,11 +343,12 @@ public class YARNRunner implements ClientProtocol { localResources.put(MRJobConfig.JOB_CONF_FILE, createApplicationResource(defaultFileContext, - jobConfPath)); + jobConfPath, LocalResourceType.FILE)); if (jobConf.get(MRJobConfig.JAR) != null) { localResources.put(MRJobConfig.JOB_JAR, createApplicationResource(defaultFileContext, - new Path(jobSubmitDir, MRJobConfig.JOB_JAR))); + new Path(jobSubmitDir, MRJobConfig.JOB_JAR), + LocalResourceType.ARCHIVE)); } else { // Job jar may be null. For e.g, for pipes, the job jar is the hadoop // mapreduce jar itself which is already on the classpath. @@ -363,7 +364,7 @@ public class YARNRunner implements ClientProtocol { localResources.put( MRJobConfig.JOB_SUBMIT_DIR + "/" + s, createApplicationResource(defaultFileContext, - new Path(jobSubmitDir, s))); + new Path(jobSubmitDir, s), LocalResourceType.FILE)); } // Setup security tokens diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java index 7b6ecd25560..03fdc4e57f3 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java @@ -19,6 +19,7 @@ package org.apache.hadoop.mapreduce.v2; import java.io.File; +import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.FileOutputStream; import java.io.IOException; @@ -26,6 +27,7 @@ import java.net.URI; import java.security.PrivilegedExceptionAction; import java.util.jar.JarOutputStream; import java.util.zip.ZipEntry; +import org.apache.commons.io.FileUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -66,6 +68,7 @@ import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; +import org.apache.hadoop.util.JarFinder; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.junit.AfterClass; import org.junit.Assert; @@ -402,13 +405,14 @@ public class TestMRJobs { Path[] archives = context.getLocalCacheArchives(); FileSystem fs = LocalFileSystem.get(conf); - // Check that 3(2+ appjar) files and 2 archives are present - Assert.assertEquals(3, files.length); + // Check that 4 (2 + appjar + DistrubutedCacheChecker jar) files + // and 2 archives are present + Assert.assertEquals(4, files.length); Assert.assertEquals(2, archives.length); // Check lengths of the files - Assert.assertEquals(1, fs.getFileStatus(files[0]).getLen()); - Assert.assertTrue(fs.getFileStatus(files[1]).getLen() > 1); + Assert.assertEquals(1, fs.getFileStatus(files[1]).getLen()); + Assert.assertTrue(fs.getFileStatus(files[2]).getLen() > 1); // Check extraction of the archive Assert.assertTrue(fs.exists(new Path(archives[0], @@ -424,11 +428,23 @@ public class TestMRJobs { Assert.assertNotNull(cl.getResource("distributed.jar.inside2")); Assert.assertNotNull(cl.getResource("distributed.jar.inside3")); Assert.assertNotNull(cl.getResource("distributed.jar.inside4")); + // The Job Jar should have been extracted to a folder named "job.jar" and + // added to the classpath; the two jar files in the lib folder in the Job + // Jar should have also been added to the classpath + Assert.assertNotNull(cl.getResource("job.jar/")); + Assert.assertNotNull(cl.getResource("job.jar/lib/lib1.jar")); + Assert.assertNotNull(cl.getResource("job.jar/lib/lib2.jar")); // Check that the symlink for the renaming was created in the cwd; File symlinkFile = new File("distributed.first.symlink"); Assert.assertTrue(symlinkFile.exists()); Assert.assertEquals(1, symlinkFile.length()); + + // Check that the symlink for the Job Jar was created in the cwd and + // points to the extracted directory + File jobJarDir = new File("job.jar"); + Assert.assertTrue(FileUtils.isSymlink(jobJarDir)); + Assert.assertTrue(jobJarDir.isDirectory()); } } @@ -451,7 +467,15 @@ public class TestMRJobs { makeJar(new Path(TEST_ROOT_DIR, "distributed.fourth.jar"), 4); Job job = Job.getInstance(mrCluster.getConfig()); - job.setJarByClass(DistributedCacheChecker.class); + + // Set the job jar to a new "dummy" jar so we can check that its extracted + // properly + job.setJar(makeJobJarWithLib(TEST_ROOT_DIR.toUri().toString())); + // Because the job jar is a "dummy" jar, we need to include the jar with + // DistributedCacheChecker or it won't be able to find it + job.addFileToClassPath(new Path( + JarFinder.getJar(DistributedCacheChecker.class))); + job.setMapperClass(DistributedCacheChecker.class); job.setOutputFormatClass(NullOutputFormat.class); @@ -497,4 +521,45 @@ public class TestMRJobs { localFs.setPermission(p, new FsPermission("700")); return p; } + + private String makeJobJarWithLib(String testDir) throws FileNotFoundException, + IOException{ + Path jobJarPath = new Path(testDir, "thejob.jar"); + FileOutputStream fos = + new FileOutputStream(new File(jobJarPath.toUri().getPath())); + JarOutputStream jos = new JarOutputStream(fos); + // Have to put in real jar files or it will complain + createAndAddJarToJar(jos, new File( + new Path(testDir, "lib1.jar").toUri().getPath())); + createAndAddJarToJar(jos, new File( + new Path(testDir, "lib2.jar").toUri().getPath())); + jos.close(); + localFs.setPermission(jobJarPath, new FsPermission("700")); + return jobJarPath.toUri().toString(); + } + + private void createAndAddJarToJar(JarOutputStream jos, File jarFile) + throws FileNotFoundException, IOException { + FileOutputStream fos2 = new FileOutputStream(jarFile); + JarOutputStream jos2 = new JarOutputStream(fos2); + // Have to have at least one entry or it will complain + ZipEntry ze = new ZipEntry("lib1.inside"); + jos2.putNextEntry(ze); + jos2.closeEntry(); + jos2.close(); + ze = new ZipEntry("lib/" + jarFile.getName()); + jos.putNextEntry(ze); + FileInputStream in = new FileInputStream(jarFile); + byte buf[] = new byte[1024]; + int numRead; + do { + numRead = in.read(buf); + if (numRead >= 0) { + jos.write(buf, 0, numRead); + } + } while (numRead != -1); + in.close(); + jos.closeEntry(); + jarFile.delete(); + } }