YARN-3727. For better error recovery, check if the directory exists before using it for localization. Contributed by Zhihai Xu
(cherry picked from commit 854d25b0c3
)
This commit is contained in:
parent
3625222635
commit
d645ee1d62
|
@ -953,6 +953,9 @@ Release 2.7.2 - UNRELEASED
|
|||
YARN-4180. AMLauncher does not retry on failures when talking to NM.
|
||||
(adhoot)
|
||||
|
||||
YARN-3727. For better error recovery, check if the directory exists before
|
||||
using it for localization. (Zhihai Xu via jlowe)
|
||||
|
||||
Release 2.7.1 - 2015-07-06
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
|
|
@ -33,7 +33,8 @@ interface LocalResourcesTracker
|
|||
|
||||
boolean remove(LocalizedResource req, DeletionService delService);
|
||||
|
||||
Path getPathForLocalization(LocalResourceRequest req, Path localDirPath);
|
||||
Path getPathForLocalization(LocalResourceRequest req, Path localDirPath,
|
||||
DeletionService delService);
|
||||
|
||||
String getUser();
|
||||
|
||||
|
|
|
@ -440,10 +440,12 @@ class LocalResourcesTrackerImpl implements LocalResourcesTracker {
|
|||
* @param {@link LocalResourceRequest} Resource localization request to
|
||||
* localize the resource.
|
||||
* @param {@link Path} local directory path
|
||||
* @param {@link DeletionService} Deletion Service to delete existing
|
||||
* path for localization.
|
||||
*/
|
||||
@Override
|
||||
public Path
|
||||
getPathForLocalization(LocalResourceRequest req, Path localDirPath) {
|
||||
public Path getPathForLocalization(LocalResourceRequest req,
|
||||
Path localDirPath, DeletionService delService) {
|
||||
Path rPath = localDirPath;
|
||||
if (useLocalCacheDirectoryManager && localDirPath != null) {
|
||||
|
||||
|
@ -463,8 +465,22 @@ class LocalResourcesTrackerImpl implements LocalResourcesTracker {
|
|||
inProgressLocalResourcesMap.put(req, rPath);
|
||||
}
|
||||
|
||||
rPath = new Path(rPath,
|
||||
Long.toString(uniqueNumberGenerator.incrementAndGet()));
|
||||
while (true) {
|
||||
Path uniquePath = new Path(rPath,
|
||||
Long.toString(uniqueNumberGenerator.incrementAndGet()));
|
||||
File file = new File(uniquePath.toUri().getRawPath());
|
||||
if (!file.exists()) {
|
||||
rPath = uniquePath;
|
||||
break;
|
||||
}
|
||||
// If the directory already exists, delete it and move to next one.
|
||||
LOG.warn("Directory " + uniquePath + " already exists, " +
|
||||
"try next one.");
|
||||
if (delService != null) {
|
||||
delService.delete(getUser(), uniquePath);
|
||||
}
|
||||
}
|
||||
|
||||
Path localPath = new Path(rPath, req.getPath().getName());
|
||||
LocalizedResource rsrc = localrsrc.get(req);
|
||||
rsrc.setLocalPath(localPath);
|
||||
|
|
|
@ -830,7 +830,8 @@ public class ResourceLocalizationService extends CompositeService
|
|||
+ ContainerLocalizer.FILECACHE,
|
||||
ContainerLocalizer.getEstimatedSize(resource), true);
|
||||
Path publicDirDestPath =
|
||||
publicRsrc.getPathForLocalization(key, publicRootPath);
|
||||
publicRsrc.getPathForLocalization(key, publicRootPath,
|
||||
delService);
|
||||
if (!publicDirDestPath.getParent().equals(publicRootPath)) {
|
||||
DiskChecker.checkDir(new File(publicDirDestPath.toUri().getPath()));
|
||||
}
|
||||
|
@ -1116,7 +1117,7 @@ public class ResourceLocalizationService extends CompositeService
|
|||
dirsHandler.getLocalPathForWrite(cacheDirectory,
|
||||
ContainerLocalizer.getEstimatedSize(rsrc), false);
|
||||
return tracker.getPathForLocalization(new LocalResourceRequest(rsrc),
|
||||
dirPath);
|
||||
dirPath, delService);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -35,6 +35,7 @@ import java.util.concurrent.ConcurrentHashMap;
|
|||
import java.util.concurrent.ConcurrentMap;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileContext;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
|
@ -427,7 +428,7 @@ public class TestLocalResourcesTrackerImpl {
|
|||
// Simulate the process of localization of lr1
|
||||
// NOTE: Localization path from tracker has resource ID at end
|
||||
Path hierarchicalPath1 =
|
||||
tracker.getPathForLocalization(lr1, localDir).getParent();
|
||||
tracker.getPathForLocalization(lr1, localDir, null).getParent();
|
||||
// Simulate lr1 getting localized
|
||||
ResourceLocalizedEvent rle1 =
|
||||
new ResourceLocalizedEvent(lr1,
|
||||
|
@ -444,7 +445,7 @@ public class TestLocalResourcesTrackerImpl {
|
|||
tracker.handle(reqEvent2);
|
||||
|
||||
Path hierarchicalPath2 =
|
||||
tracker.getPathForLocalization(lr2, localDir).getParent();
|
||||
tracker.getPathForLocalization(lr2, localDir, null).getParent();
|
||||
// localization failed.
|
||||
ResourceFailedLocalizationEvent rfe2 =
|
||||
new ResourceFailedLocalizationEvent(
|
||||
|
@ -463,7 +464,7 @@ public class TestLocalResourcesTrackerImpl {
|
|||
LocalResourceVisibility.PUBLIC, lc1);
|
||||
tracker.handle(reqEvent3);
|
||||
Path hierarchicalPath3 =
|
||||
tracker.getPathForLocalization(lr3, localDir).getParent();
|
||||
tracker.getPathForLocalization(lr3, localDir, null).getParent();
|
||||
// localization successful
|
||||
ResourceLocalizedEvent rle3 =
|
||||
new ResourceLocalizedEvent(lr3, new Path(hierarchicalPath3.toUri()
|
||||
|
@ -542,7 +543,8 @@ public class TestLocalResourcesTrackerImpl {
|
|||
dispatcher.await();
|
||||
|
||||
// Simulate the process of localization of lr1
|
||||
Path hierarchicalPath1 = tracker.getPathForLocalization(lr1, localDir);
|
||||
Path hierarchicalPath1 = tracker.getPathForLocalization(lr1, localDir,
|
||||
null);
|
||||
|
||||
ArgumentCaptor<LocalResourceProto> localResourceCaptor =
|
||||
ArgumentCaptor.forClass(LocalResourceProto.class);
|
||||
|
@ -622,7 +624,8 @@ public class TestLocalResourcesTrackerImpl {
|
|||
dispatcher.await();
|
||||
|
||||
// Simulate the process of localization of lr1
|
||||
Path hierarchicalPath1 = tracker.getPathForLocalization(lr1, localDir);
|
||||
Path hierarchicalPath1 = tracker.getPathForLocalization(lr1, localDir,
|
||||
null);
|
||||
|
||||
ArgumentCaptor<LocalResourceProto> localResourceCaptor =
|
||||
ArgumentCaptor.forClass(LocalResourceProto.class);
|
||||
|
@ -691,7 +694,8 @@ public class TestLocalResourcesTrackerImpl {
|
|||
LocalResourceVisibility.APPLICATION, lc2);
|
||||
tracker.handle(reqEvent2);
|
||||
dispatcher.await();
|
||||
Path hierarchicalPath2 = tracker.getPathForLocalization(lr2, localDir);
|
||||
Path hierarchicalPath2 = tracker.getPathForLocalization(lr2, localDir,
|
||||
null);
|
||||
long localizedId2 = Long.parseLong(hierarchicalPath2.getName());
|
||||
Assert.assertEquals(localizedId1 + 1, localizedId2);
|
||||
} finally {
|
||||
|
@ -785,6 +789,49 @@ public class TestLocalResourcesTrackerImpl {
|
|||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
@SuppressWarnings("unchecked")
|
||||
public void testGetPathForLocalization() throws Exception {
|
||||
FileContext lfs = FileContext.getLocalFSFileContext();
|
||||
Path base_path = new Path("target",
|
||||
TestLocalResourcesTrackerImpl.class.getSimpleName());
|
||||
final String user = "someuser";
|
||||
final ApplicationId appId = ApplicationId.newInstance(1, 1);
|
||||
Configuration conf = new YarnConfiguration();
|
||||
DrainDispatcher dispatcher = null;
|
||||
dispatcher = createDispatcher(conf);
|
||||
EventHandler<LocalizerEvent> localizerEventHandler =
|
||||
mock(EventHandler.class);
|
||||
EventHandler<LocalizerEvent> containerEventHandler =
|
||||
mock(EventHandler.class);
|
||||
dispatcher.register(LocalizerEventType.class, localizerEventHandler);
|
||||
dispatcher.register(ContainerEventType.class, containerEventHandler);
|
||||
NMStateStoreService stateStore = mock(NMStateStoreService.class);
|
||||
DeletionService delService = mock(DeletionService.class);
|
||||
try {
|
||||
LocalResourceRequest req1 = createLocalResourceRequest(user, 1, 1,
|
||||
LocalResourceVisibility.PUBLIC);
|
||||
LocalizedResource lr1 = createLocalizedResource(req1, dispatcher);
|
||||
ConcurrentMap<LocalResourceRequest, LocalizedResource> localrsrc =
|
||||
new ConcurrentHashMap<LocalResourceRequest, LocalizedResource>();
|
||||
localrsrc.put(req1, lr1);
|
||||
LocalResourcesTrackerImpl tracker = new LocalResourcesTrackerImpl(user,
|
||||
appId, dispatcher, localrsrc, true, conf, stateStore, null);
|
||||
Path conflictPath = new Path(base_path, "10");
|
||||
Path qualifiedConflictPath = lfs.makeQualified(conflictPath);
|
||||
lfs.mkdir(qualifiedConflictPath, null, true);
|
||||
Path rPath = tracker.getPathForLocalization(req1, base_path,
|
||||
delService);
|
||||
Assert.assertFalse(lfs.util().exists(rPath));
|
||||
verify(delService, times(1)).delete(eq(user), eq(conflictPath));
|
||||
} finally {
|
||||
lfs.delete(base_path, true);
|
||||
if (dispatcher != null) {
|
||||
dispatcher.stop();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Test
|
||||
public void testResourcePresentInGoodDir() throws IOException {
|
||||
|
@ -832,8 +879,10 @@ public class TestLocalResourcesTrackerImpl {
|
|||
tracker.handle(req21Event);
|
||||
dispatcher.await();
|
||||
// Localize resource1
|
||||
Path p1 = tracker.getPathForLocalization(req1, new Path("/tmp/somedir1"));
|
||||
Path p2 = tracker.getPathForLocalization(req2, new Path("/tmp/somedir2"));
|
||||
Path p1 = tracker.getPathForLocalization(req1,
|
||||
new Path("/tmp/somedir1"), null);
|
||||
Path p2 = tracker.getPathForLocalization(req2,
|
||||
new Path("/tmp/somedir2"), null);
|
||||
ResourceLocalizedEvent rle1 = new ResourceLocalizedEvent(req1, p1, 1);
|
||||
tracker.handle(rle1);
|
||||
ResourceLocalizedEvent rle2 = new ResourceLocalizedEvent(req2, p2, 1);
|
||||
|
|
|
@ -624,29 +624,31 @@ public class TestResourceLocalizationService {
|
|||
// Simulate start of localization for all resources
|
||||
privTracker1.getPathForLocalization(privReq1,
|
||||
dirsHandler.getLocalPathForWrite(
|
||||
ContainerLocalizer.USERCACHE + user1));
|
||||
ContainerLocalizer.USERCACHE + user1), null);
|
||||
privTracker1.getPathForLocalization(privReq2,
|
||||
dirsHandler.getLocalPathForWrite(
|
||||
ContainerLocalizer.USERCACHE + user1));
|
||||
ContainerLocalizer.USERCACHE + user1), null);
|
||||
LocalizedResource privLr1 = privTracker1.getLocalizedResource(privReq1);
|
||||
LocalizedResource privLr2 = privTracker1.getLocalizedResource(privReq2);
|
||||
appTracker1.getPathForLocalization(appReq1,
|
||||
dirsHandler.getLocalPathForWrite(
|
||||
ContainerLocalizer.APPCACHE + appId1));
|
||||
ContainerLocalizer.APPCACHE + appId1), null);
|
||||
LocalizedResource appLr1 = appTracker1.getLocalizedResource(appReq1);
|
||||
appTracker2.getPathForLocalization(appReq2,
|
||||
dirsHandler.getLocalPathForWrite(
|
||||
ContainerLocalizer.APPCACHE + appId2));
|
||||
ContainerLocalizer.APPCACHE + appId2), null);
|
||||
LocalizedResource appLr2 = appTracker2.getLocalizedResource(appReq2);
|
||||
appTracker2.getPathForLocalization(appReq3,
|
||||
dirsHandler.getLocalPathForWrite(
|
||||
ContainerLocalizer.APPCACHE + appId2));
|
||||
ContainerLocalizer.APPCACHE + appId2), null);
|
||||
LocalizedResource appLr3 = appTracker2.getLocalizedResource(appReq3);
|
||||
pubTracker.getPathForLocalization(pubReq1,
|
||||
dirsHandler.getLocalPathForWrite(ContainerLocalizer.FILECACHE));
|
||||
dirsHandler.getLocalPathForWrite(ContainerLocalizer.FILECACHE),
|
||||
null);
|
||||
LocalizedResource pubLr1 = pubTracker.getLocalizedResource(pubReq1);
|
||||
pubTracker.getPathForLocalization(pubReq2,
|
||||
dirsHandler.getLocalPathForWrite(ContainerLocalizer.FILECACHE));
|
||||
dirsHandler.getLocalPathForWrite(ContainerLocalizer.FILECACHE),
|
||||
null);
|
||||
LocalizedResource pubLr2 = pubTracker.getLocalizedResource(pubReq2);
|
||||
|
||||
// Simulate completion of localization for most resources with
|
||||
|
|
Loading…
Reference in New Issue