YARN-3727. For better error recovery, check if the directory exists before using it for localization. Contributed by Zhihai Xu

(cherry picked from commit 854d25b0c3)

Conflicts:

	hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java
(cherry picked from commit 493f072008)

Conflicts:

	hadoop-yarn-project/CHANGES.txt
This commit is contained in:
Jason Lowe 2015-09-30 16:12:09 +00:00
parent c4cbbcbc26
commit 49335d9b2b
6 changed files with 90 additions and 20 deletions

View File

@ -27,6 +27,9 @@ Release 2.6.2 - UNRELEASED
YARN-3554. Default value for maximum nodemanager connect wait time is too YARN-3554. Default value for maximum nodemanager connect wait time is too
high (Naganarasimha G R via jlowe) high (Naganarasimha G R via jlowe)
YARN-3727. For better error recovery, check if the directory exists before
using it for localization. (Zhihai Xu via jlowe)
Release 2.6.1 - 2015-09-23 Release 2.6.1 - 2015-09-23
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -33,7 +33,8 @@ interface LocalResourcesTracker
boolean remove(LocalizedResource req, DeletionService delService); boolean remove(LocalizedResource req, DeletionService delService);
Path getPathForLocalization(LocalResourceRequest req, Path localDirPath); Path getPathForLocalization(LocalResourceRequest req, Path localDirPath,
DeletionService delService);
String getUser(); String getUser();

View File

@ -392,10 +392,12 @@ class LocalResourcesTrackerImpl implements LocalResourcesTracker {
* @param {@link LocalResourceRequest} Resource localization request to * @param {@link LocalResourceRequest} Resource localization request to
* localize the resource. * localize the resource.
* @param {@link Path} local directory path * @param {@link Path} local directory path
* @param {@link DeletionService} Deletion Service to delete existing
* path for localization.
*/ */
@Override @Override
public Path public Path getPathForLocalization(LocalResourceRequest req,
getPathForLocalization(LocalResourceRequest req, Path localDirPath) { Path localDirPath, DeletionService delService) {
Path rPath = localDirPath; Path rPath = localDirPath;
if (useLocalCacheDirectoryManager && localDirPath != null) { if (useLocalCacheDirectoryManager && localDirPath != null) {
@ -415,8 +417,22 @@ class LocalResourcesTrackerImpl implements LocalResourcesTracker {
inProgressLocalResourcesMap.put(req, rPath); inProgressLocalResourcesMap.put(req, rPath);
} }
rPath = new Path(rPath, while (true) {
Long.toString(uniqueNumberGenerator.incrementAndGet())); Path uniquePath = new Path(rPath,
Long.toString(uniqueNumberGenerator.incrementAndGet()));
File file = new File(uniquePath.toUri().getRawPath());
if (!file.exists()) {
rPath = uniquePath;
break;
}
// If the directory already exists, delete it and move to next one.
LOG.warn("Directory " + uniquePath + " already exists, " +
"try next one.");
if (delService != null) {
delService.delete(getUser(), uniquePath);
}
}
Path localPath = new Path(rPath, req.getPath().getName()); Path localPath = new Path(rPath, req.getPath().getName());
LocalizedResource rsrc = localrsrc.get(req); LocalizedResource rsrc = localrsrc.get(req);
rsrc.setLocalPath(localPath); rsrc.setLocalPath(localPath);

View File

@ -798,7 +798,8 @@ public class ResourceLocalizationService extends CompositeService
+ ContainerLocalizer.FILECACHE, + ContainerLocalizer.FILECACHE,
ContainerLocalizer.getEstimatedSize(resource), true); ContainerLocalizer.getEstimatedSize(resource), true);
Path publicDirDestPath = Path publicDirDestPath =
publicRsrc.getPathForLocalization(key, publicRootPath); publicRsrc.getPathForLocalization(key, publicRootPath,
delService);
if (!publicDirDestPath.getParent().equals(publicRootPath)) { if (!publicDirDestPath.getParent().equals(publicRootPath)) {
DiskChecker.checkDir(new File(publicDirDestPath.toUri().getPath())); DiskChecker.checkDir(new File(publicDirDestPath.toUri().getPath()));
} }
@ -1079,7 +1080,7 @@ public class ResourceLocalizationService extends CompositeService
dirsHandler.getLocalPathForWrite(cacheDirectory, dirsHandler.getLocalPathForWrite(cacheDirectory,
ContainerLocalizer.getEstimatedSize(rsrc), false); ContainerLocalizer.getEstimatedSize(rsrc), false);
return tracker.getPathForLocalization(new LocalResourceRequest(rsrc), return tracker.getPathForLocalization(new LocalResourceRequest(rsrc),
dirPath); dirPath, delService);
} }
@Override @Override

View File

@ -34,6 +34,7 @@ import java.util.concurrent.ConcurrentMap;
import org.junit.Assert; import org.junit.Assert;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
@ -423,7 +424,7 @@ public class TestLocalResourcesTrackerImpl {
// Simulate the process of localization of lr1 // Simulate the process of localization of lr1
// NOTE: Localization path from tracker has resource ID at end // NOTE: Localization path from tracker has resource ID at end
Path hierarchicalPath1 = Path hierarchicalPath1 =
tracker.getPathForLocalization(lr1, localDir).getParent(); tracker.getPathForLocalization(lr1, localDir, null).getParent();
// Simulate lr1 getting localized // Simulate lr1 getting localized
ResourceLocalizedEvent rle1 = ResourceLocalizedEvent rle1 =
new ResourceLocalizedEvent(lr1, new ResourceLocalizedEvent(lr1,
@ -440,7 +441,7 @@ public class TestLocalResourcesTrackerImpl {
tracker.handle(reqEvent2); tracker.handle(reqEvent2);
Path hierarchicalPath2 = Path hierarchicalPath2 =
tracker.getPathForLocalization(lr2, localDir).getParent(); tracker.getPathForLocalization(lr2, localDir, null).getParent();
// localization failed. // localization failed.
ResourceFailedLocalizationEvent rfe2 = ResourceFailedLocalizationEvent rfe2 =
new ResourceFailedLocalizationEvent( new ResourceFailedLocalizationEvent(
@ -459,7 +460,7 @@ public class TestLocalResourcesTrackerImpl {
LocalResourceVisibility.PUBLIC, lc1); LocalResourceVisibility.PUBLIC, lc1);
tracker.handle(reqEvent3); tracker.handle(reqEvent3);
Path hierarchicalPath3 = Path hierarchicalPath3 =
tracker.getPathForLocalization(lr3, localDir).getParent(); tracker.getPathForLocalization(lr3, localDir, null).getParent();
// localization successful // localization successful
ResourceLocalizedEvent rle3 = ResourceLocalizedEvent rle3 =
new ResourceLocalizedEvent(lr3, new Path(hierarchicalPath3.toUri() new ResourceLocalizedEvent(lr3, new Path(hierarchicalPath3.toUri()
@ -538,7 +539,8 @@ public class TestLocalResourcesTrackerImpl {
dispatcher.await(); dispatcher.await();
// Simulate the process of localization of lr1 // Simulate the process of localization of lr1
Path hierarchicalPath1 = tracker.getPathForLocalization(lr1, localDir); Path hierarchicalPath1 = tracker.getPathForLocalization(lr1, localDir,
null);
ArgumentCaptor<LocalResourceProto> localResourceCaptor = ArgumentCaptor<LocalResourceProto> localResourceCaptor =
ArgumentCaptor.forClass(LocalResourceProto.class); ArgumentCaptor.forClass(LocalResourceProto.class);
@ -618,7 +620,8 @@ public class TestLocalResourcesTrackerImpl {
dispatcher.await(); dispatcher.await();
// Simulate the process of localization of lr1 // Simulate the process of localization of lr1
Path hierarchicalPath1 = tracker.getPathForLocalization(lr1, localDir); Path hierarchicalPath1 = tracker.getPathForLocalization(lr1, localDir,
null);
ArgumentCaptor<LocalResourceProto> localResourceCaptor = ArgumentCaptor<LocalResourceProto> localResourceCaptor =
ArgumentCaptor.forClass(LocalResourceProto.class); ArgumentCaptor.forClass(LocalResourceProto.class);
@ -687,7 +690,8 @@ public class TestLocalResourcesTrackerImpl {
LocalResourceVisibility.APPLICATION, lc2); LocalResourceVisibility.APPLICATION, lc2);
tracker.handle(reqEvent2); tracker.handle(reqEvent2);
dispatcher.await(); dispatcher.await();
Path hierarchicalPath2 = tracker.getPathForLocalization(lr2, localDir); Path hierarchicalPath2 = tracker.getPathForLocalization(lr2, localDir,
null);
long localizedId2 = Long.parseLong(hierarchicalPath2.getName()); long localizedId2 = Long.parseLong(hierarchicalPath2.getName());
Assert.assertEquals(localizedId1 + 1, localizedId2); Assert.assertEquals(localizedId1 + 1, localizedId2);
} finally { } finally {
@ -781,6 +785,49 @@ public class TestLocalResourcesTrackerImpl {
} }
} }
@Test
@SuppressWarnings("unchecked")
public void testGetPathForLocalization() throws Exception {
FileContext lfs = FileContext.getLocalFSFileContext();
Path base_path = new Path("target",
TestLocalResourcesTrackerImpl.class.getSimpleName());
final String user = "someuser";
final ApplicationId appId = ApplicationId.newInstance(1, 1);
Configuration conf = new YarnConfiguration();
DrainDispatcher dispatcher = null;
dispatcher = createDispatcher(conf);
EventHandler<LocalizerEvent> localizerEventHandler =
mock(EventHandler.class);
EventHandler<LocalizerEvent> containerEventHandler =
mock(EventHandler.class);
dispatcher.register(LocalizerEventType.class, localizerEventHandler);
dispatcher.register(ContainerEventType.class, containerEventHandler);
NMStateStoreService stateStore = mock(NMStateStoreService.class);
DeletionService delService = mock(DeletionService.class);
try {
LocalResourceRequest req1 = createLocalResourceRequest(user, 1, 1,
LocalResourceVisibility.PUBLIC);
LocalizedResource lr1 = createLocalizedResource(req1, dispatcher);
ConcurrentMap<LocalResourceRequest, LocalizedResource> localrsrc =
new ConcurrentHashMap<LocalResourceRequest, LocalizedResource>();
localrsrc.put(req1, lr1);
LocalResourcesTrackerImpl tracker = new LocalResourcesTrackerImpl(user,
appId, dispatcher, localrsrc, true, conf, stateStore);
Path conflictPath = new Path(base_path, "10");
Path qualifiedConflictPath = lfs.makeQualified(conflictPath);
lfs.mkdir(qualifiedConflictPath, null, true);
Path rPath = tracker.getPathForLocalization(req1, base_path,
delService);
Assert.assertFalse(lfs.util().exists(rPath));
verify(delService, times(1)).delete(eq(user), eq(conflictPath));
} finally {
lfs.delete(base_path, true);
if (dispatcher != null) {
dispatcher.stop();
}
}
}
private boolean createdummylocalizefile(Path path) { private boolean createdummylocalizefile(Path path) {
boolean ret = false; boolean ret = false;
File file = new File(path.toUri().getRawPath().toString()); File file = new File(path.toUri().getRawPath().toString());

View File

@ -615,29 +615,31 @@ public class TestResourceLocalizationService {
// Simulate start of localization for all resources // Simulate start of localization for all resources
privTracker1.getPathForLocalization(privReq1, privTracker1.getPathForLocalization(privReq1,
dirsHandler.getLocalPathForWrite( dirsHandler.getLocalPathForWrite(
ContainerLocalizer.USERCACHE + user1)); ContainerLocalizer.USERCACHE + user1), null);
privTracker1.getPathForLocalization(privReq2, privTracker1.getPathForLocalization(privReq2,
dirsHandler.getLocalPathForWrite( dirsHandler.getLocalPathForWrite(
ContainerLocalizer.USERCACHE + user1)); ContainerLocalizer.USERCACHE + user1), null);
LocalizedResource privLr1 = privTracker1.getLocalizedResource(privReq1); LocalizedResource privLr1 = privTracker1.getLocalizedResource(privReq1);
LocalizedResource privLr2 = privTracker1.getLocalizedResource(privReq2); LocalizedResource privLr2 = privTracker1.getLocalizedResource(privReq2);
appTracker1.getPathForLocalization(appReq1, appTracker1.getPathForLocalization(appReq1,
dirsHandler.getLocalPathForWrite( dirsHandler.getLocalPathForWrite(
ContainerLocalizer.APPCACHE + appId1)); ContainerLocalizer.APPCACHE + appId1), null);
LocalizedResource appLr1 = appTracker1.getLocalizedResource(appReq1); LocalizedResource appLr1 = appTracker1.getLocalizedResource(appReq1);
appTracker2.getPathForLocalization(appReq2, appTracker2.getPathForLocalization(appReq2,
dirsHandler.getLocalPathForWrite( dirsHandler.getLocalPathForWrite(
ContainerLocalizer.APPCACHE + appId2)); ContainerLocalizer.APPCACHE + appId2), null);
LocalizedResource appLr2 = appTracker2.getLocalizedResource(appReq2); LocalizedResource appLr2 = appTracker2.getLocalizedResource(appReq2);
appTracker2.getPathForLocalization(appReq3, appTracker2.getPathForLocalization(appReq3,
dirsHandler.getLocalPathForWrite( dirsHandler.getLocalPathForWrite(
ContainerLocalizer.APPCACHE + appId2)); ContainerLocalizer.APPCACHE + appId2), null);
LocalizedResource appLr3 = appTracker2.getLocalizedResource(appReq3); LocalizedResource appLr3 = appTracker2.getLocalizedResource(appReq3);
pubTracker.getPathForLocalization(pubReq1, pubTracker.getPathForLocalization(pubReq1,
dirsHandler.getLocalPathForWrite(ContainerLocalizer.FILECACHE)); dirsHandler.getLocalPathForWrite(ContainerLocalizer.FILECACHE),
null);
LocalizedResource pubLr1 = pubTracker.getLocalizedResource(pubReq1); LocalizedResource pubLr1 = pubTracker.getLocalizedResource(pubReq1);
pubTracker.getPathForLocalization(pubReq2, pubTracker.getPathForLocalization(pubReq2,
dirsHandler.getLocalPathForWrite(ContainerLocalizer.FILECACHE)); dirsHandler.getLocalPathForWrite(ContainerLocalizer.FILECACHE),
null);
LocalizedResource pubLr2 = pubTracker.getLocalizedResource(pubReq2); LocalizedResource pubLr2 = pubTracker.getLocalizedResource(pubReq2);
// Simulate completion of localization for most resources with // Simulate completion of localization for most resources with