YARN-6641. Non-public resource localization on a bad disk causes subsequent containers failure. Contributed by Kuhu Shukla
This commit is contained in:
parent
47474fffac
commit
aea42930bb
|
@ -92,14 +92,6 @@ class LocalResourcesTrackerImpl implements LocalResourcesTracker {
|
||||||
private AtomicLong uniqueNumberGenerator = new AtomicLong(9);
|
private AtomicLong uniqueNumberGenerator = new AtomicLong(9);
|
||||||
private NMStateStoreService stateStore;
|
private NMStateStoreService stateStore;
|
||||||
|
|
||||||
public LocalResourcesTrackerImpl(String user, ApplicationId appId,
|
|
||||||
Dispatcher dispatcher, boolean useLocalCacheDirectoryManager,
|
|
||||||
Configuration conf, NMStateStoreService stateStore) {
|
|
||||||
this(user, appId, dispatcher,
|
|
||||||
new ConcurrentHashMap<LocalResourceRequest, LocalizedResource>(),
|
|
||||||
useLocalCacheDirectoryManager, conf, stateStore, null);
|
|
||||||
}
|
|
||||||
|
|
||||||
public LocalResourcesTrackerImpl(String user, ApplicationId appId,
|
public LocalResourcesTrackerImpl(String user, ApplicationId appId,
|
||||||
Dispatcher dispatcher, boolean useLocalCacheDirectoryManager,
|
Dispatcher dispatcher, boolean useLocalCacheDirectoryManager,
|
||||||
Configuration conf, NMStateStoreService stateStore,
|
Configuration conf, NMStateStoreService stateStore,
|
||||||
|
@ -528,4 +520,9 @@ class LocalResourcesTrackerImpl implements LocalResourcesTracker {
|
||||||
}
|
}
|
||||||
return mgr;
|
return mgr;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
LocalDirsHandlerService getDirsHandler() {
|
||||||
|
return dirsHandler;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -306,7 +306,7 @@ public class ResourceLocalizationService extends CompositeService
|
||||||
trackerState = userResources.getPrivateTrackerState();
|
trackerState = userResources.getPrivateTrackerState();
|
||||||
if (!trackerState.isEmpty()) {
|
if (!trackerState.isEmpty()) {
|
||||||
LocalResourcesTracker tracker = new LocalResourcesTrackerImpl(user,
|
LocalResourcesTracker tracker = new LocalResourcesTrackerImpl(user,
|
||||||
null, dispatcher, true, super.getConfig(), stateStore);
|
null, dispatcher, true, super.getConfig(), stateStore, dirsHandler);
|
||||||
LocalResourcesTracker oldTracker = privateRsrc.putIfAbsent(user,
|
LocalResourcesTracker oldTracker = privateRsrc.putIfAbsent(user,
|
||||||
tracker);
|
tracker);
|
||||||
if (oldTracker != null) {
|
if (oldTracker != null) {
|
||||||
|
@ -322,7 +322,8 @@ public class ResourceLocalizationService extends CompositeService
|
||||||
ApplicationId appId = appEntry.getKey();
|
ApplicationId appId = appEntry.getKey();
|
||||||
String appIdStr = appId.toString();
|
String appIdStr = appId.toString();
|
||||||
LocalResourcesTracker tracker = new LocalResourcesTrackerImpl(user,
|
LocalResourcesTracker tracker = new LocalResourcesTrackerImpl(user,
|
||||||
appId, dispatcher, false, super.getConfig(), stateStore);
|
appId, dispatcher, false, super.getConfig(), stateStore,
|
||||||
|
dirsHandler);
|
||||||
LocalResourcesTracker oldTracker = appRsrc.putIfAbsent(appIdStr,
|
LocalResourcesTracker oldTracker = appRsrc.putIfAbsent(appIdStr,
|
||||||
tracker);
|
tracker);
|
||||||
if (oldTracker != null) {
|
if (oldTracker != null) {
|
||||||
|
@ -460,10 +461,11 @@ public class ResourceLocalizationService extends CompositeService
|
||||||
// 0) Create application tracking structs
|
// 0) Create application tracking structs
|
||||||
String userName = app.getUser();
|
String userName = app.getUser();
|
||||||
privateRsrc.putIfAbsent(userName, new LocalResourcesTrackerImpl(userName,
|
privateRsrc.putIfAbsent(userName, new LocalResourcesTrackerImpl(userName,
|
||||||
null, dispatcher, true, super.getConfig(), stateStore));
|
null, dispatcher, true, super.getConfig(), stateStore, dirsHandler));
|
||||||
String appIdStr = app.getAppId().toString();
|
String appIdStr = app.getAppId().toString();
|
||||||
appRsrc.putIfAbsent(appIdStr, new LocalResourcesTrackerImpl(app.getUser(),
|
appRsrc.putIfAbsent(appIdStr, new LocalResourcesTrackerImpl(app.getUser(),
|
||||||
app.getAppId(), dispatcher, false, super.getConfig(), stateStore));
|
app.getAppId(), dispatcher, false, super.getConfig(), stateStore,
|
||||||
|
dirsHandler));
|
||||||
// 1) Signal container init
|
// 1) Signal container init
|
||||||
//
|
//
|
||||||
// This is handled by the ApplicationImpl state machine and allows
|
// This is handled by the ApplicationImpl state machine and allows
|
||||||
|
|
|
@ -529,7 +529,7 @@ public class TestLocalResourcesTrackerImpl {
|
||||||
|
|
||||||
try {
|
try {
|
||||||
LocalResourcesTracker tracker = new LocalResourcesTrackerImpl(user,
|
LocalResourcesTracker tracker = new LocalResourcesTrackerImpl(user,
|
||||||
appId, dispatcher, false, conf, stateStore);
|
appId, dispatcher, false, conf, stateStore, null);
|
||||||
// Container 1 needs lr1 resource
|
// Container 1 needs lr1 resource
|
||||||
ContainerId cId1 = BuilderUtils.newContainerId(1, 1, 1, 1);
|
ContainerId cId1 = BuilderUtils.newContainerId(1, 1, 1, 1);
|
||||||
LocalResourceRequest lr1 = createLocalResourceRequest(user, 1, 1,
|
LocalResourceRequest lr1 = createLocalResourceRequest(user, 1, 1,
|
||||||
|
@ -610,7 +610,7 @@ public class TestLocalResourcesTrackerImpl {
|
||||||
|
|
||||||
try {
|
try {
|
||||||
LocalResourcesTracker tracker = new LocalResourcesTrackerImpl(user,
|
LocalResourcesTracker tracker = new LocalResourcesTrackerImpl(user,
|
||||||
appId, dispatcher, false, conf, stateStore);
|
appId, dispatcher, false, conf, stateStore, null);
|
||||||
// Container 1 needs lr1 resource
|
// Container 1 needs lr1 resource
|
||||||
ContainerId cId1 = BuilderUtils.newContainerId(1, 1, 1, 1);
|
ContainerId cId1 = BuilderUtils.newContainerId(1, 1, 1, 1);
|
||||||
LocalResourceRequest lr1 = createLocalResourceRequest(user, 1, 1,
|
LocalResourceRequest lr1 = createLocalResourceRequest(user, 1, 1,
|
||||||
|
@ -672,7 +672,7 @@ public class TestLocalResourcesTrackerImpl {
|
||||||
|
|
||||||
try {
|
try {
|
||||||
LocalResourcesTracker tracker = new LocalResourcesTrackerImpl(user,
|
LocalResourcesTracker tracker = new LocalResourcesTrackerImpl(user,
|
||||||
appId, dispatcher, false, conf, stateStore);
|
appId, dispatcher, false, conf, stateStore, null);
|
||||||
// Container 1 needs lr1 resource
|
// Container 1 needs lr1 resource
|
||||||
ContainerId cId1 = BuilderUtils.newContainerId(1, 1, 1, 1);
|
ContainerId cId1 = BuilderUtils.newContainerId(1, 1, 1, 1);
|
||||||
LocalResourceRequest lr1 = createLocalResourceRequest(user, 1, 1,
|
LocalResourceRequest lr1 = createLocalResourceRequest(user, 1, 1,
|
||||||
|
@ -725,7 +725,7 @@ public class TestLocalResourcesTrackerImpl {
|
||||||
|
|
||||||
try {
|
try {
|
||||||
LocalResourcesTrackerImpl tracker = new LocalResourcesTrackerImpl(user,
|
LocalResourcesTrackerImpl tracker = new LocalResourcesTrackerImpl(user,
|
||||||
appId, dispatcher, true, conf, stateStore);
|
appId, dispatcher, true, conf, stateStore, null);
|
||||||
LocalResourceRequest lr1 = createLocalResourceRequest(user, 1, 1,
|
LocalResourceRequest lr1 = createLocalResourceRequest(user, 1, 1,
|
||||||
LocalResourceVisibility.PUBLIC);
|
LocalResourceVisibility.PUBLIC);
|
||||||
Assert.assertNull(tracker.getLocalizedResource(lr1));
|
Assert.assertNull(tracker.getLocalizedResource(lr1));
|
||||||
|
|
|
@ -2820,4 +2820,75 @@ public class TestResourceLocalizationService {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
public void testDirHandler() throws Exception {
|
||||||
|
File f = new File(basedir.toString());
|
||||||
|
String[] sDirs = new String[4];
|
||||||
|
List<Path> localDirs = new ArrayList<Path>(sDirs.length);
|
||||||
|
for (int i = 0; i < 4; ++i) {
|
||||||
|
sDirs[i] = f.getAbsolutePath() + i;
|
||||||
|
localDirs.add(new Path(sDirs[i]));
|
||||||
|
}
|
||||||
|
conf.setStrings(YarnConfiguration.NM_LOCAL_DIRS, sDirs);
|
||||||
|
LocalizerTracker mockLocalizerTracker = mock(LocalizerTracker.class);
|
||||||
|
DrainDispatcher dispatcher = new DrainDispatcher();
|
||||||
|
dispatcher.init(conf);
|
||||||
|
dispatcher.start();
|
||||||
|
EventHandler<ApplicationEvent> applicationBus = mock(EventHandler.class);
|
||||||
|
dispatcher.register(ApplicationEventType.class, applicationBus);
|
||||||
|
EventHandler<LocalizerEvent> localizerBus = mock(EventHandler.class);
|
||||||
|
dispatcher.register(LocalizerEventType.class, localizerBus);
|
||||||
|
|
||||||
|
ContainerExecutor exec = mock(ContainerExecutor.class);
|
||||||
|
LocalDirsHandlerService mockDirsHandler =
|
||||||
|
mock(LocalDirsHandlerService.class);
|
||||||
|
doReturn(new ArrayList<String>(Arrays.asList(sDirs))).when(
|
||||||
|
mockDirsHandler).getLocalDirsForCleanup();
|
||||||
|
// setup mocks
|
||||||
|
DeletionService delService = mock(DeletionService.class);
|
||||||
|
ResourceLocalizationService rawService =
|
||||||
|
new ResourceLocalizationService(dispatcher, exec, delService,
|
||||||
|
mockDirsHandler, nmContext, metrics);
|
||||||
|
ResourceLocalizationService spyService = spy(rawService);
|
||||||
|
doReturn(mockServer).when(spyService).createServer();
|
||||||
|
doReturn(mockLocalizerTracker).when(spyService).createLocalizerTracker(
|
||||||
|
isA(Configuration.class));
|
||||||
|
|
||||||
|
final String user = "user0";
|
||||||
|
// init application
|
||||||
|
final Application app = mock(Application.class);
|
||||||
|
final ApplicationId appId =
|
||||||
|
BuilderUtils.newApplicationId(314159265358979L, 3);
|
||||||
|
when(app.getUser()).thenReturn(user);
|
||||||
|
when(app.getAppId()).thenReturn(appId);
|
||||||
|
when(app.toString()).thenReturn(appId.toString());
|
||||||
|
try {
|
||||||
|
spyService.init(conf);
|
||||||
|
spyService.start();
|
||||||
|
|
||||||
|
spyService.handle(new ApplicationLocalizationEvent(
|
||||||
|
LocalizationEventType.INIT_APPLICATION_RESOURCES, app));
|
||||||
|
dispatcher.await();
|
||||||
|
|
||||||
|
LocalResourcesTracker appTracker =
|
||||||
|
spyService.getLocalResourcesTracker(
|
||||||
|
LocalResourceVisibility.APPLICATION, user, appId);
|
||||||
|
LocalResourcesTracker privTracker =
|
||||||
|
spyService.getLocalResourcesTracker(LocalResourceVisibility.PRIVATE,
|
||||||
|
user, appId);
|
||||||
|
LocalResourcesTracker pubTracker =
|
||||||
|
spyService.getLocalResourcesTracker(LocalResourceVisibility.PUBLIC,
|
||||||
|
user, appId);
|
||||||
|
Assert.assertNotNull("dirHandler for appTracker is null!",
|
||||||
|
((LocalResourcesTrackerImpl)appTracker).getDirsHandler());
|
||||||
|
Assert.assertNotNull("dirHandler for privTracker is null!",
|
||||||
|
((LocalResourcesTrackerImpl)privTracker).getDirsHandler());
|
||||||
|
Assert.assertNotNull("dirHandler for pubTracker is null!",
|
||||||
|
((LocalResourcesTrackerImpl)pubTracker).getDirsHandler());
|
||||||
|
} finally {
|
||||||
|
dispatcher.stop();
|
||||||
|
delService.stop();
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue