From 854d25b0c30fd40f640c052e79a8747741492042 Mon Sep 17 00:00:00 2001 From: Jason Lowe Date: Wed, 30 Sep 2015 14:59:44 +0000 Subject: [PATCH] YARN-3727. For better error recovery, check if the directory exists before using it for localization. Contributed by Zhihai Xu --- hadoop-yarn-project/CHANGES.txt | 3 + .../localizer/LocalResourcesTracker.java | 3 +- .../localizer/LocalResourcesTrackerImpl.java | 24 +++++-- .../ResourceLocalizationService.java | 5 +- .../TestLocalResourcesTrackerImpl.java | 65 ++++++++++++++++--- .../TestResourceLocalizationService.java | 16 +++-- 6 files changed, 94 insertions(+), 22 deletions(-) diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 43c501f0869..bf3ac12aae7 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -1005,6 +1005,9 @@ Release 2.7.2 - UNRELEASED YARN-4180. AMLauncher does not retry on failures when talking to NM. (adhoot) + YARN-3727. For better error recovery, check if the directory exists before + using it for localization. (Zhihai Xu via jlowe) + Release 2.7.1 - 2015-07-06 INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTracker.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTracker.java index 14ec9117843..56e3de59b3d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTracker.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTracker.java @@ -33,7 +33,8 @@ interface LocalResourcesTracker boolean remove(LocalizedResource req, DeletionService delService); - Path getPathForLocalization(LocalResourceRequest req, Path localDirPath); + Path getPathForLocalization(LocalResourceRequest req, Path localDirPath, + DeletionService delService); String getUser(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java index a1e68173421..51dbcaa3b31 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java @@ -440,10 +440,12 @@ public Iterator iterator() { * @param {@link LocalResourceRequest} Resource localization request to * localize the resource. * @param {@link Path} local directory path + * @param {@link DeletionService} Deletion Service to delete existing + * path for localization. */ @Override - public Path - getPathForLocalization(LocalResourceRequest req, Path localDirPath) { + public Path getPathForLocalization(LocalResourceRequest req, + Path localDirPath, DeletionService delService) { Path rPath = localDirPath; if (useLocalCacheDirectoryManager && localDirPath != null) { @@ -463,8 +465,22 @@ public Iterator iterator() { inProgressLocalResourcesMap.put(req, rPath); } - rPath = new Path(rPath, - Long.toString(uniqueNumberGenerator.incrementAndGet())); + while (true) { + Path uniquePath = new Path(rPath, + Long.toString(uniqueNumberGenerator.incrementAndGet())); + File file = new File(uniquePath.toUri().getRawPath()); + if (!file.exists()) { + rPath = uniquePath; + break; + } + // If the directory already exists, delete it and move to next one. + LOG.warn("Directory " + uniquePath + " already exists, " + + "try next one."); + if (delService != null) { + delService.delete(getUser(), uniquePath); + } + } + Path localPath = new Path(rPath, req.getPath().getName()); LocalizedResource rsrc = localrsrc.get(req); rsrc.setLocalPath(localPath); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java index e239e345cb0..2cc5683c470 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java @@ -830,7 +830,8 @@ public void addResource(LocalizerResourceRequestEvent request) { + ContainerLocalizer.FILECACHE, ContainerLocalizer.getEstimatedSize(resource), true); Path publicDirDestPath = - publicRsrc.getPathForLocalization(key, publicRootPath); + publicRsrc.getPathForLocalization(key, publicRootPath, + delService); if (!publicDirDestPath.getParent().equals(publicRootPath)) { DiskChecker.checkDir(new File(publicDirDestPath.toUri().getPath())); } @@ -1116,7 +1117,7 @@ private Path getPathForLocalization(LocalResource rsrc) throws IOException, dirsHandler.getLocalPathForWrite(cacheDirectory, ContainerLocalizer.getEstimatedSize(rsrc), false); return tracker.getPathForLocalization(new LocalResourceRequest(rsrc), - dirPath); + dirPath, delService); } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java index 350cecb7497..e6aeae05051 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java @@ -35,6 +35,7 @@ import java.util.concurrent.ConcurrentMap; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.Path; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; @@ -427,7 +428,7 @@ public void testHierarchicalLocalCacheDirectories() { // Simulate the process of localization of lr1 // NOTE: Localization path from tracker has resource ID at end Path hierarchicalPath1 = - tracker.getPathForLocalization(lr1, localDir).getParent(); + tracker.getPathForLocalization(lr1, localDir, null).getParent(); // Simulate lr1 getting localized ResourceLocalizedEvent rle1 = new ResourceLocalizedEvent(lr1, @@ -444,7 +445,7 @@ public void testHierarchicalLocalCacheDirectories() { tracker.handle(reqEvent2); Path hierarchicalPath2 = - tracker.getPathForLocalization(lr2, localDir).getParent(); + tracker.getPathForLocalization(lr2, localDir, null).getParent(); // localization failed. ResourceFailedLocalizationEvent rfe2 = new ResourceFailedLocalizationEvent( @@ -463,7 +464,7 @@ public void testHierarchicalLocalCacheDirectories() { LocalResourceVisibility.PUBLIC, lc1); tracker.handle(reqEvent3); Path hierarchicalPath3 = - tracker.getPathForLocalization(lr3, localDir).getParent(); + tracker.getPathForLocalization(lr3, localDir, null).getParent(); // localization successful ResourceLocalizedEvent rle3 = new ResourceLocalizedEvent(lr3, new Path(hierarchicalPath3.toUri() @@ -542,7 +543,8 @@ public void testStateStoreSuccessfulLocalization() throws Exception { dispatcher.await(); // Simulate the process of localization of lr1 - Path hierarchicalPath1 = tracker.getPathForLocalization(lr1, localDir); + Path hierarchicalPath1 = tracker.getPathForLocalization(lr1, localDir, + null); ArgumentCaptor localResourceCaptor = ArgumentCaptor.forClass(LocalResourceProto.class); @@ -622,7 +624,8 @@ public void testStateStoreFailedLocalization() throws Exception { dispatcher.await(); // Simulate the process of localization of lr1 - Path hierarchicalPath1 = tracker.getPathForLocalization(lr1, localDir); + Path hierarchicalPath1 = tracker.getPathForLocalization(lr1, localDir, + null); ArgumentCaptor localResourceCaptor = ArgumentCaptor.forClass(LocalResourceProto.class); @@ -691,7 +694,8 @@ public void testRecoveredResource() throws Exception { LocalResourceVisibility.APPLICATION, lc2); tracker.handle(reqEvent2); dispatcher.await(); - Path hierarchicalPath2 = tracker.getPathForLocalization(lr2, localDir); + Path hierarchicalPath2 = tracker.getPathForLocalization(lr2, localDir, + null); long localizedId2 = Long.parseLong(hierarchicalPath2.getName()); Assert.assertEquals(localizedId1 + 1, localizedId2); } finally { @@ -785,6 +789,49 @@ public void testRecoveredResourceWithDirCacheMgr() throws Exception { } } + @Test + @SuppressWarnings("unchecked") + public void testGetPathForLocalization() throws Exception { + FileContext lfs = FileContext.getLocalFSFileContext(); + Path base_path = new Path("target", + TestLocalResourcesTrackerImpl.class.getSimpleName()); + final String user = "someuser"; + final ApplicationId appId = ApplicationId.newInstance(1, 1); + Configuration conf = new YarnConfiguration(); + DrainDispatcher dispatcher = null; + dispatcher = createDispatcher(conf); + EventHandler localizerEventHandler = + mock(EventHandler.class); + EventHandler containerEventHandler = + mock(EventHandler.class); + dispatcher.register(LocalizerEventType.class, localizerEventHandler); + dispatcher.register(ContainerEventType.class, containerEventHandler); + NMStateStoreService stateStore = mock(NMStateStoreService.class); + DeletionService delService = mock(DeletionService.class); + try { + LocalResourceRequest req1 = createLocalResourceRequest(user, 1, 1, + LocalResourceVisibility.PUBLIC); + LocalizedResource lr1 = createLocalizedResource(req1, dispatcher); + ConcurrentMap localrsrc = + new ConcurrentHashMap(); + localrsrc.put(req1, lr1); + LocalResourcesTrackerImpl tracker = new LocalResourcesTrackerImpl(user, + appId, dispatcher, localrsrc, true, conf, stateStore, null); + Path conflictPath = new Path(base_path, "10"); + Path qualifiedConflictPath = lfs.makeQualified(conflictPath); + lfs.mkdir(qualifiedConflictPath, null, true); + Path rPath = tracker.getPathForLocalization(req1, base_path, + delService); + Assert.assertFalse(lfs.util().exists(rPath)); + verify(delService, times(1)).delete(eq(user), eq(conflictPath)); + } finally { + lfs.delete(base_path, true); + if (dispatcher != null) { + dispatcher.stop(); + } + } + } + @SuppressWarnings("unchecked") @Test public void testResourcePresentInGoodDir() throws IOException { @@ -832,8 +879,10 @@ public void testResourcePresentInGoodDir() throws IOException { tracker.handle(req21Event); dispatcher.await(); // Localize resource1 - Path p1 = tracker.getPathForLocalization(req1, new Path("/tmp/somedir1")); - Path p2 = tracker.getPathForLocalization(req2, new Path("/tmp/somedir2")); + Path p1 = tracker.getPathForLocalization(req1, + new Path("/tmp/somedir1"), null); + Path p2 = tracker.getPathForLocalization(req2, + new Path("/tmp/somedir2"), null); ResourceLocalizedEvent rle1 = new ResourceLocalizedEvent(req1, p1, 1); tracker.handle(rle1); ResourceLocalizedEvent rle2 = new ResourceLocalizedEvent(req2, p2, 1); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java index c515506ed95..e7a9db80796 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java @@ -624,29 +624,31 @@ public void testRecovery() throws Exception { // Simulate start of localization for all resources privTracker1.getPathForLocalization(privReq1, dirsHandler.getLocalPathForWrite( - ContainerLocalizer.USERCACHE + user1)); + ContainerLocalizer.USERCACHE + user1), null); privTracker1.getPathForLocalization(privReq2, dirsHandler.getLocalPathForWrite( - ContainerLocalizer.USERCACHE + user1)); + ContainerLocalizer.USERCACHE + user1), null); LocalizedResource privLr1 = privTracker1.getLocalizedResource(privReq1); LocalizedResource privLr2 = privTracker1.getLocalizedResource(privReq2); appTracker1.getPathForLocalization(appReq1, dirsHandler.getLocalPathForWrite( - ContainerLocalizer.APPCACHE + appId1)); + ContainerLocalizer.APPCACHE + appId1), null); LocalizedResource appLr1 = appTracker1.getLocalizedResource(appReq1); appTracker2.getPathForLocalization(appReq2, dirsHandler.getLocalPathForWrite( - ContainerLocalizer.APPCACHE + appId2)); + ContainerLocalizer.APPCACHE + appId2), null); LocalizedResource appLr2 = appTracker2.getLocalizedResource(appReq2); appTracker2.getPathForLocalization(appReq3, dirsHandler.getLocalPathForWrite( - ContainerLocalizer.APPCACHE + appId2)); + ContainerLocalizer.APPCACHE + appId2), null); LocalizedResource appLr3 = appTracker2.getLocalizedResource(appReq3); pubTracker.getPathForLocalization(pubReq1, - dirsHandler.getLocalPathForWrite(ContainerLocalizer.FILECACHE)); + dirsHandler.getLocalPathForWrite(ContainerLocalizer.FILECACHE), + null); LocalizedResource pubLr1 = pubTracker.getLocalizedResource(pubReq1); pubTracker.getPathForLocalization(pubReq2, - dirsHandler.getLocalPathForWrite(ContainerLocalizer.FILECACHE)); + dirsHandler.getLocalPathForWrite(ContainerLocalizer.FILECACHE), + null); LocalizedResource pubLr2 = pubTracker.getLocalizedResource(pubReq2); // Simulate completion of localization for most resources with