From 9695bc7af6e07dc808321ce1335b4ba220010f55 Mon Sep 17 00:00:00 2001
From: Christopher Douglas
Date: Fri, 14 Mar 2014 00:31:08 +0000
Subject: [PATCH] YARN-1771. Reduce the number of NameNode operations during
 localization of public resources using a cache. Contributed by Sangjin Lee

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1577392 13f79535-47bb-0310-9956-ffa450edef68
---
 .../java/org/apache/hadoop/fs/FileUtil.java   |  10 +-
 .../TestLocalDistributedCacheManager.java     | 114 +++++++++--------
 hadoop-yarn-project/CHANGES.txt               |   3 +
 .../apache/hadoop/yarn/util/FSDownload.java   | 118 ++++++++++++++----
 .../hadoop/yarn/util/TestFSDownload.java      | 104 +++++++++++++--
 .../localizer/LocalizerContext.java           |  17 +++
 .../ResourceLocalizationService.java          |  12 +-
 7 files changed, 282 insertions(+), 96 deletions(-)

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileUtil.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileUtil.java
index 7bb20dd4032..8aa84230dd4 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileUtil.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileUtil.java
@@ -339,11 +339,11 @@ public class FileUtil {
   }
 
   /** Copy files between FileSystems. */
-  private static boolean copy(FileSystem srcFS, FileStatus srcStatus,
-                              FileSystem dstFS, Path dst,
-                              boolean deleteSource,
-                              boolean overwrite,
-                              Configuration conf) throws IOException {
+  public static boolean copy(FileSystem srcFS, FileStatus srcStatus,
+                             FileSystem dstFS, Path dst,
+                             boolean deleteSource,
+                             boolean overwrite,
+                             Configuration conf) throws IOException {
     Path src = srcStatus.getPath();
     dst = checkDest(src.getName(), dstFS, dst, overwrite);
     if (srcStatus.isDirectory()) {
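Note (illustrative sketch, not part of the patch): opening up the FileStatus-based copy() overload lets a caller that already holds a FileStatus reuse it, instead of going through FileSystem#copyToLocalFile(), whose Path-based FileUtil.copy() path performs its own getFileStatus() round trip to the NameNode. A minimal caller-side sketch, assuming a source FileSystem srcFs, a source Path src, a destination Path dst, and a Configuration conf:

    FileStatus sStat = srcFs.getFileStatus(src);              // single stat call
    FileUtil.copy(srcFs, sStat, FileSystem.getLocal(conf), dst,
        false /* deleteSource */, true /* overwrite */, conf); // reuses sStat

FSDownload uses exactly this overload later in the patch, which is why the visibility change is needed here.
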
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestLocalDistributedCacheManager.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestLocalDistributedCacheManager.java
index b131473d63d..ec80e65f030 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestLocalDistributedCacheManager.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestLocalDistributedCacheManager.java
@@ -18,19 +18,26 @@
 
 package org.apache.hadoop.mapred;
 
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.*;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
+import java.io.ByteArrayInputStream;
 import java.io.File;
 import java.io.FileNotFoundException;
-import java.io.FileWriter;
 import java.io.IOException;
 import java.net.URI;
 
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FilterFileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PositionedReadable;
+import org.apache.hadoop.fs.Seekable;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.MRJobConfig;
@@ -38,7 +45,6 @@
 import org.apache.hadoop.mapreduce.filecache.DistributedCache;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
-import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
@@ -89,8 +95,26 @@ public class TestLocalDistributedCacheManager {
   public void cleanup() throws Exception {
     delete(localDir);
   }
-
-  @SuppressWarnings("rawtypes")
+
+  /**
+   * Mock input stream based on a byte array so that it can be used by a
+   * FSDataInputStream.
+   */
+  private static class MockInputStream extends ByteArrayInputStream
+      implements Seekable, PositionedReadable {
+    public MockInputStream(byte[] buf) {
+      super(buf);
+    }
+
+    // empty implementation for unused methods
+    public int read(long position, byte[] buffer, int offset, int length) { return -1; }
+    public void readFully(long position, byte[] buffer, int offset, int length) {}
+    public void readFully(long position, byte[] buffer) {}
+    public void seek(long position) {}
+    public long getPos() { return 0; }
+    public boolean seekToNewSource(long targetPos) { return false; }
+  }
+
   @Test
   public void testDownload() throws Exception {
     JobConf conf = new JobConf();
@@ -123,28 +147,22 @@ public class TestLocalDistributedCacheManager {
         }
       }
     });
-
-    doAnswer(new Answer() {
+
+    when(mockfs.getConf()).thenReturn(conf);
+    final FSDataInputStream in =
+        new FSDataInputStream(new MockInputStream("This is a test file\n".getBytes()));
+    when(mockfs.open(any(Path.class), anyInt())).thenAnswer(new Answer<FSDataInputStream>() {
       @Override
-      public Object answer(InvocationOnMock args) throws Throwable {
-        //Ignored boolean overwrite = (Boolean) args.getArguments()[0];
-        Path src = (Path)args.getArguments()[1];
-        Path dst = (Path)args.getArguments()[2];
-        if("file.txt".equals(src.getName())) {
-          File f = new File(dst.toUri().getPath());
-          FileWriter writer = new FileWriter(f);
-          try {
-            writer.append("This is a test file\n");
-          } finally {
-            if(writer != null) writer.close();
-          }
+      public FSDataInputStream answer(InvocationOnMock args) throws Throwable {
+        Path src = (Path)args.getArguments()[0];
+        if ("file.txt".equals(src.getName())) {
+          return in;
         } else {
           throw new FileNotFoundException(src+" not supported by mocking");
         }
-        return null;
       }
-    }).when(mockfs).copyToLocalFile(anyBoolean(), any(Path.class), any(Path.class));
-
+    });
+
     DistributedCache.addCacheFile(file, conf);
     conf.set(MRJobConfig.CACHE_FILE_TIMESTAMPS, "101");
     conf.set(MRJobConfig.CACHE_FILES_SIZES, "201");
@@ -159,8 +177,7 @@ public class TestLocalDistributedCacheManager {
     }
     assertFalse(link.exists());
   }
-
-  @SuppressWarnings("rawtypes")
+
   @Test
   public void testEmptyDownload() throws Exception {
     JobConf conf = new JobConf();
@@ -184,16 +201,16 @@ public class TestLocalDistributedCacheManager {
        throw new FileNotFoundException(p+" not supported by mocking");
       }
     });
-
-    doAnswer(new Answer() {
+
+    when(mockfs.getConf()).thenReturn(conf);
+    when(mockfs.open(any(Path.class), anyInt())).thenAnswer(new Answer<FSDataInputStream>() {
       @Override
-      public Object answer(InvocationOnMock args) throws Throwable {
-        //Ignored boolean overwrite = (Boolean) args.getArguments()[0];
-        Path src = (Path)args.getArguments()[1];
+      public FSDataInputStream answer(InvocationOnMock args) throws Throwable {
+        Path src = (Path)args.getArguments()[0];
         throw new FileNotFoundException(src+" not supported by mocking");
       }
-    }).when(mockfs).copyToLocalFile(anyBoolean(), any(Path.class), any(Path.class));
-
+    });
+
     conf.set(MRJobConfig.CACHE_FILES, "");
     conf.set(MRConfig.LOCAL_DIR, localDir.getAbsolutePath());
     LocalDistributedCacheManager manager = new LocalDistributedCacheManager();
@@ -203,9 +220,8 @@ public class TestLocalDistributedCacheManager {
       manager.close();
     }
   }
-
-
-  @SuppressWarnings("rawtypes")
+
+
   @Test
   public void testDuplicateDownload() throws Exception {
     JobConf conf = new JobConf();
@@ -238,28 +254,22 @@ public class TestLocalDistributedCacheManager {
         }
       }
     });
-
-    doAnswer(new Answer() {
+
+    when(mockfs.getConf()).thenReturn(conf);
+    final FSDataInputStream in =
+        new FSDataInputStream(new MockInputStream("This is a test file\n".getBytes()));
+    when(mockfs.open(any(Path.class), anyInt())).thenAnswer(new Answer<FSDataInputStream>() {
      @Override
-      public Object answer(InvocationOnMock args) throws Throwable {
-        //Ignored boolean overwrite = (Boolean) args.getArguments()[0];
-        Path src = (Path)args.getArguments()[1];
-        Path dst = (Path)args.getArguments()[2];
-        if("file.txt".equals(src.getName())) {
-          File f = new File(dst.toUri().getPath());
-          FileWriter writer = new FileWriter(f);
-          try {
-            writer.append("This is a test file\n");
-          } finally {
-            if(writer != null) writer.close();
-          }
+      public FSDataInputStream answer(InvocationOnMock args) throws Throwable {
+        Path src = (Path)args.getArguments()[0];
+        if ("file.txt".equals(src.getName())) {
+          return in;
         } else {
          throw new FileNotFoundException(src+" not supported by mocking");
        }
-        return null;
      }
-    }).when(mockfs).copyToLocalFile(anyBoolean(), any(Path.class), any(Path.class));
-
+    });
+
     DistributedCache.addCacheFile(file, conf);
     DistributedCache.addCacheFile(file, conf);
     conf.set(MRJobConfig.CACHE_FILE_TIMESTAMPS, "101,101");
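Note (illustrative sketch, not part of the patch): these tests switch from stubbing FileSystem#copyToLocalFile() to stubbing FileSystem#open(), because FSDownload now pulls the bytes itself through FileUtil.copy() instead of delegating the whole copy to the source FileSystem. A minimal form of the stubbing pattern, assuming the mockfs mock and the conf JobConf used in the tests above:

    // FSDataInputStream requires a stream that is Seekable and PositionedReadable,
    // hence the MockInputStream wrapper around an in-memory byte array.
    FSDataInputStream in =
        new FSDataInputStream(new MockInputStream("This is a test file\n".getBytes()));
    // getConf() is stubbed because FileSystem#open(Path) consults the filesystem's
    // Configuration for the default buffer size before delegating to open(Path, int),
    // which is the call the mock intercepts.
    when(mockfs.getConf()).thenReturn(conf);
    when(mockfs.open(any(Path.class), anyInt())).thenReturn(in);

Returning a fixed stream with thenReturn() is enough when only one path is opened; the tests use thenAnswer() so they can reject unexpected paths.
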
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 4200f6cd355..5cb3a763666 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -273,6 +273,9 @@ Release 2.4.0 - UNRELEASED
     expose analogous getApplication(s)/Attempt(s)/Container(s) APIs. (Mayank
     Bansal via zjshen)
 
+    YARN-1771. Reduce the number of NameNode operations during localization of
+    public resources using a cache. (Sangjin Lee via cdouglas)
+
   OPTIMIZATIONS
 
   BUG FIXES
 
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownload.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownload.java
index 36dfc6bc8e3..de817033d26 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownload.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownload.java
@@ -24,6 +24,8 @@ import java.io.IOException;
 import java.net.URISyntaxException;
 import java.security.PrivilegedExceptionAction;
 import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
 import java.util.regex.Pattern;
 
 import org.apache.commons.logging.Log;
@@ -43,6 +45,11 @@ import org.apache.hadoop.util.RunJar;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.util.concurrent.Futures;
+
 /**
  * Download a single URL to the local disk.
  *
@@ -56,6 +63,7 @@ public class FSDownload implements Callable<Path> {
   private final UserGroupInformation userUgi;
   private Configuration conf;
   private LocalResource resource;
+  private final LoadingCache<Path,Future<FileStatus>> statCache;
 
   /** The local FS dir path under which this resource is to be localized to */
   private Path destDirPath;
@@ -71,11 +79,18 @@ public class FSDownload implements Callable<Path> {
 
   public FSDownload(FileContext files, UserGroupInformation ugi, Configuration conf,
       Path destDirPath, LocalResource resource) {
+    this(files, ugi, conf, destDirPath, resource, null);
+  }
+
+  public FSDownload(FileContext files, UserGroupInformation ugi, Configuration conf,
+      Path destDirPath, LocalResource resource,
+      LoadingCache<Path,Future<FileStatus>> statCache) {
     this.conf = conf;
     this.destDirPath = destDirPath;
     this.files = files;
     this.userUgi = ugi;
     this.resource = resource;
+    this.statCache = statCache;
   }
 
   LocalResource getResource() {
@@ -90,28 +105,43 @@ public class FSDownload implements Callable<Path> {
   }
 
   /**
-   * Returns a boolean to denote whether a cache file is visible to all(public)
-   * or not
-   * @param conf
-   * @param uri
-   * @return true if the path in the uri is visible to all, false otherwise
-   * @throws IOException
+   * Creates the cache loader for the status loading cache. This should be used
+   * to create an instance of the status cache that is passed into the
+   * FSDownload constructor.
    */
-  private static boolean isPublic(FileSystem fs, Path current) throws IOException {
-    current = fs.makeQualified(current);
-    //the leaf level file should be readable by others
-    if (!checkPublicPermsForAll(fs, current, FsAction.READ_EXECUTE, FsAction.READ)) {
-      return false;
-    }
-    return ancestorsHaveExecutePermissions(fs, current.getParent());
+  public static CacheLoader<Path,Future<FileStatus>>
+      createStatusCacheLoader(final Configuration conf) {
+    return new CacheLoader<Path,Future<FileStatus>>() {
+      public Future<FileStatus> load(Path path) {
+        try {
+          FileSystem fs = path.getFileSystem(conf);
+          return Futures.immediateFuture(fs.getFileStatus(path));
+        } catch (Throwable th) {
+          // report failures so it can be memoized
+          return Futures.immediateFailedFuture(th);
+        }
+      }
+    };
   }
 
-  private static boolean checkPublicPermsForAll(FileSystem fs, Path current,
-      FsAction dir, FsAction file)
-      throws IOException {
-    return checkPublicPermsForAll(fs, fs.getFileStatus(current), dir, file);
+  /**
+   * Returns a boolean to denote whether a cache file is visible to all (public)
+   * or not
+   *
+   * @return true if the path in the current path is visible to all, false
+   *         otherwise
+   */
+  @VisibleForTesting
+  static boolean isPublic(FileSystem fs, Path current, FileStatus sStat,
+      LoadingCache<Path,Future<FileStatus>> statCache) throws IOException {
+    current = fs.makeQualified(current);
+    //the leaf level file should be readable by others
+    if (!checkPublicPermsForAll(fs, sStat, FsAction.READ_EXECUTE, FsAction.READ)) {
+      return false;
+    }
+    return ancestorsHaveExecutePermissions(fs, current.getParent(), statCache);
   }
-
+
   private static boolean checkPublicPermsForAll(FileSystem fs,
       FileStatus status, FsAction dir, FsAction file)
       throws IOException {
@@ -137,12 +167,13 @@ public class FSDownload implements Callable<Path> {
    * permission set for all users (i.e. that other users can traverse
    * the directory heirarchy to the given path)
    */
-  private static boolean ancestorsHaveExecutePermissions(FileSystem fs, Path path)
-      throws IOException {
+  private static boolean ancestorsHaveExecutePermissions(FileSystem fs,
+      Path path, LoadingCache<Path,Future<FileStatus>> statCache)
+      throws IOException {
     Path current = path;
     while (current != null) {
       //the subdirs in the path should have execute permissions for others
-      if (!checkPermissionOfOther(fs, current, FsAction.EXECUTE)) {
+      if (!checkPermissionOfOther(fs, current, FsAction.EXECUTE, statCache)) {
         return false;
       }
       current = current.getParent();
@@ -160,14 +191,46 @@ public class FSDownload implements Callable<Path> {
    * @throws IOException
    */
   private static boolean checkPermissionOfOther(FileSystem fs, Path path,
-      FsAction action) throws IOException {
-    FileStatus status = fs.getFileStatus(path);
+      FsAction action, LoadingCache<Path,Future<FileStatus>> statCache)
+      throws IOException {
+    FileStatus status = getFileStatus(fs, path, statCache);
     FsPermission perms = status.getPermission();
     FsAction otherAction = perms.getOtherAction();
     return otherAction.implies(action);
   }
-
+
+  /**
+   * Obtains the file status, first by checking the stat cache if it is
+   * available, and then by getting it explicitly from the filesystem. If we got
+   * the file status from the filesystem, it is added to the stat cache.
+   *
+   * The stat cache is expected to be managed by callers who provided it to
+   * FSDownload.
+   */
+  private static FileStatus getFileStatus(final FileSystem fs, final Path path,
+      LoadingCache<Path,Future<FileStatus>> statCache) throws IOException {
+    // if the stat cache does not exist, simply query the filesystem
+    if (statCache == null) {
+      return fs.getFileStatus(path);
+    }
+
+    try {
+      // get or load it from the cache
+      return statCache.get(path).get();
+    } catch (ExecutionException e) {
+      Throwable cause = e.getCause();
+      // the underlying exception should normally be IOException
+      if (cause instanceof IOException) {
+        throw (IOException)cause;
+      } else {
+        throw new IOException(cause);
+      }
+    } catch (InterruptedException e) { // should not happen
+      Thread.currentThread().interrupt();
+      throw new IOException(e);
+    }
+  }
+
   private Path copy(Path sCopy, Path dstdir) throws IOException {
     FileSystem sourceFs = sCopy.getFileSystem(conf);
     Path dCopy = new Path(dstdir, "tmp_"+sCopy.getName());
@@ -178,14 +241,15 @@ public class FSDownload implements Callable<Path> {
           ", was " + sStat.getModificationTime());
     }
     if (resource.getVisibility() == LocalResourceVisibility.PUBLIC) {
-      if (!isPublic(sourceFs, sCopy)) {
+      if (!isPublic(sourceFs, sCopy, sStat, statCache)) {
         throw new IOException("Resource " + sCopy +
             " is not publicly accessable and as such cannot be part of the" +
            " public cache.");
       }
     }
-
-    sourceFs.copyToLocalFile(sCopy, dCopy);
+
+    FileUtil.copy(sourceFs, sStat, FileSystem.getLocal(conf), dCopy, false,
+        true, conf);
     return dCopy;
   }
 
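Note (illustrative sketch, not part of the patch): statCache is a Guava LoadingCache keyed by Path whose values are Futures, so both successful and failed getFileStatus() results are memoized and shared by every permission check that touches the same ancestor directory. A caller-side sketch, assuming the FileContext (lfs), UserGroupInformation (ugi), destination path, and LocalResource arguments used by the constructor above:

    LoadingCache<Path,Future<FileStatus>> statCache =
        CacheBuilder.newBuilder().build(FSDownload.createStatusCacheLoader(conf));
    FSDownload download =
        new FSDownload(lfs, ugi, conf, destDirPath, resource, statCache);

Passing null for statCache preserves the old behavior of stat-ing the filesystem directly on every check; this is what ResourceLocalizationService wires up per container later in this patch.
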
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestFSDownload.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestFSDownload.java
index 86909e7322d..cfdd3c8d145 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestFSDownload.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestFSDownload.java
@@ -21,27 +21,35 @@ package org.apache.hadoop.yarn.util;
 
 import static org.apache.hadoop.fs.CreateFlag.CREATE;
 import static org.apache.hadoop.fs.CreateFlag.OVERWRITE;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
 
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.net.URISyntaxException;
+import java.util.ArrayList;
 import java.util.EnumSet;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Random;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.jar.JarEntry;
 import java.util.jar.JarOutputStream;
 import java.util.jar.Manifest;
+import java.util.zip.GZIPOutputStream;
 import java.util.zip.ZipEntry;
 import java.util.zip.ZipOutputStream;
-import java.util.zip.GZIPOutputStream;
 
 import junit.framework.Assert;
@@ -64,10 +72,13 @@ import org.apache.hadoop.yarn.api.records.LocalResourceType;
 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
-import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.junit.AfterClass;
 import org.junit.Test;
 
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+
 public class TestFSDownload {
 
   private static final Log LOG = LogFactory.getLog(TestFSDownload.class);
@@ -88,6 +99,18 @@ public class TestFSDownload {
 
   static LocalResource createFile(FileContext files, Path p, int len,
       Random r, LocalResourceVisibility vis) throws IOException {
+    createFile(files, p, len, r);
+    LocalResource ret = recordFactory.newRecordInstance(LocalResource.class);
+    ret.setResource(ConverterUtils.getYarnUrlFromPath(p));
+    ret.setSize(len);
+    ret.setType(LocalResourceType.FILE);
+    ret.setVisibility(vis);
+    ret.setTimestamp(files.getFileStatus(p).getModificationTime());
+    return ret;
+  }
+
+  static void createFile(FileContext files, Path p, int len, Random r)
+      throws IOException {
     FSDataOutputStream out = null;
     try {
       byte[] bytes = new byte[len];
@@ -97,13 +120,6 @@ public class TestFSDownload {
     } finally {
       if (out != null) out.close();
     }
-    LocalResource ret = recordFactory.newRecordInstance(LocalResource.class);
-    ret.setResource(ConverterUtils.getYarnUrlFromPath(p));
-    ret.setSize(len);
-    ret.setType(LocalResourceType.FILE);
-    ret.setVisibility(vis);
-    ret.setTimestamp(files.getFileStatus(p).getModificationTime());
-    return ret;
   }
 
   static LocalResource createJar(FileContext files, Path p,
@@ -285,6 +301,76 @@ public class TestFSDownload {
     }
   }
 
+  @Test (timeout=60000)
+  public void testDownloadPublicWithStatCache() throws IOException,
+      URISyntaxException, InterruptedException, ExecutionException {
+    final Configuration conf = new Configuration();
+    FileContext files = FileContext.getLocalFSFileContext(conf);
+    Path basedir = files.makeQualified(new Path("target",
+        TestFSDownload.class.getSimpleName()));
+    files.mkdir(basedir, null, true);
+    conf.setStrings(TestFSDownload.class.getName(), basedir.toString());
+
+    int size = 512;
+
+    final ConcurrentMap<Path,AtomicInteger> counts =
+        new ConcurrentHashMap<Path,AtomicInteger>();
+    final CacheLoader<Path,Future<FileStatus>> loader =
+        FSDownload.createStatusCacheLoader(conf);
+    final LoadingCache<Path,Future<FileStatus>> statCache =
+        CacheBuilder.newBuilder().build(new CacheLoader<Path,Future<FileStatus>>() {
+          public Future<FileStatus> load(Path path) throws Exception {
+            // increment the count
+            AtomicInteger count = counts.get(path);
+            if (count == null) {
+              count = new AtomicInteger(0);
+              AtomicInteger existing = counts.putIfAbsent(path, count);
+              if (existing != null) {
+                count = existing;
+              }
+            }
+            count.incrementAndGet();
+
+            // use the default loader
+            return loader.load(path);
+          }
+        });
+
+    // test FSDownload.isPublic() concurrently
+    final int fileCount = 3;
+    List<Callable<Boolean>> tasks = new ArrayList<Callable<Boolean>>();
+    for (int i = 0; i < fileCount; i++) {
+      Random rand = new Random();
+      long sharedSeed = rand.nextLong();
+      rand.setSeed(sharedSeed);
+      System.out.println("SEED: " + sharedSeed);
+      final Path path = new Path(basedir, "test-file-" + i);
+      createFile(files, path, size, rand);
+      final FileSystem fs = path.getFileSystem(conf);
+      final FileStatus sStat = fs.getFileStatus(path);
+      tasks.add(new Callable<Boolean>() {
+        public Boolean call() throws IOException {
+          return FSDownload.isPublic(fs, path, sStat, statCache);
+        }
+      });
+    }
+
+    ExecutorService exec = Executors.newFixedThreadPool(fileCount);
+    try {
+      List<Future<Boolean>> futures = exec.invokeAll(tasks);
+      // files should be public
+      for (Future<Boolean> future: futures) {
+        assertTrue(future.get());
+      }
+      // for each path exactly one file status call should be made
+      for (AtomicInteger count: counts.values()) {
+        assertSame(count.get(), 1);
+      }
+    } finally {
+      exec.shutdown();
+    }
+  }
+
   @Test (timeout=10000)
   public void testDownload() throws IOException, URISyntaxException,
       InterruptedException {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizerContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizerContext.java
index 25a7d8d8ffa..6c4919fc3aa 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizerContext.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizerContext.java
@@ -18,20 +18,34 @@
 
 package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
 
+import java.util.concurrent.Future;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 
+import com.google.common.cache.LoadingCache;
+
 public class LocalizerContext {
 
   private final String user;
   private final ContainerId containerId;
   private final Credentials credentials;
+  private final LoadingCache<Path,Future<FileStatus>> statCache;
 
   public LocalizerContext(String user, ContainerId containerId,
       Credentials credentials) {
+    this(user, containerId, credentials, null);
+  }
+
+  public LocalizerContext(String user, ContainerId containerId,
+      Credentials credentials,
+      LoadingCache<Path,Future<FileStatus>> statCache) {
     this.user = user;
     this.containerId = containerId;
     this.credentials = credentials;
+    this.statCache = statCache;
   }
 
   public String getUser() {
@@ -46,4 +60,7 @@ public class LocalizerContext {
     return credentials;
   }
 
+  public LoadingCache<Path,Future<FileStatus>> getStatCache() {
+    return statCache;
+  }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
index 5e20b539aa5..de9bbdc9948 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
@@ -83,8 +83,8 @@ import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
 import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
 import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
-import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
 import org.apache.hadoop.yarn.server.nodemanager.DeletionService.FileDeletionTask;
+import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
 import org.apache.hadoop.yarn.server.nodemanager.api.LocalizationProtocol;
 import org.apache.hadoop.yarn.server.nodemanager.api.ResourceLocalizationSpec;
 import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalResourceStatus;
@@ -119,6 +119,8 @@ import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.hadoop.yarn.util.FSDownload;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.LoadingCache;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 public class ResourceLocalizationService extends CompositeService
@@ -362,8 +364,11 @@ public class ResourceLocalizationService extends CompositeService
   private void handleInitContainerResources(
       ContainerLocalizationRequestEvent rsrcReqs) {
     Container c = rsrcReqs.getContainer();
+    // create a loading cache for the file statuses
+    LoadingCache<Path,Future<FileStatus>> statCache =
+        CacheBuilder.newBuilder().build(FSDownload.createStatusCacheLoader(getConfig()));
     LocalizerContext ctxt = new LocalizerContext(
-        c.getUser(), c.getContainerId(), c.getCredentials());
+        c.getUser(), c.getContainerId(), c.getCredentials(), statCache);
     Map<LocalResourceVisibility, Collection<LocalResourceRequest>> rsrcs =
         rsrcReqs.getRequestedResources();
     for (Map.Entry<LocalResourceVisibility, Collection<LocalResourceRequest>> e :
@@ -680,7 +685,8 @@ public class ResourceLocalizationService extends CompositeService
         // completing and being dequeued before pending updated
         synchronized (pending) {
           pending.put(queue.submit(new FSDownload(lfs, null, conf,
-              publicDirDestPath, resource)), request);
+              publicDirDestPath, resource, request.getContext().getStatCache())),
+              request);
         }
       } catch (IOException e) {
         rsrc.unlock();
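Note (illustrative sketch, not part of the patch): because the cache is created once per container and shared by every public resource in that container's request, the ancestor walk in FSDownload.isPublic() issues at most one getFileStatus() per distinct directory instead of one per resource per directory. A sketch with hypothetical paths, in the style of the new test (isPublic is package-private, so this only works from org.apache.hadoop.yarn.util):

    LoadingCache<Path,Future<FileStatus>> statCache =
        CacheBuilder.newBuilder().build(FSDownload.createStatusCacheLoader(conf));
    Path a = new Path("/apps/shared/a.jar");   // hypothetical public resources
    Path b = new Path("/apps/shared/b.jar");
    // the shared ancestors (/apps/shared, /apps, /) are stat-ed once and then
    // served from the cache on the second call
    FSDownload.isPublic(fs, a, fs.getFileStatus(a), statCache);
    FSDownload.isPublic(fs, b, fs.getFileStatus(b), statCache);

The leaf FileStatus is still fetched by the caller (FSDownload.copy() already has it for timestamp verification), so only the ancestor directories go through the cache.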