diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobResourceUploader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobResourceUploader.java index 085c96612d9..f1cad57dd41 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobResourceUploader.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobResourceUploader.java @@ -23,6 +23,7 @@ import java.net.URI; import java.net.URISyntaxException; import java.util.Collection; import java.util.HashMap; +import java.util.LinkedList; import java.util.Map; import org.apache.commons.logging.Log; @@ -91,7 +92,7 @@ class JobResourceUploader { submitJobDir = new Path(submitJobDir.toUri().getPath()); FsPermission mapredSysPerms = new FsPermission(JobSubmissionFiles.JOB_DIR_PERMISSION); - FileSystem.mkdirs(jtFs, submitJobDir, mapredSysPerms); + mkdirs(jtFs, submitJobDir, mapredSysPerms); Collection files = conf.getStringCollection("tmpfiles"); Collection libjars = conf.getStringCollection("tmpjars"); @@ -116,18 +117,20 @@ class JobResourceUploader { job.getCredentials()); } - private void uploadFiles(Configuration conf, Collection files, + @VisibleForTesting + void uploadFiles(Configuration conf, Collection files, Path submitJobDir, FsPermission mapredSysPerms, short submitReplication) throws IOException { Path filesDir = JobSubmissionFiles.getJobDistCacheFiles(submitJobDir); if (!files.isEmpty()) { - FileSystem.mkdirs(jtFs, filesDir, mapredSysPerms); + mkdirs(jtFs, filesDir, mapredSysPerms); for (String tmpFile : files) { URI tmpURI = null; try { tmpURI = new URI(tmpFile); } catch (URISyntaxException e) { - throw new IllegalArgumentException(e); + throw new IllegalArgumentException("Error parsing files argument." + + " Argument must be a valid URI: " + tmpFile, e); } Path tmp = new Path(tmpURI); Path newPath = copyRemoteFiles(filesDir, tmp, conf, submitReplication); @@ -136,50 +139,83 @@ class JobResourceUploader { DistributedCache.addCacheFile(pathURI, conf); } catch (URISyntaxException ue) { // should not throw a uri exception - throw new IOException("Failed to create uri for " + tmpFile, ue); + throw new IOException( + "Failed to create a URI (URISyntaxException) for the remote path " + + newPath + ". This was based on the files parameter: " + + tmpFile, + ue); } } } } - private void uploadLibJars(Configuration conf, Collection libjars, + // Suppress warning for use of DistributedCache (it is everywhere). + @SuppressWarnings("deprecation") + @VisibleForTesting + void uploadLibJars(Configuration conf, Collection libjars, Path submitJobDir, FsPermission mapredSysPerms, short submitReplication) throws IOException { Path libjarsDir = JobSubmissionFiles.getJobDistCacheLibjars(submitJobDir); if (!libjars.isEmpty()) { - FileSystem.mkdirs(jtFs, libjarsDir, mapredSysPerms); + mkdirs(jtFs, libjarsDir, mapredSysPerms); + Collection libjarURIs = new LinkedList<>(); + boolean foundFragment = false; for (String tmpjars : libjars) { - Path tmp = new Path(tmpjars); + URI tmpURI = null; + try { + tmpURI = new URI(tmpjars); + } catch (URISyntaxException e) { + throw new IllegalArgumentException("Error parsing libjars argument." + + " Argument must be a valid URI: " + tmpjars, e); + } + Path tmp = new Path(tmpURI); Path newPath = copyRemoteFiles(libjarsDir, tmp, conf, submitReplication); - - // Add each file to the classpath - DistributedCache.addFileToClassPath( - new Path(newPath.toUri().getPath()), conf, jtFs, !useWildcard); + try { + URI pathURI = getPathURI(newPath, tmpURI.getFragment()); + if (!foundFragment) { + foundFragment = pathURI.getFragment() != null; + } + DistributedCache.addFileToClassPath(new Path(pathURI.getPath()), conf, + jtFs, false); + libjarURIs.add(pathURI); + } catch (URISyntaxException ue) { + // should not throw a uri exception + throw new IOException( + "Failed to create a URI (URISyntaxException) for the remote path " + + newPath + ". This was based on the libjar parameter: " + + tmpjars, + ue); + } } - if (useWildcard) { - // Add the whole directory to the cache + if (useWildcard && !foundFragment) { + // Add the whole directory to the cache using a wild card Path libJarsDirWildcard = jtFs.makeQualified(new Path(libjarsDir, DistributedCache.WILDCARD)); - DistributedCache.addCacheFile(libJarsDirWildcard.toUri(), conf); + } else { + for (URI uri : libjarURIs) { + DistributedCache.addCacheFile(uri, conf); + } } } } - private void uploadArchives(Configuration conf, Collection archives, + @VisibleForTesting + void uploadArchives(Configuration conf, Collection archives, Path submitJobDir, FsPermission mapredSysPerms, short submitReplication) throws IOException { Path archivesDir = JobSubmissionFiles.getJobDistCacheArchives(submitJobDir); if (!archives.isEmpty()) { - FileSystem.mkdirs(jtFs, archivesDir, mapredSysPerms); + mkdirs(jtFs, archivesDir, mapredSysPerms); for (String tmpArchives : archives) { URI tmpURI; try { tmpURI = new URI(tmpArchives); } catch (URISyntaxException e) { - throw new IllegalArgumentException(e); + throw new IllegalArgumentException("Error parsing archives argument." + + " Argument must be a valid URI: " + tmpArchives, e); } Path tmp = new Path(tmpURI); Path newPath = @@ -189,13 +225,18 @@ class JobResourceUploader { DistributedCache.addCacheArchive(pathURI, conf); } catch (URISyntaxException ue) { // should not throw an uri excpetion - throw new IOException("Failed to create uri for " + tmpArchives, ue); + throw new IOException( + "Failed to create a URI (URISyntaxException) for the remote path" + + newPath + ". This was based on the archive parameter: " + + tmpArchives, + ue); } } } } - private void uploadJobJar(Job job, String jobJar, Path submitJobDir, + @VisibleForTesting + void uploadJobJar(Job job, String jobJar, Path submitJobDir, short submitReplication) throws IOException { if (jobJar != null) { // copy jar to JobTracker's fs // use jar name if job is not named. @@ -273,7 +314,8 @@ class JobResourceUploader { URI uri = new URI(s); return new Path(uri.getScheme(), uri.getAuthority(), uri.getPath()); } catch (URISyntaxException e) { - throw new IllegalArgumentException(e); + throw new IllegalArgumentException( + "Error parsing argument." + " Argument must be a valid URI: " + s, e); } } @@ -380,9 +422,20 @@ class JobResourceUploader { return status; } + /** + * Create a new directory in the passed filesystem. This wrapper method exists + * so that it can be overridden/stubbed during testing. + */ + @VisibleForTesting + boolean mkdirs(FileSystem fs, Path dir, FsPermission permission) + throws IOException { + return FileSystem.mkdirs(fs, dir, permission); + } + // copies a file to the jobtracker filesystem and returns the path where it // was copied to - private Path copyRemoteFiles(Path parentDir, Path originalPath, + @VisibleForTesting + Path copyRemoteFiles(Path parentDir, Path originalPath, Configuration conf, short replication) throws IOException { // check if we do not need to copy the files // is jt using the same file system. @@ -400,10 +453,12 @@ class JobResourceUploader { Path newPath = new Path(parentDir, originalPath.getName()); FileUtil.copy(remoteFs, originalPath, jtFs, newPath, false, conf); jtFs.setReplication(newPath, replication); + jtFs.makeQualified(newPath); return newPath; } - private void copyJar(Path originalJarPath, Path submitJarFile, + @VisibleForTesting + void copyJar(Path originalJarPath, Path submitJarFile, short replication) throws IOException { jtFs.copyFromLocalFile(originalJarPath, submitJarFile); jtFs.setReplication(submitJarFile, replication); @@ -427,7 +482,7 @@ class JobResourceUploader { URI pathURI = destPath.toUri(); if (pathURI.getFragment() == null) { if (fragment == null) { - pathURI = new URI(pathURI.toString() + "#" + destPath.getName()); + // no fragment, just return existing pathURI from destPath } else { pathURI = new URI(pathURI.toString() + "#" + fragment); } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/TestJobResourceUploader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/TestJobResourceUploader.java index 8ba50a66b3c..20b7b7dcfba 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/TestJobResourceUploader.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/TestJobResourceUploader.java @@ -23,13 +23,19 @@ import static org.mockito.Mockito.when; import java.io.IOException; import java.net.URI; +import java.util.Arrays; +import java.util.Collection; import java.util.HashMap; +import java.util.HashSet; +import java.util.List; import java.util.Map; +import java.util.Set; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.mapred.JobConf; import org.junit.Assert; import org.junit.Test; @@ -69,13 +75,13 @@ public class TestJobResourceUploader { @Test public void testAllDefaults() throws IOException { - ResourceLimitsConf.Builder b = new ResourceLimitsConf.Builder(); + ResourceConf.Builder b = new ResourceConf.Builder(); runLimitsTest(b.build(), true, null); } @Test public void testNoLimitsWithResources() throws IOException { - ResourceLimitsConf.Builder b = new ResourceLimitsConf.Builder(); + ResourceConf.Builder b = new ResourceConf.Builder(); b.setNumOfDCArchives(1); b.setNumOfDCFiles(1); b.setNumOfTmpArchives(10); @@ -88,7 +94,7 @@ public class TestJobResourceUploader { @Test public void testAtResourceLimit() throws IOException { - ResourceLimitsConf.Builder b = new ResourceLimitsConf.Builder(); + ResourceConf.Builder b = new ResourceConf.Builder(); b.setNumOfDCArchives(1); b.setNumOfDCFiles(1); b.setNumOfTmpArchives(1); @@ -101,7 +107,7 @@ public class TestJobResourceUploader { @Test public void testOverResourceLimit() throws IOException { - ResourceLimitsConf.Builder b = new ResourceLimitsConf.Builder(); + ResourceConf.Builder b = new ResourceConf.Builder(); b.setNumOfDCArchives(1); b.setNumOfDCFiles(1); b.setNumOfTmpArchives(1); @@ -114,7 +120,7 @@ public class TestJobResourceUploader { @Test public void testAtResourcesMBLimit() throws IOException { - ResourceLimitsConf.Builder b = new ResourceLimitsConf.Builder(); + ResourceConf.Builder b = new ResourceConf.Builder(); b.setNumOfDCArchives(1); b.setNumOfDCFiles(1); b.setNumOfTmpArchives(1); @@ -128,7 +134,7 @@ public class TestJobResourceUploader { @Test public void testOverResourcesMBLimit() throws IOException { - ResourceLimitsConf.Builder b = new ResourceLimitsConf.Builder(); + ResourceConf.Builder b = new ResourceConf.Builder(); b.setNumOfDCArchives(1); b.setNumOfDCFiles(2); b.setNumOfTmpArchives(1); @@ -142,7 +148,7 @@ public class TestJobResourceUploader { @Test public void testAtSingleResourceMBLimit() throws IOException { - ResourceLimitsConf.Builder b = new ResourceLimitsConf.Builder(); + ResourceConf.Builder b = new ResourceConf.Builder(); b.setNumOfDCArchives(1); b.setNumOfDCFiles(2); b.setNumOfTmpArchives(1); @@ -156,7 +162,7 @@ public class TestJobResourceUploader { @Test public void testOverSingleResourceMBLimit() throws IOException { - ResourceLimitsConf.Builder b = new ResourceLimitsConf.Builder(); + ResourceConf.Builder b = new ResourceConf.Builder(); b.setNumOfDCArchives(1); b.setNumOfDCFiles(2); b.setNumOfTmpArchives(1); @@ -168,20 +174,263 @@ public class TestJobResourceUploader { runLimitsTest(b.build(), false, ResourceViolation.SINGLE_RESOURCE_SIZE); } + private String destinationPathPrefix = "hdfs:///destinationPath/"; + private String[] expectedFilesNoFrags = + { destinationPathPrefix + "tmpFiles0.txt", + destinationPathPrefix + "tmpFiles1.txt", + destinationPathPrefix + "tmpFiles2.txt", + destinationPathPrefix + "tmpFiles3.txt", + destinationPathPrefix + "tmpFiles4.txt", + destinationPathPrefix + "tmpjars0.jar", + destinationPathPrefix + "tmpjars1.jar" }; + + private String[] expectedFilesWithFrags = + { destinationPathPrefix + "tmpFiles0.txt#tmpFilesfragment0.txt", + destinationPathPrefix + "tmpFiles1.txt#tmpFilesfragment1.txt", + destinationPathPrefix + "tmpFiles2.txt#tmpFilesfragment2.txt", + destinationPathPrefix + "tmpFiles3.txt#tmpFilesfragment3.txt", + destinationPathPrefix + "tmpFiles4.txt#tmpFilesfragment4.txt", + destinationPathPrefix + "tmpjars0.jar#tmpjarsfragment0.jar", + destinationPathPrefix + "tmpjars1.jar#tmpjarsfragment1.jar" }; + + // We use the local fs for the submitFS in the StubedUploader, so libjars + // should be replaced with a single path. + private String[] expectedFilesWithWildcard = + { destinationPathPrefix + "tmpFiles0.txt", + destinationPathPrefix + "tmpFiles1.txt", + destinationPathPrefix + "tmpFiles2.txt", + destinationPathPrefix + "tmpFiles3.txt", + destinationPathPrefix + "tmpFiles4.txt", + "file:///libjars-submit-dir/libjars/*" }; + + private String[] expectedArchivesNoFrags = + { destinationPathPrefix + "tmpArchives0.tgz", + destinationPathPrefix + "tmpArchives1.tgz" }; + + private String[] expectedArchivesWithFrags = + { destinationPathPrefix + "tmpArchives0.tgz#tmpArchivesfragment0.tgz", + destinationPathPrefix + "tmpArchives1.tgz#tmpArchivesfragment1.tgz" }; + + private String jobjarSubmitDir = "/jobjar-submit-dir"; + private String expectedJobJar = jobjarSubmitDir + "/job.jar"; + + @Test + public void testPathsWithNoFragNoSchemeRelative() throws IOException { + ResourceConf.Builder b = new ResourceConf.Builder(); + b.setNumOfTmpFiles(5); + b.setNumOfTmpLibJars(2); + b.setNumOfTmpArchives(2); + b.setJobJar(true); + b.setPathsWithScheme(false); + b.setPathsWithFrags(false); + ResourceConf rConf = b.build(); + JobConf jConf = new JobConf(); + JobResourceUploader uploader = new StubedUploader(jConf); + + runTmpResourcePathTest(uploader, rConf, jConf, expectedFilesNoFrags, + expectedArchivesNoFrags, expectedJobJar); + } + + @Test + public void testPathsWithNoFragNoSchemeAbsolute() throws IOException { + ResourceConf.Builder b = new ResourceConf.Builder(); + b.setNumOfTmpFiles(5); + b.setNumOfTmpLibJars(2); + b.setNumOfTmpArchives(2); + b.setJobJar(true); + b.setPathsWithFrags(false); + b.setPathsWithScheme(false); + b.setAbsolutePaths(true); + ResourceConf rConf = b.build(); + JobConf jConf = new JobConf(); + JobResourceUploader uploader = new StubedUploader(jConf); + + runTmpResourcePathTest(uploader, rConf, jConf, expectedFilesNoFrags, + expectedArchivesNoFrags, expectedJobJar); + } + + @Test + public void testPathsWithFragNoSchemeAbsolute() throws IOException { + ResourceConf.Builder b = new ResourceConf.Builder(); + b.setNumOfTmpFiles(5); + b.setNumOfTmpLibJars(2); + b.setNumOfTmpArchives(2); + b.setJobJar(true); + b.setPathsWithFrags(true); + b.setPathsWithScheme(false); + b.setAbsolutePaths(true); + ResourceConf rConf = b.build(); + JobConf jConf = new JobConf(); + JobResourceUploader uploader = new StubedUploader(jConf); + + runTmpResourcePathTest(uploader, rConf, jConf, expectedFilesWithFrags, + expectedArchivesWithFrags, expectedJobJar); + } + + @Test + public void testPathsWithFragNoSchemeRelative() throws IOException { + ResourceConf.Builder b = new ResourceConf.Builder(); + b.setNumOfTmpFiles(5); + b.setNumOfTmpLibJars(2); + b.setNumOfTmpArchives(2); + b.setJobJar(true); + b.setPathsWithFrags(true); + b.setAbsolutePaths(false); + b.setPathsWithScheme(false); + ResourceConf rConf = b.build(); + JobConf jConf = new JobConf(); + JobResourceUploader uploader = new StubedUploader(jConf); + + runTmpResourcePathTest(uploader, rConf, jConf, expectedFilesWithFrags, + expectedArchivesWithFrags, expectedJobJar); + } + + @Test + public void testPathsWithFragSchemeAbsolute() throws IOException { + ResourceConf.Builder b = new ResourceConf.Builder(); + b.setNumOfTmpFiles(5); + b.setNumOfTmpLibJars(2); + b.setNumOfTmpArchives(2); + b.setJobJar(true); + b.setPathsWithFrags(true); + b.setAbsolutePaths(true); + b.setPathsWithScheme(true); + ResourceConf rConf = b.build(); + JobConf jConf = new JobConf(); + JobResourceUploader uploader = new StubedUploader(jConf); + + runTmpResourcePathTest(uploader, rConf, jConf, expectedFilesWithFrags, + expectedArchivesWithFrags, expectedJobJar); + } + + @Test + public void testPathsWithNoFragWithSchemeAbsolute() throws IOException { + ResourceConf.Builder b = new ResourceConf.Builder(); + b.setNumOfTmpFiles(5); + b.setNumOfTmpLibJars(2); + b.setNumOfTmpArchives(2); + b.setJobJar(true); + b.setPathsWithFrags(false); + b.setPathsWithScheme(true); + b.setAbsolutePaths(true); + ResourceConf rConf = b.build(); + JobConf jConf = new JobConf(); + JobResourceUploader uploader = new StubedUploader(jConf); + + runTmpResourcePathTest(uploader, rConf, jConf, expectedFilesNoFrags, + expectedArchivesNoFrags, expectedJobJar); + } + + @Test + public void testPathsWithNoFragAndWildCard() throws IOException { + ResourceConf.Builder b = new ResourceConf.Builder(); + b.setNumOfTmpFiles(5); + b.setNumOfTmpLibJars(4); + b.setNumOfTmpArchives(2); + b.setJobJar(true); + b.setPathsWithFrags(false); + b.setPathsWithScheme(true); + b.setAbsolutePaths(true); + ResourceConf rConf = b.build(); + JobConf jConf = new JobConf(); + JobResourceUploader uploader = new StubedUploader(jConf, true); + + runTmpResourcePathTest(uploader, rConf, jConf, expectedFilesWithWildcard, + expectedArchivesNoFrags, expectedJobJar); + } + + @Test + public void testPathsWithFragsAndWildCard() throws IOException { + ResourceConf.Builder b = new ResourceConf.Builder(); + b.setNumOfTmpFiles(5); + b.setNumOfTmpLibJars(2); + b.setNumOfTmpArchives(2); + b.setJobJar(true); + b.setPathsWithFrags(true); + b.setPathsWithScheme(true); + b.setAbsolutePaths(true); + ResourceConf rConf = b.build(); + JobConf jConf = new JobConf(); + JobResourceUploader uploader = new StubedUploader(jConf, true); + + runTmpResourcePathTest(uploader, rConf, jConf, expectedFilesWithFrags, + expectedArchivesWithFrags, expectedJobJar); + } + + private void runTmpResourcePathTest(JobResourceUploader uploader, + ResourceConf rConf, JobConf jConf, String[] expectedFiles, + String[] expectedArchives, String expectedJobJar) throws IOException { + rConf.setupJobConf(jConf); + // We use a pre and post job object here because we need the post job object + // to get the new values set during uploadResources, but we need the pre job + // to set the job jar because JobResourceUploader#uploadJobJar uses the Job + // interface not the JobConf. The post job is automatically created in + // validateResourcePaths. + Job jobPre = Job.getInstance(jConf); + uploadResources(uploader, jConf, jobPre); + + validateResourcePaths(jConf, expectedFiles, expectedArchives, + expectedJobJar, jobPre); + } + + private void uploadResources(JobResourceUploader uploader, JobConf jConf, + Job job) throws IOException { + Collection files = jConf.getStringCollection("tmpfiles"); + Collection libjars = jConf.getStringCollection("tmpjars"); + Collection archives = jConf.getStringCollection("tmparchives"); + String jobJar = jConf.getJar(); + uploader.uploadFiles(jConf, files, new Path("/files-submit-dir"), null, + (short) 3); + uploader.uploadArchives(jConf, archives, new Path("/archives-submit-dir"), + null, (short) 3); + uploader.uploadLibJars(jConf, libjars, new Path("/libjars-submit-dir"), + null, (short) 3); + uploader.uploadJobJar(job, jobJar, new Path(jobjarSubmitDir), (short) 3); + } + + private void validateResourcePaths(JobConf jConf, String[] expectedFiles, + String[] expectedArchives, String expectedJobJar, Job preJob) + throws IOException { + Job j = Job.getInstance(jConf); + validateResourcePathsSub(j.getCacheFiles(), expectedFiles); + validateResourcePathsSub(j.getCacheArchives(), expectedArchives); + // We use a different job object here because the jobjar was set on a + // different job object + Assert.assertEquals("Job jar path is different than expected!", + expectedJobJar, preJob.getJar()); + } + + private void validateResourcePathsSub(URI[] actualURIs, + String[] expectedURIs) { + List actualList = Arrays.asList(actualURIs); + Set expectedSet = new HashSet<>(Arrays.asList(expectedURIs)); + if (actualList.size() != expectedSet.size()) { + Assert.fail("Expected list of resources (" + expectedSet.size() + + ") and actual list of resources (" + actualList.size() + + ") are different lengths!"); + } + + for (URI u : actualList) { + if (!expectedSet.contains(u.toString())) { + Assert.fail("Resource list contained unexpected path: " + u.toString()); + } + } + } + private enum ResourceViolation { NUMBER_OF_RESOURCES, TOTAL_RESOURCE_SIZE, SINGLE_RESOURCE_SIZE; } - private void runLimitsTest(ResourceLimitsConf rlConf, - boolean checkShouldSucceed, ResourceViolation violation) - throws IOException { + private void runLimitsTest(ResourceConf rlConf, boolean checkShouldSucceed, + ResourceViolation violation) throws IOException { if (!checkShouldSucceed && violation == null) { Assert.fail("Test is misconfigured. checkShouldSucceed is set to false" + " and a ResourceViolation is not specified."); } - JobConf conf = setupJobConf(rlConf); + JobConf conf = new JobConf(); + rlConf.setupJobConf(conf); JobResourceUploader uploader = new StubedUploader(conf); long configuredSizeOfResourceBytes = rlConf.sizeOfResource * 1024 * 1024; when(mockedStatus.getLen()).thenReturn(configuredSizeOfResourceBytes); @@ -230,43 +479,7 @@ public class TestJobResourceUploader { private final FileStatus mockedStatus = mock(FileStatus.class); - private JobConf setupJobConf(ResourceLimitsConf rlConf) { - JobConf conf = new JobConf(); - conf.setInt(MRJobConfig.MAX_RESOURCES, rlConf.maxResources); - conf.setLong(MRJobConfig.MAX_RESOURCES_MB, rlConf.maxResourcesMB); - conf.setLong(MRJobConfig.MAX_SINGLE_RESOURCE_MB, - rlConf.maxSingleResourceMB); - - conf.set("tmpfiles", - buildPathString("file:///tmpFiles", rlConf.numOfTmpFiles)); - conf.set("tmpjars", - buildPathString("file:///tmpjars", rlConf.numOfTmpLibJars)); - conf.set("tmparchives", - buildPathString("file:///tmpArchives", rlConf.numOfTmpArchives)); - conf.set(MRJobConfig.CACHE_ARCHIVES, - buildPathString("file:///cacheArchives", rlConf.numOfDCArchives)); - conf.set(MRJobConfig.CACHE_FILES, - buildPathString("file:///cacheFiles", rlConf.numOfDCFiles)); - if (rlConf.jobJar) { - conf.setJar("file:///jobjar.jar"); - } - return conf; - } - - private String buildPathString(String pathPrefix, int numOfPaths) { - if (numOfPaths < 1) { - return ""; - } else { - StringBuilder b = new StringBuilder(); - b.append(pathPrefix + 0); - for (int i = 1; i < numOfPaths; i++) { - b.append("," + pathPrefix + i); - } - return b.toString(); - } - } - - final static class ResourceLimitsConf { + private static class ResourceConf { private final int maxResources; private final long maxResourcesMB; private final long maxSingleResourceMB; @@ -277,14 +490,15 @@ public class TestJobResourceUploader { private final int numOfDCFiles; private final int numOfDCArchives; private final long sizeOfResource; + private final boolean pathsWithFrags; + private final boolean pathsWithScheme; + private final boolean absolutePaths; - static final ResourceLimitsConf DEFAULT = new ResourceLimitsConf(); - - private ResourceLimitsConf() { + private ResourceConf() { this(new Builder()); } - private ResourceLimitsConf(Builder builder) { + private ResourceConf(Builder builder) { this.maxResources = builder.maxResources; this.maxResourcesMB = builder.maxResourcesMB; this.maxSingleResourceMB = builder.maxSingleResourceMB; @@ -295,6 +509,9 @@ public class TestJobResourceUploader { this.numOfDCFiles = builder.numOfDCFiles; this.numOfDCArchives = builder.numOfDCArchives; this.sizeOfResource = builder.sizeOfResource; + this.pathsWithFrags = builder.pathsWithFrags; + this.pathsWithScheme = builder.pathsWithScheme; + this.absolutePaths = builder.absolutePaths; } static class Builder { @@ -309,69 +526,176 @@ public class TestJobResourceUploader { private int numOfDCFiles = 0; private int numOfDCArchives = 0; private long sizeOfResource = 0; + private boolean pathsWithFrags = false; + private boolean pathsWithScheme = false; + private boolean absolutePaths = true; - Builder() { + private Builder() { } - Builder setMaxResources(int max) { + private Builder setMaxResources(int max) { this.maxResources = max; return this; } - Builder setMaxResourcesMB(long max) { + private Builder setMaxResourcesMB(long max) { this.maxResourcesMB = max; return this; } - Builder setMaxSingleResourceMB(long max) { + private Builder setMaxSingleResourceMB(long max) { this.maxSingleResourceMB = max; return this; } - Builder setNumOfTmpFiles(int num) { + private Builder setNumOfTmpFiles(int num) { this.numOfTmpFiles = num; return this; } - Builder setNumOfTmpArchives(int num) { + private Builder setNumOfTmpArchives(int num) { this.numOfTmpArchives = num; return this; } - Builder setNumOfTmpLibJars(int num) { + private Builder setNumOfTmpLibJars(int num) { this.numOfTmpLibJars = num; return this; } - Builder setJobJar(boolean jar) { + private Builder setJobJar(boolean jar) { this.jobJar = jar; return this; } - Builder setNumOfDCFiles(int num) { + private Builder setNumOfDCFiles(int num) { this.numOfDCFiles = num; return this; } - Builder setNumOfDCArchives(int num) { + private Builder setNumOfDCArchives(int num) { this.numOfDCArchives = num; return this; } - Builder setSizeOfResource(long sizeMB) { + private Builder setSizeOfResource(long sizeMB) { this.sizeOfResource = sizeMB; return this; } - ResourceLimitsConf build() { - return new ResourceLimitsConf(this); + private Builder setPathsWithFrags(boolean fragments) { + this.pathsWithFrags = fragments; + return this; + } + + private Builder setPathsWithScheme(boolean scheme) { + this.pathsWithScheme = scheme; + return this; + } + + private Builder setAbsolutePaths(boolean absolute) { + this.absolutePaths = absolute; + return this; + } + + ResourceConf build() { + return new ResourceConf(this); + } + } + + private void setupJobConf(JobConf conf) { + conf.set("tmpfiles", + buildPathString("tmpFiles", this.numOfTmpFiles, ".txt")); + conf.set("tmpjars", + buildPathString("tmpjars", this.numOfTmpLibJars, ".jar")); + conf.set("tmparchives", + buildPathString("tmpArchives", this.numOfTmpArchives, ".tgz")); + conf.set(MRJobConfig.CACHE_ARCHIVES, buildDistributedCachePathString( + "cacheArchives", this.numOfDCArchives, ".tgz")); + conf.set(MRJobConfig.CACHE_FILES, buildDistributedCachePathString( + "cacheFiles", this.numOfDCFiles, ".txt")); + if (this.jobJar) { + String fragment = ""; + if (pathsWithFrags) { + fragment = "#jobjarfrag.jar"; + } + if (pathsWithScheme) { + conf.setJar("file:///jobjar.jar" + fragment); + } else { + if (absolutePaths) { + conf.setJar("/jobjar.jar" + fragment); + } else { + conf.setJar("jobjar.jar" + fragment); + } + } + } + conf.setInt(MRJobConfig.MAX_RESOURCES, this.maxResources); + conf.setLong(MRJobConfig.MAX_RESOURCES_MB, this.maxResourcesMB); + conf.setLong(MRJobConfig.MAX_SINGLE_RESOURCE_MB, + this.maxSingleResourceMB); + } + + // We always want absolute paths with a scheme in the DistributedCache, so + // we use a separate method to construct the path string. + private String buildDistributedCachePathString(String pathPrefix, + int numOfPaths, String extension) { + if (numOfPaths < 1) { + return ""; + } else { + StringBuilder b = new StringBuilder(); + b.append(buildPathStringSub(pathPrefix, "file:///" + pathPrefix, + extension, 0)); + for (int i = 1; i < numOfPaths; i++) { + b.append("," + buildPathStringSub(pathPrefix, "file:///" + pathPrefix, + extension, i)); + } + return b.toString(); + } + } + + private String buildPathString(String pathPrefix, int numOfPaths, + String extension) { + if (numOfPaths < 1) { + return ""; + } else { + StringBuilder b = new StringBuilder(); + String processedPath; + if (pathsWithScheme) { + processedPath = "file:///" + pathPrefix; + } else { + if (absolutePaths) { + processedPath = "/" + pathPrefix; + } else { + processedPath = pathPrefix; + } + } + b.append(buildPathStringSub(pathPrefix, processedPath, extension, 0)); + for (int i = 1; i < numOfPaths; i++) { + b.append("," + + buildPathStringSub(pathPrefix, processedPath, extension, i)); + } + return b.toString(); + } + } + + private String buildPathStringSub(String pathPrefix, String processedPath, + String extension, int num) { + if (pathsWithFrags) { + return processedPath + num + extension + "#" + pathPrefix + "fragment" + + num + extension; + } else { + return processedPath + num + extension; } } } - class StubedUploader extends JobResourceUploader { + private class StubedUploader extends JobResourceUploader { StubedUploader(JobConf conf) throws IOException { - super(FileSystem.getLocal(conf), false); + this(conf, false); + } + + StubedUploader(JobConf conf, boolean useWildcard) throws IOException { + super(FileSystem.getLocal(conf), useWildcard); } @Override @@ -379,5 +703,26 @@ public class TestJobResourceUploader { Path p) throws IOException { return mockedStatus; } + + @Override + boolean mkdirs(FileSystem fs, Path dir, FsPermission permission) + throws IOException { + // Do nothing. Stubbed out to avoid side effects. We don't actually need + // to create submit dirs. + return true; + } + + @Override + Path copyRemoteFiles(Path parentDir, Path originalPath, Configuration conf, + short replication) throws IOException { + return new Path(destinationPathPrefix + originalPath.getName()); + } + + @Override + void copyJar(Path originalJarPath, Path submitJarFile, short replication) + throws IOException { + // Do nothing. Stubbed out to avoid side effects. We don't actually need + // to copy the jar to the remote fs. + } } }