From 70575286b7566c86d441d1ed075786607986de99 Mon Sep 17 00:00:00 2001 From: Varun Vasudev Date: Mon, 7 Sep 2015 11:32:12 +0530 Subject: [PATCH] YARN-3591. Resource localization on a bad disk causes subsequent containers failure. Contributed by Lavkesh Lahngir. (cherry picked from commit 1dbd8e34a7d97c4d8586da79c980d8f2e0aad61d) --- hadoop-yarn-project/CHANGES.txt | 3 + .../localizer/LocalResourcesTrackerImpl.java | 64 ++++++++++++-- .../ResourceLocalizationService.java | 2 +- .../TestLocalResourcesTrackerImpl.java | 86 +++++++++++++++++-- .../localizer/TestResourceRetention.java | 2 +- 5 files changed, 138 insertions(+), 19 deletions(-) diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index a021ca3dbd2..b9ad53a016c 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -764,6 +764,9 @@ Release 2.8.0 - UNRELEASED YARN-4024. YARN RM should avoid unnecessary resolving IP when NMs doing heartbeat. (Hong Zhiguo via wangda) + YARN-3591. Resource localization on a bad disk causes subsequent containers failure. 
+ (Lavkesh Lahngir via vvasudev) + Release 2.7.2 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java index 7cf6b1572fe..a1e68173421 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java @@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer; import java.io.File; import java.io.IOException; import java.util.Iterator; +import java.util.List; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicLong; @@ -38,6 +39,7 @@ import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto; import org.apache.hadoop.yarn.server.nodemanager.DeletionService; +import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceRecoveredEvent; @@ -65,6 +67,7 @@ class LocalResourcesTrackerImpl implements 
LocalResourcesTracker {
   private final Dispatcher dispatcher;
   private final ConcurrentMap localrsrc;
   private Configuration conf;
+  private LocalDirsHandlerService dirsHandler;
   /*
    * This flag controls whether this resource tracker uses hierarchical
    * directories or not. For PRIVATE and PUBLIC resource trackers it
@@ -92,27 +95,38 @@ class LocalResourcesTrackerImpl implements LocalResourcesTracker {
       Dispatcher dispatcher, boolean useLocalCacheDirectoryManager,
       Configuration conf, NMStateStoreService stateStore) {
     this(user, appId, dispatcher,
-      new ConcurrentHashMap(),
-      useLocalCacheDirectoryManager, conf, stateStore);
+        new ConcurrentHashMap(),
+        useLocalCacheDirectoryManager, conf, stateStore, null);
+  }
+
+  public LocalResourcesTrackerImpl(String user, ApplicationId appId,
+      Dispatcher dispatcher, boolean useLocalCacheDirectoryManager,
+      Configuration conf, NMStateStoreService stateStore,
+      LocalDirsHandlerService dirHandler) {
+    this(user, appId, dispatcher,
+        new ConcurrentHashMap(),
+        useLocalCacheDirectoryManager, conf, stateStore, dirHandler);
   }
 
   LocalResourcesTrackerImpl(String user, ApplicationId appId,
       Dispatcher dispatcher,
-      ConcurrentMap localrsrc,
+      ConcurrentMap localrsrc,
       boolean useLocalCacheDirectoryManager, Configuration conf,
-      NMStateStoreService stateStore) {
+      NMStateStoreService stateStore, LocalDirsHandlerService dirHandler) {
     this.appId = appId;
     this.user = user;
     this.dispatcher = dispatcher;
     this.localrsrc = localrsrc;
     this.useLocalCacheDirectoryManager = useLocalCacheDirectoryManager;
-    if ( this.useLocalCacheDirectoryManager) {
-      directoryManagers = new ConcurrentHashMap();
+    if (this.useLocalCacheDirectoryManager) {
+      directoryManagers =
+          new ConcurrentHashMap();
       inProgressLocalResourcesMap =
-        new ConcurrentHashMap();
+          new ConcurrentHashMap();
     }
     this.conf = conf;
     this.stateStore = stateStore;
+    this.dirsHandler = dirHandler;
   }
 
   /*
@@ -312,11 +326,45 @@ class LocalResourcesTrackerImpl implements LocalResourcesTracker {
           toString());
       if (!file.exists()) {
         ret = false;
+      } else if (dirsHandler != null) {
+        ret = checkLocalResource(rsrc);
       }
     }
     return ret;
   }
-  
+
+  /**
+   * Check if the rsrc is localized under one of the dirs returned by
+   * dirsHandler.getLocalDirsForRead(). Requires a non-null dirsHandler.
+   *
+   * @param rsrc localized resource whose local path is tested
+   * @return true if one of those dirs contains the path, else false
+   */
+  @VisibleForTesting
+  boolean checkLocalResource(LocalizedResource rsrc) {
+    List<String> localDirs = dirsHandler.getLocalDirsForRead();
+    for (String dir : localDirs) {
+      if (isParent(rsrc.getLocalPath().toUri().getPath(), dir)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * @param path path to test, with or without a trailing separator
+   * @param parentdir candidate ancestor dir, same convention
+   * @return true if path is parentdir or lies beneath it, else false.
+   */
+  private boolean isParent(String path, String parentdir) {
+    // Terminate both sides with a separator so that only whole path
+    // components match ("/tmp/d1" must not claim "/tmp/d10/f").
+    String sep = File.separator;
+    String child = path.endsWith(sep) ? path : path + sep;
+    String parent = parentdir.endsWith(sep) ? parentdir : parentdir + sep;
+    return child.startsWith(parent);
+  }
+
   @Override
   public boolean remove(LocalizedResource rem, DeletionService delService) {
     // current synchronization guaranteed by crude RLS event for cleanup
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
index b417c5e21c2..e239e345cb0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
@@ -229,7 +229,7 @@ public class ResourceLocalizationService extends CompositeService
   public void serviceInit(Configuration conf) throws
Exception { this.validateConf(conf); this.publicRsrc = new LocalResourcesTrackerImpl(null, null, dispatcher, - true, conf, stateStore); + true, conf, stateStore, dirsHandler); this.recordFactory = RecordFactoryProvider.getRecordFactory(conf); try { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java index 569525422b2..350cecb7497 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java @@ -18,22 +18,22 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer; -import static org.mockito.Mockito.any; -import static org.mockito.Matchers.isA; +import static org.mockito.Matchers.any; import static org.mockito.Matchers.eq; +import static org.mockito.Matchers.isA; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.timeout; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.timeout; import java.io.File; import java.io.IOException; +import java.util.ArrayList; import java.util.Iterator; +import java.util.List; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import org.junit.Assert; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import 
org.apache.hadoop.yarn.api.records.ApplicationId; @@ -48,6 +48,7 @@ import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto; import org.apache.hadoop.yarn.server.nodemanager.DeletionService; +import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerResourceFailedEvent; @@ -64,8 +65,10 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.even import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; import org.apache.hadoop.yarn.server.utils.BuilderUtils; +import org.junit.Assert; import org.junit.Test; import org.mockito.ArgumentCaptor; +import org.mockito.Mockito; public class TestLocalResourcesTrackerImpl { @@ -103,7 +106,7 @@ public class TestLocalResourcesTrackerImpl { localrsrc.put(req2, lr2); LocalResourcesTracker tracker = new LocalResourcesTrackerImpl(user, null, dispatcher, localrsrc, - false, conf, new NMNullStateStoreService()); + false, conf, new NMNullStateStoreService(),null); ResourceEvent req11Event = new ResourceRequestEvent(req1, LocalResourceVisibility.PUBLIC, lc1); @@ -187,7 +190,7 @@ public class TestLocalResourcesTrackerImpl { localrsrc.put(req1, lr1); LocalResourcesTracker tracker = new LocalResourcesTrackerImpl(user, null, dispatcher, localrsrc, false, conf, - new NMNullStateStoreService()); + new NMNullStateStoreService(), null); ResourceEvent req11Event = new ResourceRequestEvent(req1, LocalResourceVisibility.PUBLIC, lc1); @@ -258,7 +261,7 @@ public class 
TestLocalResourcesTrackerImpl {
           new ConcurrentHashMap();
       LocalResourcesTracker tracker = new LocalResourcesTrackerImpl(user,
           null, dispatcher, localrsrc,
-          true, conf, new NMNullStateStoreService());
+          true, conf, new NMNullStateStoreService(), null);
 
       LocalResourceRequest lr =
           createLocalResourceRequest(user, 1, 1, LocalResourceVisibility.PUBLIC);
@@ -405,7 +408,7 @@ public class TestLocalResourcesTrackerImpl {
           new ConcurrentHashMap();
       LocalResourcesTracker tracker = new LocalResourcesTrackerImpl(user,
           null, dispatcher, localrsrc, true, conf,
-          new NMNullStateStoreService());
+          new NMNullStateStoreService(), null);
 
       // This is a random path. NO File creation will take place at this place.
       Path localDir = new Path("/tmp");
@@ -782,6 +785,71 @@ public class TestLocalResourcesTrackerImpl {
     }
   }
 
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testResourcePresentInGoodDir() throws IOException {
+    String user = "testuser";
+    DrainDispatcher dispatcher = null;
+    try {
+      Configuration conf = new Configuration();
+      dispatcher = createDispatcher(conf);
+      EventHandler localizerEventHandler =
+          mock(EventHandler.class);
+      EventHandler containerEventHandler =
+          mock(EventHandler.class);
+      dispatcher.register(LocalizerEventType.class, localizerEventHandler);
+      dispatcher.register(ContainerEventType.class, containerEventHandler);
+
+      ContainerId cId1 = BuilderUtils.newContainerId(1, 1, 1, 1);
+      LocalizerContext lc1 = new LocalizerContext(user, cId1, null);
+      LocalResourceRequest req1 =
+          createLocalResourceRequest(user, 1, 1, LocalResourceVisibility.PUBLIC);
+      LocalResourceRequest req2 =
+          createLocalResourceRequest(user, 2, 1, LocalResourceVisibility.PUBLIC);
+      LocalizedResource lr1 = createLocalizedResource(req1, dispatcher);
+      LocalizedResource lr2 = createLocalizedResource(req2, dispatcher);
+      ConcurrentMap localrsrc =
+          new ConcurrentHashMap();
+      localrsrc.put(req1, lr1);
+      localrsrc.put(req2, lr2);
+      LocalDirsHandlerService dirsHandler = mock(LocalDirsHandlerService.class);
+      List goodDirs = new ArrayList();
+      // Both dirs are readable at first; /tmp/somedir2 turns bad below
+      goodDirs.add("/tmp/somedir1/");
+      goodDirs.add("/tmp/somedir2");
+      Mockito.when(dirsHandler.getLocalDirs()).thenReturn(goodDirs);
+      Mockito.when(dirsHandler.getLocalDirsForRead()).thenReturn(goodDirs);
+      LocalResourcesTrackerImpl tracker =
+          new LocalResourcesTrackerImpl(user, null, dispatcher, localrsrc,
+              true , conf, new NMNullStateStoreService(), dirsHandler);
+      ResourceEvent req11Event =
+          new ResourceRequestEvent(req1, LocalResourceVisibility.PUBLIC, lc1);
+      ResourceEvent req21Event =
+          new ResourceRequestEvent(req2, LocalResourceVisibility.PUBLIC, lc1);
+      // Request R1 on behalf of C1
+      tracker.handle(req11Event);
+      // Request R2 on behalf of C1
+      tracker.handle(req21Event);
+      dispatcher.await();
+      // Report R1 as localized under somedir1 and R2 under somedir2
+      Path p1 = tracker.getPathForLocalization(req1, new Path("/tmp/somedir1"));
+      Path p2 = tracker.getPathForLocalization(req2, new Path("/tmp/somedir2"));
+      ResourceLocalizedEvent rle1 = new ResourceLocalizedEvent(req1, p1, 1);
+      tracker.handle(rle1);
+      ResourceLocalizedEvent rle2 = new ResourceLocalizedEvent(req2, p2, 1);
+      tracker.handle(rle2);
+      dispatcher.await();
+      // lr2 passes while somedir2 is listed and must fail once it is removed
+      Assert.assertTrue(tracker.checkLocalResource(lr2));
+      goodDirs.remove(1);
+      Assert.assertFalse(tracker.checkLocalResource(lr2));
+    } finally {
+      if (dispatcher != null) {
+        dispatcher.stop();
+      }
+    }
+  }
+
   private boolean createdummylocalizefile(Path path) {
     boolean ret = false;
     File file = new File(path.toUri().getRawPath().toString());
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceRetention.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceRetention.java
index 0e3bf86bfb3..81e69e210f7
100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceRetention.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceRetention.java @@ -83,7 +83,7 @@ public class TestResourceRetention { ConcurrentMap trackerResources = new ConcurrentHashMap(); LocalResourcesTracker ret = spy(new LocalResourcesTrackerImpl(user, null, - null, trackerResources, false, conf, new NMNullStateStoreService())); + null, trackerResources, false, conf, new NMNullStateStoreService(),null)); for (int i = 0; i < nRsrcs; ++i) { final LocalResourceRequest req = new LocalResourceRequest( new Path("file:///" + user + "/rsrc" + i), timestamp + i * tsstep,