YARN-3591. Resource localization on a bad disk causes subsequent containers failure. Contributed by Lavkesh Lahngir.
(cherry picked from commit 1dbd8e34a7
)
This commit is contained in:
parent
8bf5362014
commit
70575286b7
|
@ -764,6 +764,9 @@ Release 2.8.0 - UNRELEASED
|
|||
YARN-4024. YARN RM should avoid unnecessary resolving IP when NMs doing heartbeat.
|
||||
(Hong Zhiguo via wangda)
|
||||
|
||||
YARN-3591. Resource localization on a bad disk causes subsequent containers failure.
|
||||
(Lavkesh Lahngir via vvasudev)
|
||||
|
||||
Release 2.7.2 - UNRELEASED
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
|
|
@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
|
|||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
@ -38,6 +39,7 @@ import org.apache.hadoop.yarn.event.Dispatcher;
|
|||
import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceEvent;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceEventType;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceRecoveredEvent;
|
||||
|
@ -65,6 +67,7 @@ class LocalResourcesTrackerImpl implements LocalResourcesTracker {
|
|||
private final Dispatcher dispatcher;
|
||||
private final ConcurrentMap<LocalResourceRequest,LocalizedResource> localrsrc;
|
||||
private Configuration conf;
|
||||
private LocalDirsHandlerService dirsHandler;
|
||||
/*
|
||||
* This flag controls whether this resource tracker uses hierarchical
|
||||
* directories or not. For PRIVATE and PUBLIC resource trackers it
|
||||
|
@ -92,27 +95,38 @@ class LocalResourcesTrackerImpl implements LocalResourcesTracker {
|
|||
Dispatcher dispatcher, boolean useLocalCacheDirectoryManager,
|
||||
Configuration conf, NMStateStoreService stateStore) {
|
||||
this(user, appId, dispatcher,
|
||||
new ConcurrentHashMap<LocalResourceRequest, LocalizedResource>(),
|
||||
useLocalCacheDirectoryManager, conf, stateStore);
|
||||
new ConcurrentHashMap<LocalResourceRequest, LocalizedResource>(),
|
||||
useLocalCacheDirectoryManager, conf, stateStore, null);
|
||||
}
|
||||
|
||||
public LocalResourcesTrackerImpl(String user, ApplicationId appId,
|
||||
Dispatcher dispatcher, boolean useLocalCacheDirectoryManager,
|
||||
Configuration conf, NMStateStoreService stateStore,
|
||||
LocalDirsHandlerService dirHandler) {
|
||||
this(user, appId, dispatcher,
|
||||
new ConcurrentHashMap<LocalResourceRequest, LocalizedResource>(),
|
||||
useLocalCacheDirectoryManager, conf, stateStore, dirHandler);
|
||||
}
|
||||
|
||||
LocalResourcesTrackerImpl(String user, ApplicationId appId,
|
||||
Dispatcher dispatcher,
|
||||
ConcurrentMap<LocalResourceRequest,LocalizedResource> localrsrc,
|
||||
ConcurrentMap<LocalResourceRequest, LocalizedResource> localrsrc,
|
||||
boolean useLocalCacheDirectoryManager, Configuration conf,
|
||||
NMStateStoreService stateStore) {
|
||||
NMStateStoreService stateStore, LocalDirsHandlerService dirHandler) {
|
||||
this.appId = appId;
|
||||
this.user = user;
|
||||
this.dispatcher = dispatcher;
|
||||
this.localrsrc = localrsrc;
|
||||
this.useLocalCacheDirectoryManager = useLocalCacheDirectoryManager;
|
||||
if ( this.useLocalCacheDirectoryManager) {
|
||||
directoryManagers = new ConcurrentHashMap<Path, LocalCacheDirectoryManager>();
|
||||
if (this.useLocalCacheDirectoryManager) {
|
||||
directoryManagers =
|
||||
new ConcurrentHashMap<Path, LocalCacheDirectoryManager>();
|
||||
inProgressLocalResourcesMap =
|
||||
new ConcurrentHashMap<LocalResourceRequest, Path>();
|
||||
new ConcurrentHashMap<LocalResourceRequest, Path>();
|
||||
}
|
||||
this.conf = conf;
|
||||
this.stateStore = stateStore;
|
||||
this.dirsHandler = dirHandler;
|
||||
}
|
||||
|
||||
/*
|
||||
|
@ -312,11 +326,45 @@ class LocalResourcesTrackerImpl implements LocalResourcesTracker {
|
|||
toString());
|
||||
if (!file.exists()) {
|
||||
ret = false;
|
||||
} else if (dirsHandler != null) {
|
||||
ret = checkLocalResource(rsrc);
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Check if the rsrc is Localized on a good dir.
|
||||
*
|
||||
* @param rsrc
|
||||
* @return
|
||||
*/
|
||||
@VisibleForTesting
|
||||
boolean checkLocalResource(LocalizedResource rsrc) {
|
||||
List<String> localDirs = dirsHandler.getLocalDirsForRead();
|
||||
for (String dir : localDirs) {
|
||||
if (isParent(rsrc.getLocalPath().toUri().getPath(), dir)) {
|
||||
return true;
|
||||
} else {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param path
|
||||
* @param parentdir
|
||||
* @return true if parentdir is parent of path else false.
|
||||
*/
|
||||
private boolean isParent(String path, String parentdir) {
|
||||
// Add separator if not present.
|
||||
if (path.charAt(path.length() - 1) != File.separatorChar) {
|
||||
path += File.separator;
|
||||
}
|
||||
return path.startsWith(parentdir);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean remove(LocalizedResource rem, DeletionService delService) {
|
||||
// current synchronization guaranteed by crude RLS event for cleanup
|
||||
|
|
|
@ -229,7 +229,7 @@ public class ResourceLocalizationService extends CompositeService
|
|||
public void serviceInit(Configuration conf) throws Exception {
|
||||
this.validateConf(conf);
|
||||
this.publicRsrc = new LocalResourcesTrackerImpl(null, null, dispatcher,
|
||||
true, conf, stateStore);
|
||||
true, conf, stateStore, dirsHandler);
|
||||
this.recordFactory = RecordFactoryProvider.getRecordFactory(conf);
|
||||
|
||||
try {
|
||||
|
|
|
@ -18,22 +18,22 @@
|
|||
|
||||
package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
|
||||
|
||||
import static org.mockito.Mockito.any;
|
||||
import static org.mockito.Matchers.isA;
|
||||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Matchers.eq;
|
||||
import static org.mockito.Matchers.isA;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.timeout;
|
||||
import static org.mockito.Mockito.times;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.timeout;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
|
||||
import org.junit.Assert;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
|
@ -48,6 +48,7 @@ import org.apache.hadoop.yarn.event.EventHandler;
|
|||
import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerResourceFailedEvent;
|
||||
|
@ -64,8 +65,10 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.even
|
|||
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
|
||||
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
import org.mockito.ArgumentCaptor;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
public class TestLocalResourcesTrackerImpl {
|
||||
|
||||
|
@ -103,7 +106,7 @@ public class TestLocalResourcesTrackerImpl {
|
|||
localrsrc.put(req2, lr2);
|
||||
LocalResourcesTracker tracker =
|
||||
new LocalResourcesTrackerImpl(user, null, dispatcher, localrsrc,
|
||||
false, conf, new NMNullStateStoreService());
|
||||
false, conf, new NMNullStateStoreService(),null);
|
||||
|
||||
ResourceEvent req11Event =
|
||||
new ResourceRequestEvent(req1, LocalResourceVisibility.PUBLIC, lc1);
|
||||
|
@ -187,7 +190,7 @@ public class TestLocalResourcesTrackerImpl {
|
|||
localrsrc.put(req1, lr1);
|
||||
LocalResourcesTracker tracker = new LocalResourcesTrackerImpl(user,
|
||||
null, dispatcher, localrsrc, false, conf,
|
||||
new NMNullStateStoreService());
|
||||
new NMNullStateStoreService(), null);
|
||||
|
||||
ResourceEvent req11Event = new ResourceRequestEvent(req1,
|
||||
LocalResourceVisibility.PUBLIC, lc1);
|
||||
|
@ -258,7 +261,7 @@ public class TestLocalResourcesTrackerImpl {
|
|||
new ConcurrentHashMap<LocalResourceRequest, LocalizedResource>();
|
||||
LocalResourcesTracker tracker =
|
||||
new LocalResourcesTrackerImpl(user, null, dispatcher, localrsrc,
|
||||
true, conf, new NMNullStateStoreService());
|
||||
true, conf, new NMNullStateStoreService(), null);
|
||||
|
||||
LocalResourceRequest lr =
|
||||
createLocalResourceRequest(user, 1, 1, LocalResourceVisibility.PUBLIC);
|
||||
|
@ -405,7 +408,7 @@ public class TestLocalResourcesTrackerImpl {
|
|||
new ConcurrentHashMap<LocalResourceRequest, LocalizedResource>();
|
||||
LocalResourcesTracker tracker = new LocalResourcesTrackerImpl(user,
|
||||
null, dispatcher, localrsrc, true, conf,
|
||||
new NMNullStateStoreService());
|
||||
new NMNullStateStoreService(), null);
|
||||
|
||||
// This is a random path. NO File creation will take place at this place.
|
||||
Path localDir = new Path("/tmp");
|
||||
|
@ -782,6 +785,71 @@ public class TestLocalResourcesTrackerImpl {
|
|||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Test
|
||||
public void testResourcePresentInGoodDir() throws IOException {
|
||||
String user = "testuser";
|
||||
DrainDispatcher dispatcher = null;
|
||||
try {
|
||||
Configuration conf = new Configuration();
|
||||
dispatcher = createDispatcher(conf);
|
||||
EventHandler<LocalizerEvent> localizerEventHandler =
|
||||
mock(EventHandler.class);
|
||||
EventHandler<LocalizerEvent> containerEventHandler =
|
||||
mock(EventHandler.class);
|
||||
dispatcher.register(LocalizerEventType.class, localizerEventHandler);
|
||||
dispatcher.register(ContainerEventType.class, containerEventHandler);
|
||||
|
||||
ContainerId cId1 = BuilderUtils.newContainerId(1, 1, 1, 1);
|
||||
LocalizerContext lc1 = new LocalizerContext(user, cId1, null);
|
||||
LocalResourceRequest req1 =
|
||||
createLocalResourceRequest(user, 1, 1, LocalResourceVisibility.PUBLIC);
|
||||
LocalResourceRequest req2 =
|
||||
createLocalResourceRequest(user, 2, 1, LocalResourceVisibility.PUBLIC);
|
||||
LocalizedResource lr1 = createLocalizedResource(req1, dispatcher);
|
||||
LocalizedResource lr2 = createLocalizedResource(req2, dispatcher);
|
||||
ConcurrentMap<LocalResourceRequest, LocalizedResource> localrsrc =
|
||||
new ConcurrentHashMap<LocalResourceRequest, LocalizedResource>();
|
||||
localrsrc.put(req1, lr1);
|
||||
localrsrc.put(req2, lr2);
|
||||
LocalDirsHandlerService dirsHandler = mock(LocalDirsHandlerService.class);
|
||||
List<String> goodDirs = new ArrayList<String>();
|
||||
// /tmp/somedir2 is bad
|
||||
goodDirs.add("/tmp/somedir1/");
|
||||
goodDirs.add("/tmp/somedir2");
|
||||
Mockito.when(dirsHandler.getLocalDirs()).thenReturn(goodDirs);
|
||||
Mockito.when(dirsHandler.getLocalDirsForRead()).thenReturn(goodDirs);
|
||||
LocalResourcesTrackerImpl tracker =
|
||||
new LocalResourcesTrackerImpl(user, null, dispatcher, localrsrc,
|
||||
true , conf, new NMNullStateStoreService(), dirsHandler);
|
||||
ResourceEvent req11Event =
|
||||
new ResourceRequestEvent(req1, LocalResourceVisibility.PUBLIC, lc1);
|
||||
ResourceEvent req21Event =
|
||||
new ResourceRequestEvent(req2, LocalResourceVisibility.PUBLIC, lc1);
|
||||
// Localize R1 for C1
|
||||
tracker.handle(req11Event);
|
||||
// Localize R2 for C1
|
||||
tracker.handle(req21Event);
|
||||
dispatcher.await();
|
||||
// Localize resource1
|
||||
Path p1 = tracker.getPathForLocalization(req1, new Path("/tmp/somedir1"));
|
||||
Path p2 = tracker.getPathForLocalization(req2, new Path("/tmp/somedir2"));
|
||||
ResourceLocalizedEvent rle1 = new ResourceLocalizedEvent(req1, p1, 1);
|
||||
tracker.handle(rle1);
|
||||
ResourceLocalizedEvent rle2 = new ResourceLocalizedEvent(req2, p2, 1);
|
||||
tracker.handle(rle2);
|
||||
dispatcher.await();
|
||||
// Remove somedir2 from gooddirs
|
||||
Assert.assertTrue(tracker.checkLocalResource(lr2));
|
||||
goodDirs.remove(1);
|
||||
Assert.assertFalse(tracker.checkLocalResource(lr2));
|
||||
} finally {
|
||||
if (dispatcher != null) {
|
||||
dispatcher.stop();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private boolean createdummylocalizefile(Path path) {
|
||||
boolean ret = false;
|
||||
File file = new File(path.toUri().getRawPath().toString());
|
||||
|
|
|
@ -83,7 +83,7 @@ public class TestResourceRetention {
|
|||
ConcurrentMap<LocalResourceRequest,LocalizedResource> trackerResources =
|
||||
new ConcurrentHashMap<LocalResourceRequest,LocalizedResource>();
|
||||
LocalResourcesTracker ret = spy(new LocalResourcesTrackerImpl(user, null,
|
||||
null, trackerResources, false, conf, new NMNullStateStoreService()));
|
||||
null, trackerResources, false, conf, new NMNullStateStoreService(),null));
|
||||
for (int i = 0; i < nRsrcs; ++i) {
|
||||
final LocalResourceRequest req = new LocalResourceRequest(
|
||||
new Path("file:///" + user + "/rsrc" + i), timestamp + i * tsstep,
|
||||
|
|
Loading…
Reference in New Issue