MAPREDUCE-4068. Jars in lib subdirectory of the submittable JAR are not added to the classpath (rkanter via tucu)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1376253 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
d66223fd99
commit
dc33a0765c
|
@ -173,6 +173,9 @@ Branch-2 ( Unreleased changes )
|
||||||
for compatibility reasons is creating incorrect counter name.
|
for compatibility reasons is creating incorrect counter name.
|
||||||
(Jarek Jarcec Cecho via tomwhite)
|
(Jarek Jarcec Cecho via tomwhite)
|
||||||
|
|
||||||
|
MAPREDUCE-4068. Jars in lib subdirectory of the submittable JAR are not added to the
|
||||||
|
classpath (rkanter via tucu)
|
||||||
|
|
||||||
Release 2.1.0-alpha - Unreleased
|
Release 2.1.0-alpha - Unreleased
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -608,7 +608,7 @@ public abstract class TaskAttemptImpl implements
|
||||||
localResources.put(
|
localResources.put(
|
||||||
MRJobConfig.JOB_JAR,
|
MRJobConfig.JOB_JAR,
|
||||||
createLocalResource(remoteFS, remoteJobJar,
|
createLocalResource(remoteFS, remoteJobJar,
|
||||||
LocalResourceType.FILE, LocalResourceVisibility.APPLICATION));
|
LocalResourceType.ARCHIVE, LocalResourceVisibility.APPLICATION));
|
||||||
LOG.info("The job-jar file on the remote FS is "
|
LOG.info("The job-jar file on the remote FS is "
|
||||||
+ remoteJobJar.toUri().toASCIIString());
|
+ remoteJobJar.toUri().toASCIIString());
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -204,7 +204,15 @@ public class MRApps extends Apps {
|
||||||
Apps.addToEnvironment(
|
Apps.addToEnvironment(
|
||||||
environment,
|
environment,
|
||||||
Environment.CLASSPATH.name(),
|
Environment.CLASSPATH.name(),
|
||||||
MRJobConfig.JOB_JAR);
|
MRJobConfig.JOB_JAR + Path.SEPARATOR);
|
||||||
|
Apps.addToEnvironment(
|
||||||
|
environment,
|
||||||
|
Environment.CLASSPATH.name(),
|
||||||
|
MRJobConfig.JOB_JAR + Path.SEPARATOR + "classes" + Path.SEPARATOR);
|
||||||
|
Apps.addToEnvironment(
|
||||||
|
environment,
|
||||||
|
Environment.CLASSPATH.name(),
|
||||||
|
MRJobConfig.JOB_JAR + Path.SEPARATOR + "lib" + Path.SEPARATOR + "*");
|
||||||
Apps.addToEnvironment(
|
Apps.addToEnvironment(
|
||||||
environment,
|
environment,
|
||||||
Environment.CLASSPATH.name(),
|
Environment.CLASSPATH.name(),
|
||||||
|
|
|
@ -158,7 +158,7 @@ public class TestMRApps {
|
||||||
}
|
}
|
||||||
String env_str = env.get("CLASSPATH");
|
String env_str = env.get("CLASSPATH");
|
||||||
assertSame("MAPREDUCE_JOB_USER_CLASSPATH_FIRST set, but not taking effect!",
|
assertSame("MAPREDUCE_JOB_USER_CLASSPATH_FIRST set, but not taking effect!",
|
||||||
env_str.indexOf("$PWD:job.jar"), 0);
|
env_str.indexOf("$PWD:job.jar/:job.jar/classes/:job.jar/lib/*:$PWD/*"), 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test public void testSetClasspathWithNoUserPrecendence() {
|
@Test public void testSetClasspathWithNoUserPrecendence() {
|
||||||
|
@ -171,8 +171,12 @@ public class TestMRApps {
|
||||||
fail("Got exception while setting classpath");
|
fail("Got exception while setting classpath");
|
||||||
}
|
}
|
||||||
String env_str = env.get("CLASSPATH");
|
String env_str = env.get("CLASSPATH");
|
||||||
|
int index =
|
||||||
|
env_str.indexOf("job.jar/:job.jar/classes/:job.jar/lib/*:$PWD/*");
|
||||||
|
assertNotSame("MAPREDUCE_JOB_USER_CLASSPATH_FIRST false, and job.jar is not"
|
||||||
|
+ " in the classpath!", index, -1);
|
||||||
assertNotSame("MAPREDUCE_JOB_USER_CLASSPATH_FIRST false, but taking effect!",
|
assertNotSame("MAPREDUCE_JOB_USER_CLASSPATH_FIRST false, but taking effect!",
|
||||||
env_str.indexOf("$PWD:job.jar"), 0);
|
index, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|
|
@ -304,7 +304,7 @@ public class YARNRunner implements ClientProtocol {
|
||||||
return clientCache.getClient(jobId).getJobStatus(jobId);
|
return clientCache.getClient(jobId).getJobStatus(jobId);
|
||||||
}
|
}
|
||||||
|
|
||||||
private LocalResource createApplicationResource(FileContext fs, Path p)
|
private LocalResource createApplicationResource(FileContext fs, Path p, LocalResourceType type)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
LocalResource rsrc = recordFactory.newRecordInstance(LocalResource.class);
|
LocalResource rsrc = recordFactory.newRecordInstance(LocalResource.class);
|
||||||
FileStatus rsrcStat = fs.getFileStatus(p);
|
FileStatus rsrcStat = fs.getFileStatus(p);
|
||||||
|
@ -312,7 +312,7 @@ public class YARNRunner implements ClientProtocol {
|
||||||
.getDefaultFileSystem().resolvePath(rsrcStat.getPath())));
|
.getDefaultFileSystem().resolvePath(rsrcStat.getPath())));
|
||||||
rsrc.setSize(rsrcStat.getLen());
|
rsrc.setSize(rsrcStat.getLen());
|
||||||
rsrc.setTimestamp(rsrcStat.getModificationTime());
|
rsrc.setTimestamp(rsrcStat.getModificationTime());
|
||||||
rsrc.setType(LocalResourceType.FILE);
|
rsrc.setType(type);
|
||||||
rsrc.setVisibility(LocalResourceVisibility.APPLICATION);
|
rsrc.setVisibility(LocalResourceVisibility.APPLICATION);
|
||||||
return rsrc;
|
return rsrc;
|
||||||
}
|
}
|
||||||
|
@ -343,11 +343,12 @@ public class YARNRunner implements ClientProtocol {
|
||||||
|
|
||||||
localResources.put(MRJobConfig.JOB_CONF_FILE,
|
localResources.put(MRJobConfig.JOB_CONF_FILE,
|
||||||
createApplicationResource(defaultFileContext,
|
createApplicationResource(defaultFileContext,
|
||||||
jobConfPath));
|
jobConfPath, LocalResourceType.FILE));
|
||||||
if (jobConf.get(MRJobConfig.JAR) != null) {
|
if (jobConf.get(MRJobConfig.JAR) != null) {
|
||||||
localResources.put(MRJobConfig.JOB_JAR,
|
localResources.put(MRJobConfig.JOB_JAR,
|
||||||
createApplicationResource(defaultFileContext,
|
createApplicationResource(defaultFileContext,
|
||||||
new Path(jobSubmitDir, MRJobConfig.JOB_JAR)));
|
new Path(jobSubmitDir, MRJobConfig.JOB_JAR),
|
||||||
|
LocalResourceType.ARCHIVE));
|
||||||
} else {
|
} else {
|
||||||
// Job jar may be null. For e.g, for pipes, the job jar is the hadoop
|
// Job jar may be null. For e.g, for pipes, the job jar is the hadoop
|
||||||
// mapreduce jar itself which is already on the classpath.
|
// mapreduce jar itself which is already on the classpath.
|
||||||
|
@ -363,7 +364,7 @@ public class YARNRunner implements ClientProtocol {
|
||||||
localResources.put(
|
localResources.put(
|
||||||
MRJobConfig.JOB_SUBMIT_DIR + "/" + s,
|
MRJobConfig.JOB_SUBMIT_DIR + "/" + s,
|
||||||
createApplicationResource(defaultFileContext,
|
createApplicationResource(defaultFileContext,
|
||||||
new Path(jobSubmitDir, s)));
|
new Path(jobSubmitDir, s), LocalResourceType.FILE));
|
||||||
}
|
}
|
||||||
|
|
||||||
// Setup security tokens
|
// Setup security tokens
|
||||||
|
|
|
@ -19,6 +19,7 @@
|
||||||
package org.apache.hadoop.mapreduce.v2;
|
package org.apache.hadoop.mapreduce.v2;
|
||||||
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
|
import java.io.FileInputStream;
|
||||||
import java.io.FileNotFoundException;
|
import java.io.FileNotFoundException;
|
||||||
import java.io.FileOutputStream;
|
import java.io.FileOutputStream;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
@ -26,6 +27,7 @@ import java.net.URI;
|
||||||
import java.security.PrivilegedExceptionAction;
|
import java.security.PrivilegedExceptionAction;
|
||||||
import java.util.jar.JarOutputStream;
|
import java.util.jar.JarOutputStream;
|
||||||
import java.util.zip.ZipEntry;
|
import java.util.zip.ZipEntry;
|
||||||
|
import org.apache.commons.io.FileUtils;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
@ -66,6 +68,7 @@ import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.security.token.Token;
|
import org.apache.hadoop.security.token.Token;
|
||||||
import org.apache.hadoop.security.token.TokenIdentifier;
|
import org.apache.hadoop.security.token.TokenIdentifier;
|
||||||
|
import org.apache.hadoop.util.JarFinder;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.junit.AfterClass;
|
import org.junit.AfterClass;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
|
@ -402,13 +405,14 @@ public class TestMRJobs {
|
||||||
Path[] archives = context.getLocalCacheArchives();
|
Path[] archives = context.getLocalCacheArchives();
|
||||||
FileSystem fs = LocalFileSystem.get(conf);
|
FileSystem fs = LocalFileSystem.get(conf);
|
||||||
|
|
||||||
// Check that 3(2+ appjar) files and 2 archives are present
|
// Check that 4 (2 + appjar + DistrubutedCacheChecker jar) files
|
||||||
Assert.assertEquals(3, files.length);
|
// and 2 archives are present
|
||||||
|
Assert.assertEquals(4, files.length);
|
||||||
Assert.assertEquals(2, archives.length);
|
Assert.assertEquals(2, archives.length);
|
||||||
|
|
||||||
// Check lengths of the files
|
// Check lengths of the files
|
||||||
Assert.assertEquals(1, fs.getFileStatus(files[0]).getLen());
|
Assert.assertEquals(1, fs.getFileStatus(files[1]).getLen());
|
||||||
Assert.assertTrue(fs.getFileStatus(files[1]).getLen() > 1);
|
Assert.assertTrue(fs.getFileStatus(files[2]).getLen() > 1);
|
||||||
|
|
||||||
// Check extraction of the archive
|
// Check extraction of the archive
|
||||||
Assert.assertTrue(fs.exists(new Path(archives[0],
|
Assert.assertTrue(fs.exists(new Path(archives[0],
|
||||||
|
@ -424,11 +428,23 @@ public class TestMRJobs {
|
||||||
Assert.assertNotNull(cl.getResource("distributed.jar.inside2"));
|
Assert.assertNotNull(cl.getResource("distributed.jar.inside2"));
|
||||||
Assert.assertNotNull(cl.getResource("distributed.jar.inside3"));
|
Assert.assertNotNull(cl.getResource("distributed.jar.inside3"));
|
||||||
Assert.assertNotNull(cl.getResource("distributed.jar.inside4"));
|
Assert.assertNotNull(cl.getResource("distributed.jar.inside4"));
|
||||||
|
// The Job Jar should have been extracted to a folder named "job.jar" and
|
||||||
|
// added to the classpath; the two jar files in the lib folder in the Job
|
||||||
|
// Jar should have also been added to the classpath
|
||||||
|
Assert.assertNotNull(cl.getResource("job.jar/"));
|
||||||
|
Assert.assertNotNull(cl.getResource("job.jar/lib/lib1.jar"));
|
||||||
|
Assert.assertNotNull(cl.getResource("job.jar/lib/lib2.jar"));
|
||||||
|
|
||||||
// Check that the symlink for the renaming was created in the cwd;
|
// Check that the symlink for the renaming was created in the cwd;
|
||||||
File symlinkFile = new File("distributed.first.symlink");
|
File symlinkFile = new File("distributed.first.symlink");
|
||||||
Assert.assertTrue(symlinkFile.exists());
|
Assert.assertTrue(symlinkFile.exists());
|
||||||
Assert.assertEquals(1, symlinkFile.length());
|
Assert.assertEquals(1, symlinkFile.length());
|
||||||
|
|
||||||
|
// Check that the symlink for the Job Jar was created in the cwd and
|
||||||
|
// points to the extracted directory
|
||||||
|
File jobJarDir = new File("job.jar");
|
||||||
|
Assert.assertTrue(FileUtils.isSymlink(jobJarDir));
|
||||||
|
Assert.assertTrue(jobJarDir.isDirectory());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -451,7 +467,15 @@ public class TestMRJobs {
|
||||||
makeJar(new Path(TEST_ROOT_DIR, "distributed.fourth.jar"), 4);
|
makeJar(new Path(TEST_ROOT_DIR, "distributed.fourth.jar"), 4);
|
||||||
|
|
||||||
Job job = Job.getInstance(mrCluster.getConfig());
|
Job job = Job.getInstance(mrCluster.getConfig());
|
||||||
job.setJarByClass(DistributedCacheChecker.class);
|
|
||||||
|
// Set the job jar to a new "dummy" jar so we can check that its extracted
|
||||||
|
// properly
|
||||||
|
job.setJar(makeJobJarWithLib(TEST_ROOT_DIR.toUri().toString()));
|
||||||
|
// Because the job jar is a "dummy" jar, we need to include the jar with
|
||||||
|
// DistributedCacheChecker or it won't be able to find it
|
||||||
|
job.addFileToClassPath(new Path(
|
||||||
|
JarFinder.getJar(DistributedCacheChecker.class)));
|
||||||
|
|
||||||
job.setMapperClass(DistributedCacheChecker.class);
|
job.setMapperClass(DistributedCacheChecker.class);
|
||||||
job.setOutputFormatClass(NullOutputFormat.class);
|
job.setOutputFormatClass(NullOutputFormat.class);
|
||||||
|
|
||||||
|
@ -497,4 +521,45 @@ public class TestMRJobs {
|
||||||
localFs.setPermission(p, new FsPermission("700"));
|
localFs.setPermission(p, new FsPermission("700"));
|
||||||
return p;
|
return p;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private String makeJobJarWithLib(String testDir) throws FileNotFoundException,
|
||||||
|
IOException{
|
||||||
|
Path jobJarPath = new Path(testDir, "thejob.jar");
|
||||||
|
FileOutputStream fos =
|
||||||
|
new FileOutputStream(new File(jobJarPath.toUri().getPath()));
|
||||||
|
JarOutputStream jos = new JarOutputStream(fos);
|
||||||
|
// Have to put in real jar files or it will complain
|
||||||
|
createAndAddJarToJar(jos, new File(
|
||||||
|
new Path(testDir, "lib1.jar").toUri().getPath()));
|
||||||
|
createAndAddJarToJar(jos, new File(
|
||||||
|
new Path(testDir, "lib2.jar").toUri().getPath()));
|
||||||
|
jos.close();
|
||||||
|
localFs.setPermission(jobJarPath, new FsPermission("700"));
|
||||||
|
return jobJarPath.toUri().toString();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void createAndAddJarToJar(JarOutputStream jos, File jarFile)
|
||||||
|
throws FileNotFoundException, IOException {
|
||||||
|
FileOutputStream fos2 = new FileOutputStream(jarFile);
|
||||||
|
JarOutputStream jos2 = new JarOutputStream(fos2);
|
||||||
|
// Have to have at least one entry or it will complain
|
||||||
|
ZipEntry ze = new ZipEntry("lib1.inside");
|
||||||
|
jos2.putNextEntry(ze);
|
||||||
|
jos2.closeEntry();
|
||||||
|
jos2.close();
|
||||||
|
ze = new ZipEntry("lib/" + jarFile.getName());
|
||||||
|
jos.putNextEntry(ze);
|
||||||
|
FileInputStream in = new FileInputStream(jarFile);
|
||||||
|
byte buf[] = new byte[1024];
|
||||||
|
int numRead;
|
||||||
|
do {
|
||||||
|
numRead = in.read(buf);
|
||||||
|
if (numRead >= 0) {
|
||||||
|
jos.write(buf, 0, numRead);
|
||||||
|
}
|
||||||
|
} while (numRead != -1);
|
||||||
|
in.close();
|
||||||
|
jos.closeEntry();
|
||||||
|
jarFile.delete();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue