MAPREDUCE-4408. allow jobs to set a JAR that is in the distributed cached (rkanter via tucu)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1377150 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Alejandro Abdelnur 2012-08-24 23:38:21 +00:00
parent a26cfc347f
commit 2ff2b79e33
5 changed files with 68 additions and 21 deletions

View File

@ -18,6 +18,9 @@ Branch-2 ( Unreleased changes )
MAPREDUCE-4511. Add IFile readahead (ahmed via tucu)
MAPREDUCE-4408. allow jobs to set a JAR that is in the distributed cached
(rkanter via tucu)
BUG FIXES
MAPREDUCE-4422. YARN_APPLICATION_CLASSPATH needs a documented default value in

View File

@ -232,9 +232,17 @@ class JobSubmitter {
if ("".equals(job.getJobName())){
job.setJobName(new Path(jobJar).getName());
}
copyJar(new Path(jobJar), JobSubmissionFiles.getJobJar(submitJobDir),
Path jobJarPath = new Path(jobJar);
URI jobJarURI = jobJarPath.toUri();
// If the job jar is already in fs, we don't need to copy it from local fs
if (jobJarURI.getScheme() == null || jobJarURI.getAuthority() == null
|| !(jobJarURI.getScheme().equals(jtFs.getUri().getScheme())
&& jobJarURI.getAuthority().equals(
jtFs.getUri().getAuthority()))) {
copyJar(jobJarPath, JobSubmissionFiles.getJobJar(submitJobDir),
replication);
job.setJar(JobSubmissionFiles.getJobJar(submitJobDir).toString());
}
} else {
LOG.warn("No job jar file set. User classes may not be found. "+
"See Job or Job#setJar(String).");

View File

@ -345,10 +345,10 @@ public class YARNRunner implements ClientProtocol {
createApplicationResource(defaultFileContext,
jobConfPath, LocalResourceType.FILE));
if (jobConf.get(MRJobConfig.JAR) != null) {
Path jobJarPath = new Path(jobConf.get(MRJobConfig.JAR));
localResources.put(MRJobConfig.JOB_JAR,
createApplicationResource(defaultFileContext,
new Path(jobSubmitDir, MRJobConfig.JOB_JAR),
LocalResourceType.ARCHIVE));
jobJarPath, LocalResourceType.ARCHIVE));
} else {
// Job jar may be null. For e.g, for pipes, the job jar is the hadoop
// mapreduce jar itself which is already on the classpath.

View File

@ -72,8 +72,10 @@ public class MiniMRYarnCluster extends MiniYARNCluster {
@Override
public void init(Configuration conf) {
conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_FRAMEWORK_NAME);
if (conf.get(MRJobConfig.MR_AM_STAGING_DIR) == null) {
conf.set(MRJobConfig.MR_AM_STAGING_DIR, new File(getTestWorkDir(),
"apps_staging_dir/").getAbsolutePath());
}
conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, "000");
try {

View File

@ -41,10 +41,10 @@ import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
@ -80,15 +80,24 @@ public class TestMRJobs {
private static final Log LOG = LogFactory.getLog(TestMRJobs.class);
protected static MiniMRYarnCluster mrCluster;
protected static MiniDFSCluster dfsCluster;
private static Configuration conf = new Configuration();
private static FileSystem localFs;
private static FileSystem remoteFs;
static {
try {
localFs = FileSystem.getLocal(conf);
} catch (IOException io) {
throw new RuntimeException("problem getting local fs", io);
}
try {
dfsCluster = new MiniDFSCluster.Builder(conf).numDataNodes(2)
.format(true).racks(null).build();
remoteFs = dfsCluster.getFileSystem();
} catch (IOException io) {
throw new RuntimeException("problem starting mini dfs cluster", io);
}
}
private static Path TEST_ROOT_DIR = new Path("target",
@ -107,6 +116,8 @@ public class TestMRJobs {
if (mrCluster == null) {
mrCluster = new MiniMRYarnCluster(TestMRJobs.class.getName(), 3);
Configuration conf = new Configuration();
conf.set("fs.defaultFS", remoteFs.getUri().toString()); // use HDFS
conf.set(MRJobConfig.MR_AM_STAGING_DIR, "/apps_staging_dir");
mrCluster.init(conf);
mrCluster.start();
}
@ -123,6 +134,10 @@ public class TestMRJobs {
mrCluster.stop();
mrCluster = null;
}
if (dfsCluster != null) {
dfsCluster.shutdown();
dfsCluster = null;
}
}
@Test
@ -403,7 +418,6 @@ public class TestMRJobs {
Configuration conf = context.getConfiguration();
Path[] files = context.getLocalCacheFiles();
Path[] archives = context.getLocalCacheArchives();
FileSystem fs = LocalFileSystem.get(conf);
// Check that 4 (2 + appjar + DistrubutedCacheChecker jar) files
// and 2 archives are present
@ -411,13 +425,13 @@ public class TestMRJobs {
Assert.assertEquals(2, archives.length);
// Check lengths of the files
Assert.assertEquals(1, fs.getFileStatus(files[1]).getLen());
Assert.assertTrue(fs.getFileStatus(files[2]).getLen() > 1);
Assert.assertEquals(1, localFs.getFileStatus(files[1]).getLen());
Assert.assertTrue(localFs.getFileStatus(files[2]).getLen() > 1);
// Check extraction of the archive
Assert.assertTrue(fs.exists(new Path(archives[0],
Assert.assertTrue(localFs.exists(new Path(archives[0],
"distributed.jar.inside3")));
Assert.assertTrue(fs.exists(new Path(archives[1],
Assert.assertTrue(localFs.exists(new Path(archives[1],
"distributed.jar.inside4")));
// Check the class loaders
@ -448,8 +462,7 @@ public class TestMRJobs {
}
}
@Test
public void testDistributedCache() throws Exception {
public void _testDistributedCache(String jobJarPath) throws Exception {
if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR
+ " not found. Not running test.");
@ -470,11 +483,13 @@ public class TestMRJobs {
// Set the job jar to a new "dummy" jar so we can check that its extracted
// properly
job.setJar(makeJobJarWithLib(TEST_ROOT_DIR.toUri().toString()));
job.setJar(jobJarPath);
// Because the job jar is a "dummy" jar, we need to include the jar with
// DistributedCacheChecker or it won't be able to find it
job.addFileToClassPath(new Path(
JarFinder.getJar(DistributedCacheChecker.class)));
Path distributedCacheCheckerJar = new Path(
JarFinder.getJar(DistributedCacheChecker.class));
job.addFileToClassPath(distributedCacheCheckerJar.makeQualified(
localFs.getUri(), distributedCacheCheckerJar.getParent()));
job.setMapperClass(DistributedCacheChecker.class);
job.setOutputFormatClass(NullOutputFormat.class);
@ -484,7 +499,9 @@ public class TestMRJobs {
job.addCacheFile(
new URI(first.toUri().toString() + "#distributed.first.symlink"));
job.addFileToClassPath(second);
job.addFileToClassPath(APP_JAR); // The AppMaster jar itself.
// The AppMaster jar itself
job.addFileToClassPath(
APP_JAR.makeQualified(localFs.getUri(), APP_JAR.getParent()));
job.addArchiveToClassPath(third);
job.addCacheArchive(fourth.toUri());
job.setMaxMapAttempts(1); // speed up failures
@ -498,6 +515,23 @@ public class TestMRJobs {
trackingUrl.endsWith(jobId.substring(jobId.lastIndexOf("_")) + "/"));
}
@Test
public void testDistributedCache() throws Exception {
// Test with a local (file:///) Job Jar
Path localJobJarPath = makeJobJarWithLib(TEST_ROOT_DIR.toUri().toString());
_testDistributedCache(localJobJarPath.toUri().toString());
// Test with a remote (hdfs://) Job Jar
Path remoteJobJarPath = new Path(remoteFs.getUri().toString() + "/",
localJobJarPath.getName());
remoteFs.moveFromLocalFile(localJobJarPath, remoteJobJarPath);
File localJobJarFile = new File(localJobJarPath.toUri().toString());
if (localJobJarFile.exists()) { // just to make sure
localJobJarFile.delete();
}
_testDistributedCache(remoteJobJarPath.toUri().toString());
}
private Path createTempFile(String filename, String contents)
throws IOException {
Path path = new Path(TEST_ROOT_DIR, filename);
@ -522,7 +556,7 @@ public class TestMRJobs {
return p;
}
private String makeJobJarWithLib(String testDir) throws FileNotFoundException,
private Path makeJobJarWithLib(String testDir) throws FileNotFoundException,
IOException{
Path jobJarPath = new Path(testDir, "thejob.jar");
FileOutputStream fos =
@ -535,7 +569,7 @@ public class TestMRJobs {
new Path(testDir, "lib2.jar").toUri().getPath()));
jos.close();
localFs.setPermission(jobJarPath, new FsPermission("700"));
return jobJarPath.toUri().toString();
return jobJarPath;
}
private void createAndAddJarToJar(JarOutputStream jos, File jarFile)