From 910742ad12de4ac15c3ac1f1a204e8301ffe3a25 Mon Sep 17 00:00:00 2001 From: Sangjin Lee Date: Tue, 21 Jun 2016 11:25:11 -0700 Subject: [PATCH] MAPREDUCE-6719. The list of -libjars archives should be replaced with a wildcard in the distributed cache to reduce the application footprint in the state store (Daniel Templeton via sjlee) (cherry picked from commit 605b4b61364781fc99ed27035c793153a20d8f71) --- .../hadoop/mapreduce/v2/util/MRApps.java | 70 ++++++-- .../java/org/apache/hadoop/mapreduce/Job.java | 7 +- .../hadoop/mapreduce/JobResourceUploader.java | 20 ++- .../hadoop/mapreduce/JobSubmissionFiles.java | 4 +- .../apache/hadoop/mapreduce/JobSubmitter.java | 6 +- .../ClientDistributedCacheManager.java | 31 +++- .../mapreduce/filecache/DistributedCache.java | 76 +++++++-- .../src/main/resources/mapred-default.xml | 18 +++ .../TestClientDistributedCacheManager.java | 151 +++++++++++++++--- .../filecache/TestDistributedCache.java | 132 +++++++++++++++ .../hadoop/mapred/TestLocalJobSubmission.java | 36 +++-- .../hadoop/mapreduce/v2/TestMRJobs.java | 48 ++++-- 12 files changed, 511 insertions(+), 88 deletions(-) create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/filecache/TestDistributedCache.java diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java index 8ca1a9d3f9a..a6493777a40 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java @@ -326,12 +326,36 @@ public class MRApps extends Apps { for 
(URI u: withLinks) { Path p = new Path(u); FileSystem remoteFS = p.getFileSystem(conf); + String name = p.getName(); + String wildcard = null; + + // If the path is wildcarded, resolve its parent directory instead + if (name.equals(DistributedCache.WILDCARD)) { + wildcard = name; + p = p.getParent(); + } + p = remoteFS.resolvePath(p.makeQualified(remoteFS.getUri(), remoteFS.getWorkingDirectory())); - String name = (null == u.getFragment()) - ? p.getName() : u.getFragment(); + + if ((wildcard != null) && (u.getFragment() != null)) { + throw new IOException("Invalid path URI: " + p + " - cannot " + + "contain both a URI fragment and a wildcard"); + } else if (wildcard != null) { + name = p.getName() + Path.SEPARATOR + wildcard; + } else if (u.getFragment() != null) { + name = u.getFragment(); + } + + // If it's not a JAR, add it to the link lookup. if (!StringUtils.toLowerCase(name).endsWith(".jar")) { - linkLookup.put(p, name); + String old = linkLookup.put(p, name); + + if ((old != null) && !name.equals(old)) { + LOG.warn("The same path is included more than once " + + "with different links or wildcards: " + p + " [" + + name + ", " + old + "]"); + } } } } @@ -598,16 +622,42 @@ public class MRApps extends Apps { URI u = uris[i]; Path p = new Path(u); FileSystem remoteFS = p.getFileSystem(conf); + String linkName = null; + + if (p.getName().equals(DistributedCache.WILDCARD)) { + p = p.getParent(); + linkName = p.getName() + Path.SEPARATOR + DistributedCache.WILDCARD; + } + p = remoteFS.resolvePath(p.makeQualified(remoteFS.getUri(), remoteFS.getWorkingDirectory())); - // Add URI fragment or just the filename - Path name = new Path((null == u.getFragment()) - ? 
p.getName() - : u.getFragment()); - if (name.isAbsolute()) { - throw new IllegalArgumentException("Resource name must be relative"); + + // If there's no wildcard, try using the fragment for the link + if (linkName == null) { + linkName = u.getFragment(); + + // Because we don't know what's in the fragment, we have to handle + // it with care. + if (linkName != null) { + Path linkPath = new Path(linkName); + + if (linkPath.isAbsolute()) { + throw new IllegalArgumentException("Resource name must be " + + "relative"); + } + + linkName = linkPath.toUri().getPath(); + } + } else if (u.getFragment() != null) { + throw new IllegalArgumentException("Invalid path URI: " + p + + " - cannot contain both a URI fragment and a wildcard"); } - String linkName = name.toUri().getPath(); + + // If there's no wildcard or fragment, just link to the file name + if (linkName == null) { + linkName = p.getName(); + } + LocalResource orig = localResources.get(linkName); URL url = URL.fromURI(p.toUri()); if(orig != null && !orig.getResource().equals(url)) { diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java index 481107f39d8..33e820b3b13 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java @@ -95,10 +95,13 @@ public class Job extends JobContextImpl implements JobContext { static final int DEFAULT_MONITOR_POLL_INTERVAL = 1000; public static final String USED_GENERIC_PARSER = - "mapreduce.client.genericoptionsparser.used"; + "mapreduce.client.genericoptionsparser.used"; public static final String SUBMIT_REPLICATION = - 
"mapreduce.client.submit.file.replication"; + "mapreduce.client.submit.file.replication"; public static final int DEFAULT_SUBMIT_REPLICATION = 10; + public static final String USE_WILDCARD_FOR_LIBJARS = + "mapreduce.client.libjars.wildcard"; + public static final boolean DEFAULT_USE_WILDCARD_FOR_LIBJARS = true; @InterfaceStability.Evolving public static enum TaskStatusFilter { NONE, KILLED, FAILED, SUCCEEDED, ALL } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobResourceUploader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobResourceUploader.java index f3e4d2f018e..90e1fba7656 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobResourceUploader.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobResourceUploader.java @@ -19,10 +19,8 @@ package org.apache.hadoop.mapreduce; import java.io.FileNotFoundException; import java.io.IOException; -import java.net.InetAddress; import java.net.URI; import java.net.URISyntaxException; -import java.net.UnknownHostException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -40,10 +38,12 @@ import org.apache.hadoop.mapreduce.filecache.DistributedCache; @InterfaceStability.Unstable class JobResourceUploader { protected static final Log LOG = LogFactory.getLog(JobResourceUploader.class); - private FileSystem jtFs; + private final boolean useWildcard; + private final FileSystem jtFs; - JobResourceUploader(FileSystem submitFs) { + JobResourceUploader(FileSystem submitFs, boolean useWildcard) { this.jtFs = submitFs; + this.useWildcard = useWildcard; } /** @@ -126,8 +126,18 @@ class JobResourceUploader { for (String tmpjars : libjarsArr) { Path tmp = new 
Path(tmpjars); Path newPath = copyRemoteFiles(libjarsDir, tmp, conf, replication); + + // Add each file to the classpath DistributedCache.addFileToClassPath( - new Path(newPath.toUri().getPath()), conf, jtFs); + new Path(newPath.toUri().getPath()), conf, jtFs, !useWildcard); + } + + if (useWildcard) { + // Add the whole directory to the cache + Path libJarsDirWildcard = + jtFs.makeQualified(new Path(libjarsDir, DistributedCache.WILDCARD)); + + DistributedCache.addCacheFile(libJarsDirWildcard.toUri(), conf); } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmissionFiles.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmissionFiles.java index 71250777473..c4adadf8cc7 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmissionFiles.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmissionFiles.java @@ -41,10 +41,10 @@ public class JobSubmissionFiles { // job submission directory is private! 
final public static FsPermission JOB_DIR_PERMISSION = - FsPermission.createImmutable((short) 0700); // rwx-------- + FsPermission.createImmutable((short) 0700); // rwx------ //job files are world-wide readable and owner writable final public static FsPermission JOB_FILE_PERMISSION = - FsPermission.createImmutable((short) 0644); // rw-r--r-- + FsPermission.createImmutable((short) 0644); // rw-r--r-- public static Path getJobSplitFile(Path jobSubmissionDir) { return new Path(jobSubmissionDir, "job.split"); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java index 497b0ed3193..22874e10fdf 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java @@ -94,7 +94,11 @@ class JobSubmitter { */ private void copyAndConfigureFiles(Job job, Path jobSubmitDir) throws IOException { - JobResourceUploader rUploader = new JobResourceUploader(jtFs); + Configuration conf = job.getConfiguration(); + boolean useWildcards = conf.getBoolean(Job.USE_WILDCARD_FOR_LIBJARS, + Job.DEFAULT_USE_WILDCARD_FOR_LIBJARS); + JobResourceUploader rUploader = new JobResourceUploader(jtFs, useWildcards); + rUploader.uploadFiles(job, jobSubmitDir); // Get the working directory. 
If not set, sets it to filesystem working dir diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/filecache/ClientDistributedCacheManager.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/filecache/ClientDistributedCacheManager.java index c15e647baf9..19470e82848 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/filecache/ClientDistributedCacheManager.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/filecache/ClientDistributedCacheManager.java @@ -227,21 +227,27 @@ public class ClientDistributedCacheManager { /** * Returns a boolean to denote whether a cache file is visible to all(public) * or not - * @param conf - * @param uri + * @param conf the configuration + * @param uri the URI to test * @return true if the path in the uri is visible to all, false otherwise - * @throws IOException + * @throws IOException thrown if a file system operation fails */ static boolean isPublic(Configuration conf, URI uri, Map statCache) throws IOException { + boolean isPublic = true; FileSystem fs = FileSystem.get(uri, conf); Path current = new Path(uri.getPath()); current = fs.makeQualified(current); - //the leaf level file should be readable by others - if (!checkPermissionOfOther(fs, current, FsAction.READ, statCache)) { - return false; + + // If we're looking at a wildcarded path, we only need to check that the + // ancestors allow execution. Otherwise, look for read permissions in + // addition to the ancestors' permissions. 
+ if (!current.getName().equals(DistributedCache.WILDCARD)) { + isPublic = checkPermissionOfOther(fs, current, FsAction.READ, statCache); } - return ancestorsHaveExecutePermissions(fs, current.getParent(), statCache); + + return isPublic && + ancestorsHaveExecutePermissions(fs, current.getParent(), statCache); } /** @@ -284,11 +290,20 @@ public class ClientDistributedCacheManager { private static FileStatus getFileStatus(FileSystem fs, URI uri, Map statCache) throws IOException { + Path path = new Path(uri); + + if (path.getName().equals(DistributedCache.WILDCARD)) { + path = path.getParent(); + uri = path.toUri(); + } + FileStatus stat = statCache.get(uri); + if (stat == null) { - stat = fs.getFileStatus(new Path(uri)); + stat = fs.getFileStatus(path); statCache.put(uri, stat); } + return stat; } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/filecache/DistributedCache.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/filecache/DistributedCache.java index 51fe69a7212..d4d6c6e38f7 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/filecache/DistributedCache.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/filecache/DistributedCache.java @@ -126,12 +126,14 @@ import java.net.URI; * as well as methods intended for use by the MapReduce framework * (e.g., {@link org.apache.hadoop.mapred.JobClient}). * + * @see org.apache.hadoop.mapreduce.Job * @see org.apache.hadoop.mapred.JobConf * @see org.apache.hadoop.mapred.JobClient */ @Deprecated @InterfaceAudience.Private public class DistributedCache { + public static final String WILDCARD = "*"; /** * Set the configuration with the given set of archives. 
Intended @@ -139,6 +141,7 @@ public class DistributedCache { * @param archives The list of archives that need to be localized * @param conf Configuration which will be changed * @deprecated Use {@link Job#setCacheArchives(URI[])} instead + * @see Job#setCacheArchives(URI[]) */ @Deprecated public static void setCacheArchives(URI[] archives, Configuration conf) { @@ -152,6 +155,7 @@ public class DistributedCache { * @param files The list of files that need to be localized * @param conf Configuration which will be changed * @deprecated Use {@link Job#setCacheFiles(URI[])} instead + * @see Job#setCacheFiles(URI[]) */ @Deprecated public static void setCacheFiles(URI[] files, Configuration conf) { @@ -166,6 +170,7 @@ public class DistributedCache { * @return A URI array of the caches set in the Configuration * @throws IOException * @deprecated Use {@link JobContext#getCacheArchives()} instead + * @see JobContext#getCacheArchives() */ @Deprecated public static URI[] getCacheArchives(Configuration conf) throws IOException { @@ -179,6 +184,7 @@ public class DistributedCache { * @return A URI array of the files set in the Configuration * @throws IOException * @deprecated Use {@link JobContext#getCacheFiles()} instead + * @see JobContext#getCacheFiles() */ @Deprecated public static URI[] getCacheFiles(Configuration conf) throws IOException { @@ -192,6 +198,7 @@ public class DistributedCache { * @return A path array of localized caches * @throws IOException * @deprecated Use {@link JobContext#getLocalCacheArchives()} instead + * @see JobContext#getLocalCacheArchives() */ @Deprecated public static Path[] getLocalCacheArchives(Configuration conf) @@ -207,6 +214,7 @@ public class DistributedCache { * @return A path array of localized files * @throws IOException * @deprecated Use {@link JobContext#getLocalCacheFiles()} instead + * @see JobContext#getLocalCacheFiles() */ @Deprecated public static Path[] getLocalCacheFiles(Configuration conf) @@ -236,6 +244,7 @@ public class 
DistributedCache { * @param conf The configuration which stored the timestamps * @return a long array of timestamps * @deprecated Use {@link JobContext#getArchiveTimestamps()} instead + * @see JobContext#getArchiveTimestamps() */ @Deprecated public static long[] getArchiveTimestamps(Configuration conf) { @@ -250,6 +259,7 @@ public class DistributedCache { * @param conf The configuration which stored the timestamps * @return a long array of timestamps * @deprecated Use {@link JobContext#getFileTimestamps()} instead + * @see JobContext#getFileTimestamps() */ @Deprecated public static long[] getFileTimestamps(Configuration conf) { @@ -263,6 +273,7 @@ public class DistributedCache { * @param uri The uri of the cache to be localized * @param conf Configuration to add the cache to * @deprecated Use {@link Job#addCacheArchive(URI)} instead + * @see Job#addCacheArchive(URI) */ @Deprecated public static void addCacheArchive(URI uri, Configuration conf) { @@ -272,11 +283,27 @@ public class DistributedCache { } /** - * Add a file to be localized to the conf. Intended - * to be used by user code. + * Add a file to be localized to the conf. The localized file will be + * downloaded to the execution node(s), and a link will created to the + * file from the job's working directory. If the last part of URI's path name + * is "*", then the entire parent directory will be localized and links + * will be created from the job's working directory to each file in the + * parent directory. + * + * The access permissions of the file will determine whether the localized + * file will be shared across jobs. If the file is not readable by other or + * if any of its parent directories is not executable by other, then the + * file will not be shared. In the case of a path that ends in "/*", + * sharing of the localized files will be determined solely from the + * access permissions of the parent directories. The access permissions of + * the individual files will be ignored. 
+ * + * Intended to be used by user code. + * * @param uri The uri of the cache to be localized * @param conf Configuration to add the cache to * @deprecated Use {@link Job#addCacheFile(URI)} instead + * @see Job#addCacheFile(URI) */ @Deprecated public static void addCacheFile(URI uri, Configuration conf) { @@ -286,12 +313,14 @@ public class DistributedCache { } /** - * Add an file path to the current set of classpath entries It adds the file - * to cache as well. Intended to be used by user code. + * Add a file path to the current set of classpath entries. The file will + * also be added to the cache. Intended to be used by user code. * * @param file Path of the file to be added * @param conf Configuration that contains the classpath setting * @deprecated Use {@link Job#addFileToClassPath(Path)} instead + * @see #addCacheFile(URI, Configuration) + * @see Job#addFileToClassPath(Path) */ @Deprecated public static void addFileToClassPath(Path file, Configuration conf) @@ -300,22 +329,42 @@ public class DistributedCache { } /** - * Add a file path to the current set of classpath entries. It adds the file - * to cache as well. Intended to be used by user code. + * Add a file path to the current set of classpath entries. The file will + * also be added to the cache. Intended to be used by user code. * * @param file Path of the file to be added * @param conf Configuration that contains the classpath setting * @param fs FileSystem with respect to which {@code archivefile} should * be interpreted. + * @see #addCacheFile(URI, Configuration) */ - public static void addFileToClassPath - (Path file, Configuration conf, FileSystem fs) - throws IOException { + public static void addFileToClassPath(Path file, Configuration conf, + FileSystem fs) { + addFileToClassPath(file, conf, fs, true); + } + + /** + * Add a file path to the current set of classpath entries. The file will + * also be added to the cache if {@code addToCache} is true. Used by + * internal DistributedCache code. 
+ * + * @param file Path of the file to be added + * @param conf Configuration that contains the classpath setting + * @param fs FileSystem with respect to which {@code archivefile} should + * be interpreted. + * @param addToCache whether the file should also be added to the cache list + * @see #addCacheFile(URI, Configuration) + */ + public static void addFileToClassPath(Path file, Configuration conf, + FileSystem fs, boolean addToCache) { String classpath = conf.get(MRJobConfig.CLASSPATH_FILES); conf.set(MRJobConfig.CLASSPATH_FILES, classpath == null ? file.toString() : classpath + "," + file.toString()); - URI uri = fs.makeQualified(file).toUri(); - addCacheFile(uri, conf); + + if (addToCache) { + URI uri = fs.makeQualified(file).toUri(); + addCacheFile(uri, conf); + } } /** @@ -323,7 +372,8 @@ public class DistributedCache { * Used by internal DistributedCache code. * * @param conf Configuration that contains the classpath setting - * @deprecated Use {@link JobContext#getFileClassPaths()} instead + * @deprecated Use {@link JobContext#getFileClassPaths()} instead + * @see JobContext#getFileClassPaths() */ @Deprecated public static Path[] getFileClassPaths(Configuration conf) { @@ -346,6 +396,7 @@ public class DistributedCache { * @param archive Path of the archive to be added * @param conf Configuration that contains the classpath setting * @deprecated Use {@link Job#addArchiveToClassPath(Path)} instead + * @see Job#addArchiveToClassPath(Path) */ @Deprecated public static void addArchiveToClassPath(Path archive, Configuration conf) @@ -378,6 +429,7 @@ public class DistributedCache { * * @param conf Configuration that contains the classpath setting * @deprecated Use {@link JobContext#getArchiveClassPaths()} instead + * @see JobContext#getArchiveClassPaths() */ @Deprecated public static Path[] getArchiveClassPaths(Configuration conf) { diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml index 5221e436959..47b82608067 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml @@ -838,6 +838,24 @@ + + mapreduce.client.libjars.wildcard + true + + Whether the libjars cache files should be localized using + a wildcarded directory instead of naming each archive independently. + Using wildcards reduces the space needed for storing the job + information in the case of a highly available resource manager + configuration. + This property should only be set to false for specific + jobs which are highly sensitive to the details of the archive + localization. Having this property set to true will cause the archives + to all be localized to the same local cache location. If false, each + archive will be localized to its own local cache location. In both + cases a symbolic link will be created to every archive from the job's + working directory. 
+ + mapreduce.task.profile diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/filecache/TestClientDistributedCacheManager.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/filecache/TestClientDistributedCacheManager.java index b5f45e64a67..5212b9f0434 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/filecache/TestClientDistributedCacheManager.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/filecache/TestClientDistributedCacheManager.java @@ -17,11 +17,12 @@ */ package org.apache.hadoop.mapreduce.filecache; -import java.io.File; import java.io.IOException; import java.net.URI; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; +import java.util.Set; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -37,6 +38,8 @@ import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.MRJobConfig; import org.junit.After; import org.junit.Assert; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; import org.junit.Before; import org.junit.Test; @@ -55,22 +58,22 @@ public class TestClientDistributedCacheManager { private static final Path TEST_VISIBILITY_CHILD_DIR = new Path(TEST_VISIBILITY_PARENT_DIR, "TestCacheVisibility_Child"); + private static final String FIRST_CACHE_FILE = "firstcachefile"; + private static final String SECOND_CACHE_FILE = "secondcachefile"; + private FileSystem fs; private Path firstCacheFile; private Path secondCacheFile; - private Path thirdCacheFile; private Configuration conf; @Before public void setup() throws IOException { conf = new Configuration(); fs = FileSystem.get(conf); - firstCacheFile = new 
Path(TEST_ROOT_DIR, "firstcachefile"); - secondCacheFile = new Path(TEST_ROOT_DIR, "secondcachefile"); - thirdCacheFile = new Path(TEST_VISIBILITY_CHILD_DIR,"thirdCachefile"); + firstCacheFile = new Path(TEST_VISIBILITY_PARENT_DIR, FIRST_CACHE_FILE); + secondCacheFile = new Path(TEST_VISIBILITY_CHILD_DIR, SECOND_CACHE_FILE); createTempFile(firstCacheFile, conf); createTempFile(secondCacheFile, conf); - createTempFile(thirdCacheFile, conf); } @After @@ -88,37 +91,147 @@ public class TestClientDistributedCacheManager { job.addCacheFile(secondCacheFile.toUri()); Configuration jobConf = job.getConfiguration(); - Map statCache = new HashMap(); + Map statCache = new HashMap<>(); ClientDistributedCacheManager.determineTimestamps(jobConf, statCache); FileStatus firstStatus = statCache.get(firstCacheFile.toUri()); FileStatus secondStatus = statCache.get(secondCacheFile.toUri()); - Assert.assertNotNull(firstStatus); - Assert.assertNotNull(secondStatus); - Assert.assertEquals(2, statCache.size()); + Assert.assertNotNull(firstCacheFile + " was not found in the stats cache", + firstStatus); + Assert.assertNotNull(secondCacheFile + " was not found in the stats cache", + secondStatus); + Assert.assertEquals("Missing/extra entries found in the stas cache", + 2, statCache.size()); String expected = firstStatus.getModificationTime() + "," + secondStatus.getModificationTime(); Assert.assertEquals(expected, jobConf.get(MRJobConfig.CACHE_FILE_TIMESTAMPS)); + + job = Job.getInstance(conf); + job.addCacheFile(new Path(TEST_VISIBILITY_CHILD_DIR, "*").toUri()); + jobConf = job.getConfiguration(); + statCache.clear(); + ClientDistributedCacheManager.determineTimestamps(jobConf, statCache); + + FileStatus thirdStatus = statCache.get(TEST_VISIBILITY_CHILD_DIR.toUri()); + + Assert.assertEquals("Missing/extra entries found in the stas cache", + 1, statCache.size()); + Assert.assertNotNull(TEST_VISIBILITY_CHILD_DIR + + " was not found in the stats cache", thirdStatus); + expected = 
Long.toString(thirdStatus.getModificationTime()); + Assert.assertEquals("Incorrect timestamp for " + TEST_VISIBILITY_CHILD_DIR, + expected, jobConf.get(MRJobConfig.CACHE_FILE_TIMESTAMPS)); } @Test public void testDetermineCacheVisibilities() throws IOException { - fs.setWorkingDirectory(TEST_VISIBILITY_CHILD_DIR); + fs.setPermission(TEST_VISIBILITY_PARENT_DIR, + new FsPermission((short)00777)); fs.setPermission(TEST_VISIBILITY_CHILD_DIR, new FsPermission((short)00777)); + fs.setWorkingDirectory(TEST_VISIBILITY_CHILD_DIR); + Job job = Job.getInstance(conf); + Path relativePath = new Path(SECOND_CACHE_FILE); + Path wildcardPath = new Path("*"); + Map statCache = new HashMap<>(); + Configuration jobConf; + + job.addCacheFile(firstCacheFile.toUri()); + job.addCacheFile(relativePath.toUri()); + jobConf = job.getConfiguration(); + + ClientDistributedCacheManager.determineCacheVisibilities(jobConf, + statCache); + // We use get() instead of getBoolean() so we can tell the difference + // between wrong and missing + assertEquals("The file paths were not found to be publicly visible " + + "even though the full path is publicly accessible", + "true,true", jobConf.get(MRJobConfig.CACHE_FILE_VISIBILITIES)); + checkCacheEntries(statCache, null, firstCacheFile, relativePath); + + job = Job.getInstance(conf); + job.addCacheFile(wildcardPath.toUri()); + jobConf = job.getConfiguration(); + statCache.clear(); + + ClientDistributedCacheManager.determineCacheVisibilities(jobConf, + statCache); + // We use get() instead of getBoolean() so we can tell the difference + // between wrong and missing + assertEquals("The file path was not found to be publicly visible " + + "even though the full path is publicly accessible", + "true", jobConf.get(MRJobConfig.CACHE_FILE_VISIBILITIES)); + checkCacheEntries(statCache, null, wildcardPath.getParent()); + + Path qualifiedParent = fs.makeQualified(TEST_VISIBILITY_PARENT_DIR); fs.setPermission(TEST_VISIBILITY_PARENT_DIR, new 
FsPermission((short)00700)); - Job job = Job.getInstance(conf); - Path relativePath = new Path("thirdCachefile"); + job = Job.getInstance(conf); + job.addCacheFile(firstCacheFile.toUri()); job.addCacheFile(relativePath.toUri()); - Configuration jobConf = job.getConfiguration(); + jobConf = job.getConfiguration(); + statCache.clear(); - Map statCache = new HashMap(); - ClientDistributedCacheManager. - determineCacheVisibilities(jobConf, statCache); - Assert.assertFalse(jobConf. - getBoolean(MRJobConfig.CACHE_FILE_VISIBILITIES,true)); + ClientDistributedCacheManager.determineCacheVisibilities(jobConf, + statCache); + // We use get() instead of getBoolean() so we can tell the difference + // between wrong and missing + assertEquals("The file paths were found to be publicly visible " + + "even though the parent directory is not publicly accessible", + "false,false", jobConf.get(MRJobConfig.CACHE_FILE_VISIBILITIES)); + checkCacheEntries(statCache, qualifiedParent, + firstCacheFile, relativePath); + + job = Job.getInstance(conf); + job.addCacheFile(wildcardPath.toUri()); + jobConf = job.getConfiguration(); + statCache.clear(); + + ClientDistributedCacheManager.determineCacheVisibilities(jobConf, + statCache); + // We use get() instead of getBoolean() so we can tell the difference + // between wrong and missing + assertEquals("The file path was found to be publicly visible " + + "even though the parent directory is not publicly accessible", + "false", jobConf.get(MRJobConfig.CACHE_FILE_VISIBILITIES)); + checkCacheEntries(statCache, qualifiedParent, wildcardPath.getParent()); + } + + /** + * Validate that the file status cache contains all and only entries for a + * given set of paths up to a common parent. + * + * @param statCache the cache + * @param top the common parent at which to stop digging + * @param paths the paths to compare against the cache + */ + private void checkCacheEntries(Map statCache, Path top, + Path... 
paths) { + Set expected = new HashSet<>(); + + for (Path path : paths) { + Path p = fs.makeQualified(path); + + while (!p.isRoot() && !p.equals(top)) { + expected.add(p.toUri()); + p = p.getParent(); + } + + expected.add(p.toUri()); + } + + Set uris = statCache.keySet(); + Set missing = new HashSet<>(uris); + Set extra = new HashSet<>(expected); + + missing.removeAll(expected); + extra.removeAll(uris); + + assertTrue("File status cache does not contain an entries for " + missing, + missing.isEmpty()); + assertTrue("File status cache contains extra extries: " + extra, + extra.isEmpty()); } @SuppressWarnings("deprecation") diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/filecache/TestDistributedCache.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/filecache/TestDistributedCache.java new file mode 100644 index 00000000000..14f4020dffc --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/filecache/TestDistributedCache.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.mapreduce.filecache; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.MRJobConfig; +import org.junit.Test; +import static org.junit.Assert.*; + +/** + * Test the {@link DistributedCache} class. + */ +public class TestDistributedCache { + /** + * Test of addFileOnlyToClassPath method, of class DistributedCache. + */ + @Test + public void testAddFileToClassPath() throws Exception { + Configuration conf = new Configuration(false); + + // Test first with 2 args + try { + DistributedCache.addFileToClassPath(null, conf); + fail("Accepted null archives argument"); + } catch (NullPointerException ex) { + // Expected + } + + DistributedCache.addFileToClassPath(new Path("file:///a"), conf); + assertEquals("The mapreduce.job.classpath.files property was not " + + "set correctly", "file:/a", conf.get(MRJobConfig.CLASSPATH_FILES)); + assertEquals("The mapreduce.job.cache.files property was not set " + + "correctly", "file:///a", conf.get(MRJobConfig.CACHE_FILES)); + + DistributedCache.addFileToClassPath(new Path("file:///b"), conf); + assertEquals("The mapreduce.job.classpath.files property was not " + + "set correctly", "file:/a,file:/b", + conf.get(MRJobConfig.CLASSPATH_FILES)); + assertEquals("The mapreduce.job.cache.files property was not set " + + "correctly", "file:///a,file:///b", + conf.get(MRJobConfig.CACHE_FILES)); + + // Now test with 3 args + FileSystem fs = FileSystem.newInstance(conf); + conf.clear(); + + try { + DistributedCache.addFileToClassPath(null, conf, fs); + fail("Accepted null archives argument"); + } catch (NullPointerException ex) { + // Expected + } + + DistributedCache.addFileToClassPath(new Path("file:///a"), conf, fs); + assertEquals("The mapreduce.job.classpath.files property was not " + + "set 
correctly", "file:/a", conf.get(MRJobConfig.CLASSPATH_FILES)); + assertEquals("The mapreduce.job.cache.files property was not set " + + "correctly", "file:///a", conf.get(MRJobConfig.CACHE_FILES)); + + DistributedCache.addFileToClassPath(new Path("file:///b"), conf, fs); + assertEquals("The mapreduce.job.classpath.files property was not " + + "set correctly", "file:/a,file:/b", + conf.get(MRJobConfig.CLASSPATH_FILES)); + assertEquals("The mapreduce.job.cache.files property was not set " + + "correctly", "file:///a,file:///b", + conf.get(MRJobConfig.CACHE_FILES)); + + // Now test with 4th arg true + conf.clear(); + + try { + DistributedCache.addFileToClassPath(null, conf, fs, true); + fail("Accepted null archives argument"); + } catch (NullPointerException ex) { + // Expected + } + + DistributedCache.addFileToClassPath(new Path("file:///a"), conf, fs, true); + assertEquals("The mapreduce.job.classpath.files property was not " + + "set correctly", "file:/a", conf.get(MRJobConfig.CLASSPATH_FILES)); + assertEquals("The mapreduce.job.cache.files property was not set " + + "correctly", "file:///a", conf.get(MRJobConfig.CACHE_FILES)); + + DistributedCache.addFileToClassPath(new Path("file:///b"), conf, fs, true); + assertEquals("The mapreduce.job.classpath.files property was not " + + "set correctly", "file:/a,file:/b", + conf.get(MRJobConfig.CLASSPATH_FILES)); + assertEquals("The mapreduce.job.cache.files property was not set " + + "correctly", "file:///a,file:///b", + conf.get(MRJobConfig.CACHE_FILES)); + + // And finally with 4th arg false + conf.clear(); + + try { + DistributedCache.addFileToClassPath(null, conf, fs, false); + fail("Accepted null archives argument"); + } catch (NullPointerException ex) { + // Expected + } + + DistributedCache.addFileToClassPath(new Path("file:///a"), conf, fs, false); + assertEquals("The mapreduce.job.classpath.files property was not " + + "set correctly", "file:/a", conf.get(MRJobConfig.CLASSPATH_FILES)); + assertEquals("The 
mapreduce.job.cache.files property was not set " + + "correctly", "", conf.get(MRJobConfig.CACHE_FILES, "")); + + DistributedCache.addFileToClassPath(new Path("file:///b"), conf, fs, false); + assertEquals("The mapreduce.job.classpath.files property was not " + + "set correctly", "file:/a,file:/b", + conf.get(MRJobConfig.CLASSPATH_FILES)); + assertEquals("The mapreduce.job.cache.files property was not set " + + "correctly", "", conf.get(MRJobConfig.CACHE_FILES, "")); + } +} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestLocalJobSubmission.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestLocalJobSubmission.java index f7352f1a08d..4a2b8575325 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestLocalJobSubmission.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestLocalJobSubmission.java @@ -18,23 +18,20 @@ package org.apache.hadoop.mapred; import java.io.File; -import java.io.FileNotFoundException; import java.io.FileOutputStream; import java.io.IOException; -import java.net.URL; import java.util.jar.JarOutputStream; import java.util.zip.ZipEntry; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.MRConfig; import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.SleepJob; import org.apache.hadoop.util.ToolRunner; -import org.junit.After; -import org.junit.Before; import org.junit.Test; import static org.junit.Assert.*; @@ -47,24 +44,31 @@ public class TestLocalJobSubmission { private static Path TEST_ROOT_DIR = new 
Path(System.getProperty("test.build.data","/tmp")); - @Before - public void configure() throws Exception { - } - - @After - public void cleanup() { - } - /** - * test the local job submission options of - * -jt local -libjars. - * @throws IOException + * Test the local job submission options of -jt local -libjars. + * + * @throws IOException thrown if there's an error creating the JAR file */ @Test public void testLocalJobLibjarsOption() throws IOException { + Configuration conf = new Configuration(); + + testLocalJobLibjarsOption(conf); + + conf.setBoolean(Job.USE_WILDCARD_FOR_LIBJARS, false); + testLocalJobLibjarsOption(conf); + } + + /** + * Test the local job submission options of -jt local -libjars. + * + * @param conf the {@link Configuration} to use + * @throws IOException thrown if there's an error creating the JAR file + */ + private void testLocalJobLibjarsOption(Configuration conf) + throws IOException { Path jarPath = makeJar(new Path(TEST_ROOT_DIR, "test.jar")); - Configuration conf = new Configuration(); conf.set(FileSystem.FS_DEFAULT_NAME_KEY, "hdfs://localhost:9000"); conf.set(MRConfig.FRAMEWORK_NAME, "local"); final String[] args = { diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java index 900bdeb20ee..451ec574464 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java @@ -911,7 +911,8 @@ public class TestMRJobs { } } - public void _testDistributedCache(String jobJarPath) throws Exception { + private void testDistributedCache(String jobJarPath, boolean 
withWildcard) + throws Exception { if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) { LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR + " not found. Not running test."); @@ -920,7 +921,7 @@ public class TestMRJobs { // Create a temporary file of length 1. Path first = createTempFile("distributed.first", "x"); - // Create two jars with a single file inside them. + // Create three jars with a single file inside them. Path second = makeJar(new Path(TEST_ROOT_DIR, "distributed.second.jar"), 2); Path third = @@ -929,16 +930,28 @@ public class TestMRJobs { makeJar(new Path(TEST_ROOT_DIR, "distributed.fourth.jar"), 4); Job job = Job.getInstance(mrCluster.getConfig()); - + // Set the job jar to a new "dummy" jar so we can check that its extracted // properly job.setJar(jobJarPath); - // Because the job jar is a "dummy" jar, we need to include the jar with - // DistributedCacheChecker or it won't be able to find it - Path distributedCacheCheckerJar = new Path( - JarFinder.getJar(DistributedCacheChecker.class)); - job.addFileToClassPath(distributedCacheCheckerJar.makeQualified( - localFs.getUri(), distributedCacheCheckerJar.getParent())); + + if (withWildcard) { + // If testing with wildcards, upload the DistributedCacheChecker into HDFS + // and add the directory as a wildcard. + Path libs = new Path("testLibs"); + Path wildcard = remoteFs.makeQualified(new Path(libs, "*")); + + remoteFs.mkdirs(libs); + remoteFs.copyFromLocalFile(third, libs); + job.addCacheFile(wildcard.toUri()); + } else { + // Otherwise add the DistributedCacheChecker directly to the classpath. 
+ // Because the job jar is a "dummy" jar, we need to include the jar with + // DistributedCacheChecker or it won't be able to find it + Path distributedCacheCheckerJar = new Path( + JarFinder.getJar(DistributedCacheChecker.class)); + job.addFileToClassPath(localFs.makeQualified(distributedCacheCheckerJar)); + } job.setMapperClass(DistributedCacheChecker.class); job.setOutputFormatClass(NullOutputFormat.class); @@ -964,11 +977,10 @@ public class TestMRJobs { trackingUrl.endsWith(jobId.substring(jobId.lastIndexOf("_")) + "/")); } - @Test (timeout = 600000) - public void testDistributedCache() throws Exception { + private void testDistributedCache(boolean withWildcard) throws Exception { // Test with a local (file:///) Job Jar Path localJobJarPath = makeJobJarWithLib(TEST_ROOT_DIR.toUri().toString()); - _testDistributedCache(localJobJarPath.toUri().toString()); + testDistributedCache(localJobJarPath.toUri().toString(), withWildcard); // Test with a remote (hdfs://) Job Jar Path remoteJobJarPath = new Path(remoteFs.getUri().toString() + "/", @@ -978,7 +990,17 @@ public class TestMRJobs { if (localJobJarFile.exists()) { // just to make sure localJobJarFile.delete(); } - _testDistributedCache(remoteJobJarPath.toUri().toString()); + testDistributedCache(remoteJobJarPath.toUri().toString(), withWildcard); + } + + @Test (timeout = 300000) + public void testDistributedCache() throws Exception { + testDistributedCache(false); + } + + @Test (timeout = 300000) + public void testDistributedCacheWithWildcards() throws Exception { + testDistributedCache(true); } @Test(timeout = 120000)