YARN-3727. For better error recovery, check if the directory exists before using it for localization. Contributed by Zhihai Xu
(cherry picked from commit 854d25b0c3
)
Conflicts:
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java
This commit is contained in:
parent
7c9a368b45
commit
493f072008
|
@ -88,6 +88,9 @@ Release 2.7.2 - UNRELEASED
|
||||||
YARN-4180. AMLauncher does not retry on failures when talking to NM.
|
YARN-4180. AMLauncher does not retry on failures when talking to NM.
|
||||||
(adhoot)
|
(adhoot)
|
||||||
|
|
||||||
|
YARN-3727. For better error recovery, check if the directory exists before
|
||||||
|
using it for localization. (Zhihai Xu via jlowe)
|
||||||
|
|
||||||
Release 2.7.1 - 2015-07-06
|
Release 2.7.1 - 2015-07-06
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -33,7 +33,8 @@ interface LocalResourcesTracker
|
||||||
|
|
||||||
boolean remove(LocalizedResource req, DeletionService delService);
|
boolean remove(LocalizedResource req, DeletionService delService);
|
||||||
|
|
||||||
Path getPathForLocalization(LocalResourceRequest req, Path localDirPath);
|
Path getPathForLocalization(LocalResourceRequest req, Path localDirPath,
|
||||||
|
DeletionService delService);
|
||||||
|
|
||||||
String getUser();
|
String getUser();
|
||||||
|
|
||||||
|
|
|
@ -392,10 +392,12 @@ class LocalResourcesTrackerImpl implements LocalResourcesTracker {
|
||||||
* @param {@link LocalResourceRequest} Resource localization request to
|
* @param {@link LocalResourceRequest} Resource localization request to
|
||||||
* localize the resource.
|
* localize the resource.
|
||||||
* @param {@link Path} local directory path
|
* @param {@link Path} local directory path
|
||||||
|
* @param {@link DeletionService} Deletion Service to delete existing
|
||||||
|
* path for localization.
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public Path
|
public Path getPathForLocalization(LocalResourceRequest req,
|
||||||
getPathForLocalization(LocalResourceRequest req, Path localDirPath) {
|
Path localDirPath, DeletionService delService) {
|
||||||
Path rPath = localDirPath;
|
Path rPath = localDirPath;
|
||||||
if (useLocalCacheDirectoryManager && localDirPath != null) {
|
if (useLocalCacheDirectoryManager && localDirPath != null) {
|
||||||
|
|
||||||
|
@ -415,8 +417,22 @@ class LocalResourcesTrackerImpl implements LocalResourcesTracker {
|
||||||
inProgressLocalResourcesMap.put(req, rPath);
|
inProgressLocalResourcesMap.put(req, rPath);
|
||||||
}
|
}
|
||||||
|
|
||||||
rPath = new Path(rPath,
|
while (true) {
|
||||||
Long.toString(uniqueNumberGenerator.incrementAndGet()));
|
Path uniquePath = new Path(rPath,
|
||||||
|
Long.toString(uniqueNumberGenerator.incrementAndGet()));
|
||||||
|
File file = new File(uniquePath.toUri().getRawPath());
|
||||||
|
if (!file.exists()) {
|
||||||
|
rPath = uniquePath;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
// If the directory already exists, delete it and move to next one.
|
||||||
|
LOG.warn("Directory " + uniquePath + " already exists, " +
|
||||||
|
"try next one.");
|
||||||
|
if (delService != null) {
|
||||||
|
delService.delete(getUser(), uniquePath);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
Path localPath = new Path(rPath, req.getPath().getName());
|
Path localPath = new Path(rPath, req.getPath().getName());
|
||||||
LocalizedResource rsrc = localrsrc.get(req);
|
LocalizedResource rsrc = localrsrc.get(req);
|
||||||
rsrc.setLocalPath(localPath);
|
rsrc.setLocalPath(localPath);
|
||||||
|
|
|
@ -799,7 +799,8 @@ public class ResourceLocalizationService extends CompositeService
|
||||||
+ ContainerLocalizer.FILECACHE,
|
+ ContainerLocalizer.FILECACHE,
|
||||||
ContainerLocalizer.getEstimatedSize(resource), true);
|
ContainerLocalizer.getEstimatedSize(resource), true);
|
||||||
Path publicDirDestPath =
|
Path publicDirDestPath =
|
||||||
publicRsrc.getPathForLocalization(key, publicRootPath);
|
publicRsrc.getPathForLocalization(key, publicRootPath,
|
||||||
|
delService);
|
||||||
if (!publicDirDestPath.getParent().equals(publicRootPath)) {
|
if (!publicDirDestPath.getParent().equals(publicRootPath)) {
|
||||||
DiskChecker.checkDir(new File(publicDirDestPath.toUri().getPath()));
|
DiskChecker.checkDir(new File(publicDirDestPath.toUri().getPath()));
|
||||||
}
|
}
|
||||||
|
@ -1086,7 +1087,7 @@ public class ResourceLocalizationService extends CompositeService
|
||||||
dirsHandler.getLocalPathForWrite(cacheDirectory,
|
dirsHandler.getLocalPathForWrite(cacheDirectory,
|
||||||
ContainerLocalizer.getEstimatedSize(rsrc), false);
|
ContainerLocalizer.getEstimatedSize(rsrc), false);
|
||||||
return tracker.getPathForLocalization(new LocalResourceRequest(rsrc),
|
return tracker.getPathForLocalization(new LocalResourceRequest(rsrc),
|
||||||
dirPath);
|
dirPath, delService);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -35,6 +35,7 @@ import java.util.concurrent.ConcurrentMap;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FileContext;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
|
@ -424,7 +425,7 @@ public class TestLocalResourcesTrackerImpl {
|
||||||
// Simulate the process of localization of lr1
|
// Simulate the process of localization of lr1
|
||||||
// NOTE: Localization path from tracker has resource ID at end
|
// NOTE: Localization path from tracker has resource ID at end
|
||||||
Path hierarchicalPath1 =
|
Path hierarchicalPath1 =
|
||||||
tracker.getPathForLocalization(lr1, localDir).getParent();
|
tracker.getPathForLocalization(lr1, localDir, null).getParent();
|
||||||
// Simulate lr1 getting localized
|
// Simulate lr1 getting localized
|
||||||
ResourceLocalizedEvent rle1 =
|
ResourceLocalizedEvent rle1 =
|
||||||
new ResourceLocalizedEvent(lr1,
|
new ResourceLocalizedEvent(lr1,
|
||||||
|
@ -441,7 +442,7 @@ public class TestLocalResourcesTrackerImpl {
|
||||||
tracker.handle(reqEvent2);
|
tracker.handle(reqEvent2);
|
||||||
|
|
||||||
Path hierarchicalPath2 =
|
Path hierarchicalPath2 =
|
||||||
tracker.getPathForLocalization(lr2, localDir).getParent();
|
tracker.getPathForLocalization(lr2, localDir, null).getParent();
|
||||||
// localization failed.
|
// localization failed.
|
||||||
ResourceFailedLocalizationEvent rfe2 =
|
ResourceFailedLocalizationEvent rfe2 =
|
||||||
new ResourceFailedLocalizationEvent(
|
new ResourceFailedLocalizationEvent(
|
||||||
|
@ -460,7 +461,7 @@ public class TestLocalResourcesTrackerImpl {
|
||||||
LocalResourceVisibility.PUBLIC, lc1);
|
LocalResourceVisibility.PUBLIC, lc1);
|
||||||
tracker.handle(reqEvent3);
|
tracker.handle(reqEvent3);
|
||||||
Path hierarchicalPath3 =
|
Path hierarchicalPath3 =
|
||||||
tracker.getPathForLocalization(lr3, localDir).getParent();
|
tracker.getPathForLocalization(lr3, localDir, null).getParent();
|
||||||
// localization successful
|
// localization successful
|
||||||
ResourceLocalizedEvent rle3 =
|
ResourceLocalizedEvent rle3 =
|
||||||
new ResourceLocalizedEvent(lr3, new Path(hierarchicalPath3.toUri()
|
new ResourceLocalizedEvent(lr3, new Path(hierarchicalPath3.toUri()
|
||||||
|
@ -539,7 +540,8 @@ public class TestLocalResourcesTrackerImpl {
|
||||||
dispatcher.await();
|
dispatcher.await();
|
||||||
|
|
||||||
// Simulate the process of localization of lr1
|
// Simulate the process of localization of lr1
|
||||||
Path hierarchicalPath1 = tracker.getPathForLocalization(lr1, localDir);
|
Path hierarchicalPath1 = tracker.getPathForLocalization(lr1, localDir,
|
||||||
|
null);
|
||||||
|
|
||||||
ArgumentCaptor<LocalResourceProto> localResourceCaptor =
|
ArgumentCaptor<LocalResourceProto> localResourceCaptor =
|
||||||
ArgumentCaptor.forClass(LocalResourceProto.class);
|
ArgumentCaptor.forClass(LocalResourceProto.class);
|
||||||
|
@ -619,7 +621,8 @@ public class TestLocalResourcesTrackerImpl {
|
||||||
dispatcher.await();
|
dispatcher.await();
|
||||||
|
|
||||||
// Simulate the process of localization of lr1
|
// Simulate the process of localization of lr1
|
||||||
Path hierarchicalPath1 = tracker.getPathForLocalization(lr1, localDir);
|
Path hierarchicalPath1 = tracker.getPathForLocalization(lr1, localDir,
|
||||||
|
null);
|
||||||
|
|
||||||
ArgumentCaptor<LocalResourceProto> localResourceCaptor =
|
ArgumentCaptor<LocalResourceProto> localResourceCaptor =
|
||||||
ArgumentCaptor.forClass(LocalResourceProto.class);
|
ArgumentCaptor.forClass(LocalResourceProto.class);
|
||||||
|
@ -688,7 +691,8 @@ public class TestLocalResourcesTrackerImpl {
|
||||||
LocalResourceVisibility.APPLICATION, lc2);
|
LocalResourceVisibility.APPLICATION, lc2);
|
||||||
tracker.handle(reqEvent2);
|
tracker.handle(reqEvent2);
|
||||||
dispatcher.await();
|
dispatcher.await();
|
||||||
Path hierarchicalPath2 = tracker.getPathForLocalization(lr2, localDir);
|
Path hierarchicalPath2 = tracker.getPathForLocalization(lr2, localDir,
|
||||||
|
null);
|
||||||
long localizedId2 = Long.parseLong(hierarchicalPath2.getName());
|
long localizedId2 = Long.parseLong(hierarchicalPath2.getName());
|
||||||
Assert.assertEquals(localizedId1 + 1, localizedId2);
|
Assert.assertEquals(localizedId1 + 1, localizedId2);
|
||||||
} finally {
|
} finally {
|
||||||
|
@ -782,6 +786,49 @@ public class TestLocalResourcesTrackerImpl {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
public void testGetPathForLocalization() throws Exception {
|
||||||
|
FileContext lfs = FileContext.getLocalFSFileContext();
|
||||||
|
Path base_path = new Path("target",
|
||||||
|
TestLocalResourcesTrackerImpl.class.getSimpleName());
|
||||||
|
final String user = "someuser";
|
||||||
|
final ApplicationId appId = ApplicationId.newInstance(1, 1);
|
||||||
|
Configuration conf = new YarnConfiguration();
|
||||||
|
DrainDispatcher dispatcher = null;
|
||||||
|
dispatcher = createDispatcher(conf);
|
||||||
|
EventHandler<LocalizerEvent> localizerEventHandler =
|
||||||
|
mock(EventHandler.class);
|
||||||
|
EventHandler<LocalizerEvent> containerEventHandler =
|
||||||
|
mock(EventHandler.class);
|
||||||
|
dispatcher.register(LocalizerEventType.class, localizerEventHandler);
|
||||||
|
dispatcher.register(ContainerEventType.class, containerEventHandler);
|
||||||
|
NMStateStoreService stateStore = mock(NMStateStoreService.class);
|
||||||
|
DeletionService delService = mock(DeletionService.class);
|
||||||
|
try {
|
||||||
|
LocalResourceRequest req1 = createLocalResourceRequest(user, 1, 1,
|
||||||
|
LocalResourceVisibility.PUBLIC);
|
||||||
|
LocalizedResource lr1 = createLocalizedResource(req1, dispatcher);
|
||||||
|
ConcurrentMap<LocalResourceRequest, LocalizedResource> localrsrc =
|
||||||
|
new ConcurrentHashMap<LocalResourceRequest, LocalizedResource>();
|
||||||
|
localrsrc.put(req1, lr1);
|
||||||
|
LocalResourcesTrackerImpl tracker = new LocalResourcesTrackerImpl(user,
|
||||||
|
appId, dispatcher, localrsrc, true, conf, stateStore);
|
||||||
|
Path conflictPath = new Path(base_path, "10");
|
||||||
|
Path qualifiedConflictPath = lfs.makeQualified(conflictPath);
|
||||||
|
lfs.mkdir(qualifiedConflictPath, null, true);
|
||||||
|
Path rPath = tracker.getPathForLocalization(req1, base_path,
|
||||||
|
delService);
|
||||||
|
Assert.assertFalse(lfs.util().exists(rPath));
|
||||||
|
verify(delService, times(1)).delete(eq(user), eq(conflictPath));
|
||||||
|
} finally {
|
||||||
|
lfs.delete(base_path, true);
|
||||||
|
if (dispatcher != null) {
|
||||||
|
dispatcher.stop();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private boolean createdummylocalizefile(Path path) {
|
private boolean createdummylocalizefile(Path path) {
|
||||||
boolean ret = false;
|
boolean ret = false;
|
||||||
File file = new File(path.toUri().getRawPath().toString());
|
File file = new File(path.toUri().getRawPath().toString());
|
||||||
|
|
|
@ -619,29 +619,31 @@ public class TestResourceLocalizationService {
|
||||||
// Simulate start of localization for all resources
|
// Simulate start of localization for all resources
|
||||||
privTracker1.getPathForLocalization(privReq1,
|
privTracker1.getPathForLocalization(privReq1,
|
||||||
dirsHandler.getLocalPathForWrite(
|
dirsHandler.getLocalPathForWrite(
|
||||||
ContainerLocalizer.USERCACHE + user1));
|
ContainerLocalizer.USERCACHE + user1), null);
|
||||||
privTracker1.getPathForLocalization(privReq2,
|
privTracker1.getPathForLocalization(privReq2,
|
||||||
dirsHandler.getLocalPathForWrite(
|
dirsHandler.getLocalPathForWrite(
|
||||||
ContainerLocalizer.USERCACHE + user1));
|
ContainerLocalizer.USERCACHE + user1), null);
|
||||||
LocalizedResource privLr1 = privTracker1.getLocalizedResource(privReq1);
|
LocalizedResource privLr1 = privTracker1.getLocalizedResource(privReq1);
|
||||||
LocalizedResource privLr2 = privTracker1.getLocalizedResource(privReq2);
|
LocalizedResource privLr2 = privTracker1.getLocalizedResource(privReq2);
|
||||||
appTracker1.getPathForLocalization(appReq1,
|
appTracker1.getPathForLocalization(appReq1,
|
||||||
dirsHandler.getLocalPathForWrite(
|
dirsHandler.getLocalPathForWrite(
|
||||||
ContainerLocalizer.APPCACHE + appId1));
|
ContainerLocalizer.APPCACHE + appId1), null);
|
||||||
LocalizedResource appLr1 = appTracker1.getLocalizedResource(appReq1);
|
LocalizedResource appLr1 = appTracker1.getLocalizedResource(appReq1);
|
||||||
appTracker2.getPathForLocalization(appReq2,
|
appTracker2.getPathForLocalization(appReq2,
|
||||||
dirsHandler.getLocalPathForWrite(
|
dirsHandler.getLocalPathForWrite(
|
||||||
ContainerLocalizer.APPCACHE + appId2));
|
ContainerLocalizer.APPCACHE + appId2), null);
|
||||||
LocalizedResource appLr2 = appTracker2.getLocalizedResource(appReq2);
|
LocalizedResource appLr2 = appTracker2.getLocalizedResource(appReq2);
|
||||||
appTracker2.getPathForLocalization(appReq3,
|
appTracker2.getPathForLocalization(appReq3,
|
||||||
dirsHandler.getLocalPathForWrite(
|
dirsHandler.getLocalPathForWrite(
|
||||||
ContainerLocalizer.APPCACHE + appId2));
|
ContainerLocalizer.APPCACHE + appId2), null);
|
||||||
LocalizedResource appLr3 = appTracker2.getLocalizedResource(appReq3);
|
LocalizedResource appLr3 = appTracker2.getLocalizedResource(appReq3);
|
||||||
pubTracker.getPathForLocalization(pubReq1,
|
pubTracker.getPathForLocalization(pubReq1,
|
||||||
dirsHandler.getLocalPathForWrite(ContainerLocalizer.FILECACHE));
|
dirsHandler.getLocalPathForWrite(ContainerLocalizer.FILECACHE),
|
||||||
|
null);
|
||||||
LocalizedResource pubLr1 = pubTracker.getLocalizedResource(pubReq1);
|
LocalizedResource pubLr1 = pubTracker.getLocalizedResource(pubReq1);
|
||||||
pubTracker.getPathForLocalization(pubReq2,
|
pubTracker.getPathForLocalization(pubReq2,
|
||||||
dirsHandler.getLocalPathForWrite(ContainerLocalizer.FILECACHE));
|
dirsHandler.getLocalPathForWrite(ContainerLocalizer.FILECACHE),
|
||||||
|
null);
|
||||||
LocalizedResource pubLr2 = pubTracker.getLocalizedResource(pubReq2);
|
LocalizedResource pubLr2 = pubTracker.getLocalizedResource(pubReq2);
|
||||||
|
|
||||||
// Simulate completion of localization for most resources with
|
// Simulate completion of localization for most resources with
|
||||||
|
|
Loading…
Reference in New Issue