diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index a6585a6a2f2..b4b0b752955 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -491,6 +491,9 @@ Release 2.6.0 - UNRELEASED YARN-2617. Fixed NM to not send duplicate container status whose app is not running. (Jun Gong via jianhe) + YARN-2624. Resource Localization fails on a cluster due to existing cache + directories (Anubhav Dhoot via jlowe) + Release 2.5.1 - 2014-09-05 INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java index a092b59650b..c2dcebff715 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java @@ -222,7 +222,7 @@ public class ResourceLocalizationService extends CompositeService FileContext lfs = getLocalFileContext(conf); lfs.setUMask(new FsPermission((short)FsPermission.DEFAULT_UMASK)); - if (!stateStore.canRecover()) { + if (!stateStore.canRecover() || stateStore.isNewlyCreated()) { cleanUpLocalDir(lfs,delService); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java index 7c95fff9986..7cf4921157c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java @@ -118,6 +118,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService { private static final byte[] EMPTY_VALUE = new byte[0]; private DB db; + private boolean isNewlyCreated; public NMLeveldbStateStoreService() { super(NMLeveldbStateStoreService.class.getName()); @@ -134,6 +135,11 @@ public class NMLeveldbStateStoreService extends NMStateStoreService { } } + @Override + public boolean isNewlyCreated() { + return isNewlyCreated; + } + @Override public List loadContainersState() @@ -837,6 +843,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService { } catch (NativeDB.DBException e) { if (e.isNotFound() || e.getMessage().contains(" does not exist ")) { LOG.info("Creating state database at " + dbfile); + isNewlyCreated = true; options.createIfMissing(true); try { db = JniDBFactory.factory.open(dbfile, options); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java index a9699f370b4..b6ca336d18a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java @@ -211,6 +211,9 @@ public abstract class NMStateStoreService extends AbstractService { return true; } + public boolean isNewlyCreated() { + return false; + } /** * Load the state of applications diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java index ed59ddd841c..fa5a4fcf5ed 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java @@ -26,6 +26,7 @@ import static org.mockito.Matchers.anyBoolean; import static org.mockito.Matchers.anyInt; import static org.mockito.Matchers.anyLong; import static org.mockito.Matchers.anyShort; +import static org.mockito.Matchers.any; import static org.mockito.Matchers.argThat; import static org.mockito.Matchers.eq; import static org.mockito.Matchers.isA; @@ -38,11 +39,14 @@ import static org.mockito.Mockito.spy; import static org.mockito.Mockito.timeout; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; +import java.io.FileNotFoundException; import java.io.IOException; import java.net.InetSocketAddress; import java.net.URI; +import java.net.URISyntaxException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -58,6 +62,10 @@ import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.Future; +import org.apache.hadoop.fs.FileAlreadyExistsException; +import org.apache.hadoop.fs.Options; +import org.apache.hadoop.fs.UnresolvedLinkException; +import org.apache.hadoop.security.AccessControlException; import org.junit.Assert; import org.apache.hadoop.conf.Configuration; @@ -227,6 +235,74 @@ public class TestResourceLocalizationService { } } + @Test + public void testDirectoryCleanupOnNewlyCreatedStateStore() + throws IOException, URISyntaxException { + conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, "077"); + AsyncDispatcher dispatcher = new AsyncDispatcher(); + dispatcher.init(new Configuration()); + + ContainerExecutor exec = mock(ContainerExecutor.class); + DeletionService delService = spy(new DeletionService(exec)); + delService.init(conf); + delService.start(); + + List localDirs = new ArrayList(); + String[] sDirs = new String[4]; + for (int i = 0; i < 4; ++i) { + localDirs.add(lfs.makeQualified(new Path(basedir, i + ""))); + sDirs[i] = localDirs.get(i).toString(); + } + conf.setStrings(YarnConfiguration.NM_LOCAL_DIRS, sDirs); + + LocalDirsHandlerService diskhandler = new LocalDirsHandlerService(); + diskhandler.init(conf); + + NMStateStoreService nmStateStoreService = mock(NMStateStoreService.class); + when(nmStateStoreService.canRecover()).thenReturn(true); + when(nmStateStoreService.isNewlyCreated()).thenReturn(true); + + ResourceLocalizationService locService = + spy(new ResourceLocalizationService(dispatcher, exec, delService, + diskhandler, + nmStateStoreService)); + doReturn(lfs) + .when(locService).getLocalFileContext(isA(Configuration.class)); + try { + dispatcher.start(); + + // initialize ResourceLocalizationService + locService.init(conf); + + final FsPermission defaultPerm = new FsPermission((short)0755); + + // verify directory creation + for (Path p : localDirs) { + p = new Path((new URI(p.toString())).getPath()); + Path usercache = new Path(p, ContainerLocalizer.USERCACHE); + verify(spylfs) + .rename(eq(usercache), any(Path.class), any(Options.Rename.class)); + verify(spylfs) + .mkdir(eq(usercache), + eq(defaultPerm), eq(true)); + Path publicCache = new Path(p, ContainerLocalizer.FILECACHE); + verify(spylfs) + .rename(eq(usercache), any(Path.class), any(Options.Rename.class)); + verify(spylfs) + .mkdir(eq(publicCache), + eq(defaultPerm), eq(true)); + Path nmPriv = new Path(p, ResourceLocalizationService.NM_PRIVATE_DIR); + verify(spylfs) + .rename(eq(usercache), any(Path.class), any(Options.Rename.class)); + verify(spylfs).mkdir(eq(nmPriv), + eq(ResourceLocalizationService.NM_PRIVATE_PERM), eq(true)); + } + } finally { + dispatcher.stop(); + delService.stop(); + } + } + @Test @SuppressWarnings("unchecked") // mocked generics public void testResourceRelease() throws Exception { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java index d2cc36323a3..db377f5f0c7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java @@ -22,6 +22,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertFalse; import java.io.File; import java.io.IOException; @@ -123,6 +124,13 @@ public class TestNMLeveldbStateStoreService { assertTrue(state.getUserResources().isEmpty()); } + @Test + public void testIsNewlyCreated() throws IOException { + assertTrue(stateStore.isNewlyCreated()); + restartStateStore(); + assertFalse(stateStore.isNewlyCreated()); + } + @Test public void testEmptyState() throws IOException { assertTrue(stateStore.canRecover());