diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 65fbf1d6d63..5f15f5e9db5 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -204,6 +204,9 @@ Release 2.0.3-alpha - Unreleased MAPREDUCE-4920. Use security token protobuf definition from hadoop common. (Suresh Srinivas via vinodkv) + MAPREDUCE-4907. TrackerDistributedCacheManager issues too many getFileStatus + calls. (sandyr via tucu) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java index 62415f935f6..8fde50c68c8 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java @@ -251,9 +251,8 @@ private void copyAndConfigureFiles(Job job, Path submitJobDir, } // set the timestamps of the archives and files - ClientDistributedCacheManager.determineTimestamps(conf); // set the public/private visibility of the archives and files - ClientDistributedCacheManager.determineCacheVisibilities(conf); + ClientDistributedCacheManager.determineTimestampsAndCacheVisibilities(conf); // get DelegationToken for each cached file ClientDistributedCacheManager.getDelegationTokens(conf, job .getCredentials()); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/filecache/ClientDistributedCacheManager.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/filecache/ClientDistributedCacheManager.java index 46d89eda130..23f3cfcadf5 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/filecache/ClientDistributedCacheManager.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/filecache/ClientDistributedCacheManager.java @@ -19,6 +19,8 @@ import java.io.IOException; import java.net.URI; +import java.util.HashMap; +import java.util.Map; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; @@ -37,6 +39,25 @@ @InterfaceAudience.Private public class ClientDistributedCacheManager { + /** + * Determines timestamps of files to be cached, and stores those + * in the configuration. Determines the visibilities of the distributed cache + * files and archives. The visibility of a cache path is "public" if the leaf + * component has READ permissions for others, and the parent subdirs have + * EXECUTE permissions for others. + * + * This is an internal method! + * + * @param job + * @throws IOException + */ + public static void determineTimestampsAndCacheVisibilities(Configuration job) + throws IOException { + Map statCache = new HashMap(); + determineTimestamps(job, statCache); + determineCacheVisibilities(job, statCache); + } + /** * Determines timestamps of files to be cached, and stores those * in the configuration. This is intended to be used internally by JobClient @@ -47,16 +68,17 @@ public class ClientDistributedCacheManager { * @param job Configuration of a job. * @throws IOException */ - public static void determineTimestamps(Configuration job) throws IOException { + public static void determineTimestamps(Configuration job, + Map statCache) throws IOException { URI[] tarchives = DistributedCache.getCacheArchives(job); if (tarchives != null) { - FileStatus status = getFileStatus(job, tarchives[0]); + FileStatus status = getFileStatus(job, tarchives[0], statCache); StringBuilder archiveFileSizes = new StringBuilder(String.valueOf(status.getLen())); StringBuilder archiveTimestamps = new StringBuilder(String.valueOf(status.getModificationTime())); for (int i = 1; i < tarchives.length; i++) { - status = getFileStatus(job, tarchives[i]); + status = getFileStatus(job, tarchives[i], statCache); archiveFileSizes.append(","); archiveFileSizes.append(String.valueOf(status.getLen())); archiveTimestamps.append(","); @@ -68,13 +90,13 @@ public static void determineTimestamps(Configuration job) throws IOException { URI[] tfiles = DistributedCache.getCacheFiles(job); if (tfiles != null) { - FileStatus status = getFileStatus(job, tfiles[0]); + FileStatus status = getFileStatus(job, tfiles[0], statCache); StringBuilder fileSizes = new StringBuilder(String.valueOf(status.getLen())); StringBuilder fileTimestamps = new StringBuilder(String.valueOf( status.getModificationTime())); for (int i = 1; i < tfiles.length; i++) { - status = getFileStatus(job, tfiles[i]); + status = getFileStatus(job, tfiles[i], statCache); fileSizes.append(","); fileSizes.append(String.valueOf(status.getLen())); fileTimestamps.append(","); @@ -123,25 +145,25 @@ public static void getDelegationTokens(Configuration job, * @param job * @throws IOException */ - public static void determineCacheVisibilities(Configuration job) - throws IOException { + public static void determineCacheVisibilities(Configuration job, + Map statCache) throws IOException { URI[] tarchives = DistributedCache.getCacheArchives(job); if (tarchives != null) { StringBuilder archiveVisibilities = - new StringBuilder(String.valueOf(isPublic(job, tarchives[0]))); + new StringBuilder(String.valueOf(isPublic(job, tarchives[0], statCache))); for (int i = 1; i < tarchives.length; i++) { archiveVisibilities.append(","); - archiveVisibilities.append(String.valueOf(isPublic(job, tarchives[i]))); + archiveVisibilities.append(String.valueOf(isPublic(job, tarchives[i], statCache))); } setArchiveVisibilities(job, archiveVisibilities.toString()); } URI[] tfiles = DistributedCache.getCacheFiles(job); if (tfiles != null) { StringBuilder fileVisibilities = - new StringBuilder(String.valueOf(isPublic(job, tfiles[0]))); + new StringBuilder(String.valueOf(isPublic(job, tfiles[0], statCache))); for (int i = 1; i < tfiles.length; i++) { fileVisibilities.append(","); - fileVisibilities.append(String.valueOf(isPublic(job, tfiles[i]))); + fileVisibilities.append(String.valueOf(isPublic(job, tfiles[i], statCache))); } setFileVisibilities(job, fileVisibilities.toString()); } @@ -193,19 +215,13 @@ static void setFileTimestamps(Configuration conf, String timestamps) { } /** - * Returns {@link FileStatus} of a given cache file on hdfs. - * - * @param conf configuration - * @param cache cache file - * @return {@link FileStatus} of a given cache file on hdfs - * @throws IOException + * Gets the file status for the given URI. If the URI is in the cache, + * returns it. Otherwise, fetches it and adds it to the cache. */ - static FileStatus getFileStatus(Configuration conf, URI cache) - throws IOException { - FileSystem fileSystem = FileSystem.get(cache, conf); - Path filePath = new Path(cache.getPath()); - - return fileSystem.getFileStatus(filePath); + private static FileStatus getFileStatus(Configuration job, URI uri, + Map statCache) throws IOException { + FileSystem fileSystem = FileSystem.get(uri, job); + return getFileStatus(fileSystem, uri, statCache); } /** @@ -216,14 +232,15 @@ static FileStatus getFileStatus(Configuration conf, URI cache) * @return true if the path in the uri is visible to all, false otherwise * @throws IOException */ - static boolean isPublic(Configuration conf, URI uri) throws IOException { + static boolean isPublic(Configuration conf, URI uri, + Map statCache) throws IOException { FileSystem fs = FileSystem.get(uri, conf); Path current = new Path(uri.getPath()); //the leaf level file should be readable by others - if (!checkPermissionOfOther(fs, current, FsAction.READ)) { + if (!checkPermissionOfOther(fs, current, FsAction.READ, statCache)) { return false; } - return ancestorsHaveExecutePermissions(fs, current.getParent()); + return ancestorsHaveExecutePermissions(fs, current.getParent(), statCache); } /** @@ -231,12 +248,12 @@ static boolean isPublic(Configuration conf, URI uri) throws IOException { * permission set for all users (i.e. that other users can traverse * the directory heirarchy to the given path) */ - static boolean ancestorsHaveExecutePermissions(FileSystem fs, Path path) - throws IOException { + static boolean ancestorsHaveExecutePermissions(FileSystem fs, Path path, + Map statCache) throws IOException { Path current = path; while (current != null) { //the subdirs in the path should have execute permissions for others - if (!checkPermissionOfOther(fs, current, FsAction.EXECUTE)) { + if (!checkPermissionOfOther(fs, current, FsAction.EXECUTE, statCache)) { return false; } current = current.getParent(); @@ -254,8 +271,8 @@ static boolean ancestorsHaveExecutePermissions(FileSystem fs, Path path) * @throws IOException */ private static boolean checkPermissionOfOther(FileSystem fs, Path path, - FsAction action) throws IOException { - FileStatus status = fs.getFileStatus(path); + FsAction action, Map statCache) throws IOException { + FileStatus status = getFileStatus(fs, path.toUri(), statCache); FsPermission perms = status.getPermission(); FsAction otherAction = perms.getOtherAction(); if (otherAction.implies(action)) { @@ -263,4 +280,14 @@ private static boolean checkPermissionOfOther(FileSystem fs, Path path, } return false; } + + private static FileStatus getFileStatus(FileSystem fs, URI uri, + Map statCache) throws IOException { + FileStatus stat = statCache.get(uri); + if (stat == null) { + stat = fs.getFileStatus(new Path(uri)); + statCache.put(uri, stat); + } + return stat; + } }