From 3a54a5653bf1ea0b5b98e223c7500a9606abf04d Mon Sep 17 00:00:00 2001 From: Vinod Kumar Vavilapalli Date: Tue, 9 Apr 2013 19:56:10 +0000 Subject: [PATCH] YARN-112. Fixed a race condition during localization that fails containers. Contributed by Omkar Vinit Joshi. MAPREDUCE-5138. Fix LocalDistributedCacheManager after YARN-112. Contributed by Omkar Vinit Joshi. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1466196 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-mapreduce-project/CHANGES.txt | 3 + .../mapred/LocalDistributedCacheManager.java | 11 +++- hadoop-yarn-project/CHANGES.txt | 3 + .../apache/hadoop/yarn/util/FSDownload.java | 17 +---- .../hadoop/yarn/util/TestFSDownload.java | 66 ++++++++++++++++--- .../localizer/ContainerLocalizer.java | 2 +- .../localizer/LocalResourcesTracker.java | 1 + .../localizer/LocalResourcesTrackerImpl.java | 12 ++++ .../ResourceLocalizationService.java | 10 +-- .../TestResourceLocalizationService.java | 7 +- 10 files changed, 98 insertions(+), 34 deletions(-) diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index fe72721efc0..72967fd6a6a 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -225,6 +225,9 @@ Release 2.0.4-beta - UNRELEASED MAPREDUCE-5088. MR Client gets an renewer token exception while Oozie is submitting a job (Daryn Sharp via cos) + MAPREDUCE-5138. Fix LocalDistributedCacheManager after YARN-112. (Omkar Vinit + Joshi via vinodkv) + Release 2.0.3-alpha - 2013-02-06 INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalDistributedCacheManager.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalDistributedCacheManager.java index 3368d5134e9..1055516b65e 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalDistributedCacheManager.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalDistributedCacheManager.java @@ -32,13 +32,13 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; -import java.util.Random; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -91,6 +91,9 @@ class LocalDistributedCacheManager { Map localResources = new LinkedHashMap(); MRApps.setupDistributedCache(conf, localResources); + // Generating unique numbers for FSDownload. + AtomicLong uniqueNumberGenerator = + new AtomicLong(System.currentTimeMillis()); // Find which resources are to be put on the local classpath Map classpaths = new HashMap(); @@ -128,8 +131,10 @@ class LocalDistributedCacheManager { Path destPath = localDirAllocator.getLocalPathForWrite(".", conf); Map> resourcesToPaths = Maps.newHashMap(); for (LocalResource resource : localResources.values()) { - Callable download = new FSDownload(localFSFileContext, ugi, conf, - destPath, resource, new Random()); + Callable download = + new FSDownload(localFSFileContext, ugi, conf, new Path(destPath, + Long.toString(uniqueNumberGenerator.incrementAndGet())), + resource); Future future = exec.submit(download); resourcesToPaths.put(resource, future); } diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index db3ac6bdeb8..a48bc81a3c4 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -208,6 +208,9 @@ Release 2.0.5-beta - UNRELEASED local directory hits unix file count limits and thus prevent job failures. (Omkar Vinit Joshi via vinodkv) + YARN-112. Fixed a race condition during localization that fails containers. + (Omkar Vinit Joshi via vinodkv) + Release 2.0.4-alpha - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownload.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownload.java index 5253f49e87d..4a997ce5288 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownload.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownload.java @@ -23,7 +23,6 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.net.URISyntaxException; import java.security.PrivilegedExceptionAction; -import java.util.Random; import java.util.concurrent.Callable; import java.util.regex.Pattern; @@ -36,13 +35,12 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Options.Rename; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.fs.permission.FsAction; import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.RunJar; import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; -import org.apache.hadoop.yarn.util.ConverterUtils; /** * Download a single URL to the local disk. @@ -51,8 +49,7 @@ import org.apache.hadoop.yarn.util.ConverterUtils; public class FSDownload implements Callable { private static final Log LOG = LogFactory.getLog(FSDownload.class); - - private Random rand; + private FileContext files; private final UserGroupInformation userUgi; private Configuration conf; @@ -71,13 +68,12 @@ public class FSDownload implements Callable { public FSDownload(FileContext files, UserGroupInformation ugi, Configuration conf, - Path destDirPath, LocalResource resource, Random rand) { + Path destDirPath, LocalResource resource) { this.conf = conf; this.destDirPath = destDirPath; this.files = files; this.userUgi = ugi; this.resource = resource; - this.rand = rand; } LocalResource getResource() { @@ -270,11 +266,6 @@ public class FSDownload implements Callable { } catch (URISyntaxException e) { throw new IOException("Invalid resource", e); } - Path tmp; - do { - tmp = new Path(destDirPath, String.valueOf(rand.nextLong())); - } while (files.util().exists(tmp)); - destDirPath = tmp; createDir(destDirPath, cachePerms); final Path dst_work = new Path(destDirPath + "_tmp"); createDir(dst_work, cachePerms); @@ -305,8 +296,6 @@ public class FSDownload implements Callable { files.delete(dst_work, true); } catch (FileNotFoundException ignore) { } - // clear ref to internal var - rand = null; conf = null; resource = null; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestFSDownload.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestFSDownload.java index 912c7248ca5..b02cc517b70 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestFSDownload.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestFSDownload.java @@ -35,6 +35,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicLong; import java.util.jar.JarOutputStream; import java.util.jar.Manifest; @@ -66,6 +67,8 @@ import org.junit.Test; public class TestFSDownload { private static final Log LOG = LogFactory.getLog(TestFSDownload.class); + private static AtomicLong uniqueNumberGenerator = + new AtomicLong(System.currentTimeMillis()); @AfterClass public static void deleteTestDir() throws IOException { @@ -267,9 +270,11 @@ public class TestFSDownload { rsrcVis.put(rsrc, vis); Path destPath = dirs.getLocalPathForWrite( basedir.toString(), size, conf); + destPath = new Path (destPath, + Long.toString(uniqueNumberGenerator.incrementAndGet())); FSDownload fsd = new FSDownload(files, UserGroupInformation.getCurrentUser(), conf, - destPath, rsrc, new Random(sharedSeed)); + destPath, rsrc); pending.put(rsrc, exec.submit(fsd)); try { @@ -320,9 +325,11 @@ public class TestFSDownload { rsrcVis.put(rsrc, vis); Path destPath = dirs.getLocalPathForWrite( basedir.toString(), sizes[i], conf); + destPath = new Path (destPath, + Long.toString(uniqueNumberGenerator.incrementAndGet())); FSDownload fsd = new FSDownload(files, UserGroupInformation.getCurrentUser(), conf, - destPath, rsrc, new Random(sharedSeed)); + destPath, rsrc); pending.put(rsrc, exec.submit(fsd)); } @@ -380,9 +387,10 @@ public class TestFSDownload { Path p = new Path(basedir, "" + 1); LocalResource rsrc = createTarFile(files, p, size, rand, vis); Path destPath = dirs.getLocalPathForWrite(basedir.toString(), size, conf); + destPath = new Path (destPath, + Long.toString(uniqueNumberGenerator.incrementAndGet())); FSDownload fsd = new FSDownload(files, - UserGroupInformation.getCurrentUser(), conf, destPath, rsrc, - new Random(sharedSeed)); + UserGroupInformation.getCurrentUser(), conf, destPath, rsrc); pending.put(rsrc, exec.submit(fsd)); try { @@ -437,9 +445,10 @@ public class TestFSDownload { LocalResource rsrcjar = createJarFile(files, p, size, rand, vis); rsrcjar.setType(LocalResourceType.PATTERN); Path destPathjar = dirs.getLocalPathForWrite(basedir.toString(), size, conf); + destPathjar = new Path (destPathjar, + Long.toString(uniqueNumberGenerator.incrementAndGet())); FSDownload fsdjar = new FSDownload(files, - UserGroupInformation.getCurrentUser(), conf, destPathjar, rsrcjar, - new Random(sharedSeed)); + UserGroupInformation.getCurrentUser(), conf, destPathjar, rsrcjar); pending.put(rsrcjar, exec.submit(fsdjar)); try { @@ -493,9 +502,10 @@ public class TestFSDownload { Path p = new Path(basedir, "" + 1); LocalResource rsrczip = createZipFile(files, p, size, rand, vis); Path destPathjar = dirs.getLocalPathForWrite(basedir.toString(), size, conf); + destPathjar = new Path (destPathjar, + Long.toString(uniqueNumberGenerator.incrementAndGet())); FSDownload fsdzip = new FSDownload(files, - UserGroupInformation.getCurrentUser(), conf, destPathjar, rsrczip, - new Random(sharedSeed)); + UserGroupInformation.getCurrentUser(), conf, destPathjar, rsrczip); pending.put(rsrczip, exec.submit(fsdzip)); try { @@ -586,9 +596,11 @@ public class TestFSDownload { rsrcVis.put(rsrc, vis); Path destPath = dirs.getLocalPathForWrite( basedir.toString(), conf); + destPath = new Path (destPath, + Long.toString(uniqueNumberGenerator.incrementAndGet())); FSDownload fsd = new FSDownload(files, UserGroupInformation.getCurrentUser(), conf, - destPath, rsrc, new Random(sharedSeed)); + destPath, rsrc); pending.put(rsrc, exec.submit(fsd)); } @@ -614,4 +626,38 @@ public class TestFSDownload { } -} + + @Test(timeout = 1000) + public void testUniqueDestinationPath() throws Exception { + Configuration conf = new Configuration(); + FileContext files = FileContext.getLocalFSFileContext(conf); + final Path basedir = files.makeQualified(new Path("target", + TestFSDownload.class.getSimpleName())); + files.mkdir(basedir, null, true); + conf.setStrings(TestFSDownload.class.getName(), basedir.toString()); + + ExecutorService singleThreadedExec = Executors.newSingleThreadExecutor(); + + LocalDirAllocator dirs = + new LocalDirAllocator(TestFSDownload.class.getName()); + Path destPath = dirs.getLocalPathForWrite(basedir.toString(), conf); + destPath = + new Path(destPath, Long.toString(uniqueNumberGenerator + .incrementAndGet())); + try { + Path p = new Path(basedir, "dir" + 0 + ".jar"); + LocalResourceVisibility vis = LocalResourceVisibility.PRIVATE; + LocalResource rsrc = createJar(files, p, vis); + FSDownload fsd = + new FSDownload(files, UserGroupInformation.getCurrentUser(), conf, + destPath, rsrc); + Future rPath = singleThreadedExec.submit(fsd); + // Now FSDownload will not create a random directory to localize the + // resource. Therefore the final localizedPath for the resource should be + // destination directory (passed as an argument) + file name. + Assert.assertEquals(destPath, rPath.get().getParent()); + } finally { + singleThreadedExec.shutdown(); + } + } +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java index 71bd4982195..0e5e398c45d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java @@ -198,7 +198,7 @@ public class ContainerLocalizer { Callable download(Path path, LocalResource rsrc, UserGroupInformation ugi) throws IOException { DiskChecker.checkDir(new File(path.toUri().getRawPath())); - return new FSDownload(lfs, ugi, conf, path, rsrc, new Random()); + return new FSDownload(lfs, ugi, conf, path, rsrc); } static long getEstimatedSize(LocalResource rsrc) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTracker.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTracker.java index 5f451182d22..2e795e54a10 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTracker.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTracker.java @@ -43,4 +43,5 @@ interface LocalResourcesTracker // TODO: Remove this in favour of EventHandler.handle void localizationCompleted(LocalResourceRequest req, boolean success); + long nextUniqueNumber(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java index c025d611d4f..53ca9013da8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java @@ -21,6 +21,7 @@ import java.io.File; import java.util.Iterator; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicLong; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -66,6 +67,12 @@ class LocalResourcesTrackerImpl implements LocalResourcesTracker { */ private ConcurrentHashMap inProgressLocalResourcesMap; + /* + * starting with 10 to accommodate 0-9 directories created as a part of + * LocalCacheDirectoryManager. So there will be one unique number generator + * per APPLICATION, USER and PUBLIC cache. + */ + private AtomicLong uniqueNumberGenerator = new AtomicLong(9); public LocalResourcesTrackerImpl(String user, Dispatcher dispatcher, boolean useLocalCacheDirectoryManager, Configuration conf) { @@ -283,4 +290,9 @@ class LocalResourcesTrackerImpl implements LocalResourcesTracker { } } } + + @Override + public long nextUniqueNumber() { + return uniqueNumberGenerator.incrementAndGet(); + } } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java index c03590cc7c5..5058cb2cad9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java @@ -34,7 +34,6 @@ import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.Random; import java.util.concurrent.CancellationException; import java.util.concurrent.CompletionService; import java.util.concurrent.ConcurrentHashMap; @@ -666,8 +665,11 @@ public class ResourceLocalizationService extends CompositeService DiskChecker.checkDir( new File(publicDirDestPath.toUri().getPath())); } + publicDirDestPath = + new Path(publicDirDestPath, Long.toString(publicRsrc + .nextUniqueNumber())); pending.put(queue.submit(new FSDownload( - lfs, null, conf, publicDirDestPath, resource, new Random())), + lfs, null, conf, publicDirDestPath, resource)), request); attempts.put(key, new LinkedList()); } catch (IOException e) { @@ -955,9 +957,9 @@ public class ResourceLocalizationService extends CompositeService Path dirPath = dirsHandler.getLocalPathForWrite(cacheDirectory, ContainerLocalizer.getEstimatedSize(rsrc), false); - return tracker.getPathForLocalization(new LocalResourceRequest(rsrc), + dirPath = tracker.getPathForLocalization(new LocalResourceRequest(rsrc), dirPath); - + return new Path (dirPath, Long.toString(tracker.nextUniqueNumber())); } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java index 51f2ca3bf02..77bde7b1795 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java @@ -520,7 +520,10 @@ public class TestResourceLocalizationService { new LocalResourceRequest(response.getResourceSpecs().get(0).getResource())); URL localizedPath = response.getResourceSpecs().get(0).getDestinationDirectory(); - assertTrue(localizedPath.getFile().endsWith(localPath)); + // Appending to local path unique number(10) generated as a part of + // LocalResourcesTracker + assertTrue(localizedPath.getFile().endsWith( + localPath + Path.SEPARATOR + "10")); // get second resource response = spyService.heartbeat(stat); @@ -534,7 +537,7 @@ public class TestResourceLocalizationService { // LocalCacheDirectoryManager will be used and we have restricted number // of files per directory to 1. assertTrue(localizedPath.getFile().endsWith( - localPath + Path.SEPARATOR + "0")); + localPath + Path.SEPARATOR + "0" + Path.SEPARATOR + "11")); // empty rsrc response = spyService.heartbeat(stat);