MAPREDUCE-4907. TrackerDistributedCacheManager issues too many getFileStatus calls. (sandyr via tucu)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1431166 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Alejandro Abdelnur 2013-01-10 00:50:03 +00:00
parent 7e599d9e3b
commit 59d9d8bca9
3 changed files with 62 additions and 33 deletions

View File

@ -204,6 +204,9 @@ Release 2.0.3-alpha - Unreleased
MAPREDUCE-4920. Use security token protobuf definition from hadoop common. MAPREDUCE-4920. Use security token protobuf definition from hadoop common.
(Suresh Srinivas via vinodkv) (Suresh Srinivas via vinodkv)
MAPREDUCE-4907. TrackerDistributedCacheManager issues too many getFileStatus
calls. (sandyr via tucu)
OPTIMIZATIONS OPTIMIZATIONS
BUG FIXES BUG FIXES

View File

@ -251,9 +251,8 @@ class JobSubmitter {
} }
// set the timestamps of the archives and files // set the timestamps of the archives and files
ClientDistributedCacheManager.determineTimestamps(conf);
// set the public/private visibility of the archives and files // set the public/private visibility of the archives and files
ClientDistributedCacheManager.determineCacheVisibilities(conf); ClientDistributedCacheManager.determineTimestampsAndCacheVisibilities(conf);
// get DelegationToken for each cached file // get DelegationToken for each cached file
ClientDistributedCacheManager.getDelegationTokens(conf, job ClientDistributedCacheManager.getDelegationTokens(conf, job
.getCredentials()); .getCredentials());

View File

@ -19,6 +19,8 @@ package org.apache.hadoop.mapreduce.filecache;
import java.io.IOException; import java.io.IOException;
import java.net.URI; import java.net.URI;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
@ -37,6 +39,25 @@ import org.apache.hadoop.security.Credentials;
@InterfaceAudience.Private @InterfaceAudience.Private
public class ClientDistributedCacheManager { public class ClientDistributedCacheManager {
/**
* Determines timestamps of files to be cached, and stores those
* in the configuration. Determines the visibilities of the distributed cache
* files and archives. The visibility of a cache path is "public" if the leaf
* component has READ permissions for others, and the parent subdirs have
* EXECUTE permissions for others.
*
* This is an internal method!
*
* @param job
* @throws IOException
*/
public static void determineTimestampsAndCacheVisibilities(Configuration job)
throws IOException {
Map<URI, FileStatus> statCache = new HashMap<URI, FileStatus>();
determineTimestamps(job, statCache);
determineCacheVisibilities(job, statCache);
}
/** /**
* Determines timestamps of files to be cached, and stores those * Determines timestamps of files to be cached, and stores those
* in the configuration. This is intended to be used internally by JobClient * in the configuration. This is intended to be used internally by JobClient
@ -47,16 +68,17 @@ public class ClientDistributedCacheManager {
* @param job Configuration of a job. * @param job Configuration of a job.
* @throws IOException * @throws IOException
*/ */
public static void determineTimestamps(Configuration job) throws IOException { public static void determineTimestamps(Configuration job,
Map<URI, FileStatus> statCache) throws IOException {
URI[] tarchives = DistributedCache.getCacheArchives(job); URI[] tarchives = DistributedCache.getCacheArchives(job);
if (tarchives != null) { if (tarchives != null) {
FileStatus status = getFileStatus(job, tarchives[0]); FileStatus status = getFileStatus(job, tarchives[0], statCache);
StringBuilder archiveFileSizes = StringBuilder archiveFileSizes =
new StringBuilder(String.valueOf(status.getLen())); new StringBuilder(String.valueOf(status.getLen()));
StringBuilder archiveTimestamps = StringBuilder archiveTimestamps =
new StringBuilder(String.valueOf(status.getModificationTime())); new StringBuilder(String.valueOf(status.getModificationTime()));
for (int i = 1; i < tarchives.length; i++) { for (int i = 1; i < tarchives.length; i++) {
status = getFileStatus(job, tarchives[i]); status = getFileStatus(job, tarchives[i], statCache);
archiveFileSizes.append(","); archiveFileSizes.append(",");
archiveFileSizes.append(String.valueOf(status.getLen())); archiveFileSizes.append(String.valueOf(status.getLen()));
archiveTimestamps.append(","); archiveTimestamps.append(",");
@ -68,13 +90,13 @@ public class ClientDistributedCacheManager {
URI[] tfiles = DistributedCache.getCacheFiles(job); URI[] tfiles = DistributedCache.getCacheFiles(job);
if (tfiles != null) { if (tfiles != null) {
FileStatus status = getFileStatus(job, tfiles[0]); FileStatus status = getFileStatus(job, tfiles[0], statCache);
StringBuilder fileSizes = StringBuilder fileSizes =
new StringBuilder(String.valueOf(status.getLen())); new StringBuilder(String.valueOf(status.getLen()));
StringBuilder fileTimestamps = new StringBuilder(String.valueOf( StringBuilder fileTimestamps = new StringBuilder(String.valueOf(
status.getModificationTime())); status.getModificationTime()));
for (int i = 1; i < tfiles.length; i++) { for (int i = 1; i < tfiles.length; i++) {
status = getFileStatus(job, tfiles[i]); status = getFileStatus(job, tfiles[i], statCache);
fileSizes.append(","); fileSizes.append(",");
fileSizes.append(String.valueOf(status.getLen())); fileSizes.append(String.valueOf(status.getLen()));
fileTimestamps.append(","); fileTimestamps.append(",");
@ -123,25 +145,25 @@ public class ClientDistributedCacheManager {
* @param job * @param job
* @throws IOException * @throws IOException
*/ */
public static void determineCacheVisibilities(Configuration job) public static void determineCacheVisibilities(Configuration job,
throws IOException { Map<URI, FileStatus> statCache) throws IOException {
URI[] tarchives = DistributedCache.getCacheArchives(job); URI[] tarchives = DistributedCache.getCacheArchives(job);
if (tarchives != null) { if (tarchives != null) {
StringBuilder archiveVisibilities = StringBuilder archiveVisibilities =
new StringBuilder(String.valueOf(isPublic(job, tarchives[0]))); new StringBuilder(String.valueOf(isPublic(job, tarchives[0], statCache)));
for (int i = 1; i < tarchives.length; i++) { for (int i = 1; i < tarchives.length; i++) {
archiveVisibilities.append(","); archiveVisibilities.append(",");
archiveVisibilities.append(String.valueOf(isPublic(job, tarchives[i]))); archiveVisibilities.append(String.valueOf(isPublic(job, tarchives[i], statCache)));
} }
setArchiveVisibilities(job, archiveVisibilities.toString()); setArchiveVisibilities(job, archiveVisibilities.toString());
} }
URI[] tfiles = DistributedCache.getCacheFiles(job); URI[] tfiles = DistributedCache.getCacheFiles(job);
if (tfiles != null) { if (tfiles != null) {
StringBuilder fileVisibilities = StringBuilder fileVisibilities =
new StringBuilder(String.valueOf(isPublic(job, tfiles[0]))); new StringBuilder(String.valueOf(isPublic(job, tfiles[0], statCache)));
for (int i = 1; i < tfiles.length; i++) { for (int i = 1; i < tfiles.length; i++) {
fileVisibilities.append(","); fileVisibilities.append(",");
fileVisibilities.append(String.valueOf(isPublic(job, tfiles[i]))); fileVisibilities.append(String.valueOf(isPublic(job, tfiles[i], statCache)));
} }
setFileVisibilities(job, fileVisibilities.toString()); setFileVisibilities(job, fileVisibilities.toString());
} }
@ -193,19 +215,13 @@ public class ClientDistributedCacheManager {
} }
/** /**
* Returns {@link FileStatus} of a given cache file on hdfs. * Gets the file status for the given URI. If the URI is in the cache,
* * returns it. Otherwise, fetches it and adds it to the cache.
* @param conf configuration
* @param cache cache file
* @return {@link FileStatus} of a given cache file on hdfs
* @throws IOException
*/ */
static FileStatus getFileStatus(Configuration conf, URI cache) private static FileStatus getFileStatus(Configuration job, URI uri,
throws IOException { Map<URI, FileStatus> statCache) throws IOException {
FileSystem fileSystem = FileSystem.get(cache, conf); FileSystem fileSystem = FileSystem.get(uri, job);
Path filePath = new Path(cache.getPath()); return getFileStatus(fileSystem, uri, statCache);
return fileSystem.getFileStatus(filePath);
} }
/** /**
@ -216,14 +232,15 @@ public class ClientDistributedCacheManager {
* @return true if the path in the uri is visible to all, false otherwise * @return true if the path in the uri is visible to all, false otherwise
* @throws IOException * @throws IOException
*/ */
static boolean isPublic(Configuration conf, URI uri) throws IOException { static boolean isPublic(Configuration conf, URI uri,
Map<URI, FileStatus> statCache) throws IOException {
FileSystem fs = FileSystem.get(uri, conf); FileSystem fs = FileSystem.get(uri, conf);
Path current = new Path(uri.getPath()); Path current = new Path(uri.getPath());
//the leaf level file should be readable by others //the leaf level file should be readable by others
if (!checkPermissionOfOther(fs, current, FsAction.READ)) { if (!checkPermissionOfOther(fs, current, FsAction.READ, statCache)) {
return false; return false;
} }
return ancestorsHaveExecutePermissions(fs, current.getParent()); return ancestorsHaveExecutePermissions(fs, current.getParent(), statCache);
} }
/** /**
@ -231,12 +248,12 @@ public class ClientDistributedCacheManager {
* permission set for all users (i.e. that other users can traverse * permission set for all users (i.e. that other users can traverse
* the directory heirarchy to the given path) * the directory heirarchy to the given path)
*/ */
static boolean ancestorsHaveExecutePermissions(FileSystem fs, Path path) static boolean ancestorsHaveExecutePermissions(FileSystem fs, Path path,
throws IOException { Map<URI, FileStatus> statCache) throws IOException {
Path current = path; Path current = path;
while (current != null) { while (current != null) {
//the subdirs in the path should have execute permissions for others //the subdirs in the path should have execute permissions for others
if (!checkPermissionOfOther(fs, current, FsAction.EXECUTE)) { if (!checkPermissionOfOther(fs, current, FsAction.EXECUTE, statCache)) {
return false; return false;
} }
current = current.getParent(); current = current.getParent();
@ -254,8 +271,8 @@ public class ClientDistributedCacheManager {
* @throws IOException * @throws IOException
*/ */
private static boolean checkPermissionOfOther(FileSystem fs, Path path, private static boolean checkPermissionOfOther(FileSystem fs, Path path,
FsAction action) throws IOException { FsAction action, Map<URI, FileStatus> statCache) throws IOException {
FileStatus status = fs.getFileStatus(path); FileStatus status = getFileStatus(fs, path.toUri(), statCache);
FsPermission perms = status.getPermission(); FsPermission perms = status.getPermission();
FsAction otherAction = perms.getOtherAction(); FsAction otherAction = perms.getOtherAction();
if (otherAction.implies(action)) { if (otherAction.implies(action)) {
@ -263,4 +280,14 @@ public class ClientDistributedCacheManager {
} }
return false; return false;
} }
private static FileStatus getFileStatus(FileSystem fs, URI uri,
Map<URI, FileStatus> statCache) throws IOException {
FileStatus stat = statCache.get(uri);
if (stat == null) {
stat = fs.getFileStatus(new Path(uri));
statCache.put(uri, stat);
}
return stat;
}
} }