From 78063b3a76840ffade33667a90f9c9dbe7fc99eb Mon Sep 17 00:00:00 2001 From: Kihwal Lee Date: Tue, 4 Jun 2013 20:49:34 +0000 Subject: [PATCH] YARN-742. Log aggregation causes a lot of redundant setPermission calls. Contributed by Jason Lowe. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1489596 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-yarn-project/CHANGES.txt | 3 + .../logaggregation/LogAggregationService.java | 113 ++++++++++-------- .../TestLogAggregationService.java | 62 +++++++++- 3 files changed, 125 insertions(+), 53 deletions(-) diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index b0d0fa7f41c..e2845cf2705 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -433,6 +433,9 @@ Release 2.1.0-beta - UNRELEASED YARN-757. Changed TestRMRestart to use the default scheduler to avoid test failures. (Bikas Saha via vinodkv) + YARN-742. Log aggregation causes a lot of redundant setPermission calls. + (jlowe via kihwal) + BREAKDOWN OF HADOOP-8562 SUBTASKS AND RELATED JIRAS YARN-158. Yarn creating package-info.java must not depend on sh. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java index 0170080b566..b477ade6aa8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java @@ -32,6 +32,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; @@ -162,12 +163,16 @@ public class LogAggregationService extends AbstractService implements LOG.warn("Some logs may not have been aggregated for " + appId); } } - + + protected FileSystem getFileSystem(Configuration conf) throws IOException { + return FileSystem.get(conf); + } + void verifyAndCreateRemoteLogDir(Configuration conf) { // Checking the existence of the TLD FileSystem remoteFS = null; try { - remoteFS = FileSystem.get(conf); + remoteFS = getFileSystem(conf); } catch (IOException e) { throw new YarnRuntimeException("Unable to get Remote FileSystem instance", e); } @@ -212,8 +217,26 @@ public class LogAggregationService extends AbstractService implements private void createDir(FileSystem fs, Path path, FsPermission fsPerm) throws IOException { - fs.mkdirs(path, new FsPermission(fsPerm)); - fs.setPermission(path, new FsPermission(fsPerm)); + FsPermission dirPerm = new FsPermission(fsPerm); + fs.mkdirs(path, dirPerm); + FsPermission umask = FsPermission.getUMask(fs.getConf()); + if (!dirPerm.equals(dirPerm.applyUMask(umask))) { + fs.setPermission(path, new FsPermission(fsPerm)); + } + } + + private boolean checkExists(FileSystem fs, Path path, FsPermission fsPerm) + throws IOException { + boolean exists = true; + try { + FileStatus appDirStatus = fs.getFileStatus(path); + if (!APP_DIR_PERMISSIONS.equals(appDirStatus.getPermission())) { + fs.setPermission(path, APP_DIR_PERMISSIONS); + } + } catch (FileNotFoundException fnfe) { + exists = false; + } + return exists; } protected void createAppDir(final String user, final ApplicationId appId, @@ -222,57 +245,43 @@ public class LogAggregationService extends AbstractService implements userUgi.doAs(new PrivilegedExceptionAction() { @Override public Object run() throws Exception { - // TODO: Reuse FS for user? - FileSystem remoteFS = null; - Path userDir = null; - Path suffixDir = null; - Path appDir = null; try { - remoteFS = FileSystem.get(getConfig()); - } catch (IOException e) { - LOG.error("Failed to get remote FileSystem while processing app " - + appId, e); - throw e; - } - try { - userDir = - LogAggregationUtils.getRemoteLogUserDir( + // TODO: Reuse FS for user? + FileSystem remoteFS = getFileSystem(getConfig()); + + // Only creating directories if they are missing to avoid + // unnecessary load on the filesystem from all of the nodes + Path appDir = LogAggregationUtils.getRemoteAppLogDir( + LogAggregationService.this.remoteRootLogDir, appId, user, + LogAggregationService.this.remoteRootLogDirSuffix); + appDir = appDir.makeQualified(remoteFS.getUri(), + remoteFS.getWorkingDirectory()); + + if (!checkExists(remoteFS, appDir, APP_DIR_PERMISSIONS)) { + Path suffixDir = LogAggregationUtils.getRemoteLogSuffixedDir( + LogAggregationService.this.remoteRootLogDir, user, + LogAggregationService.this.remoteRootLogDirSuffix); + suffixDir = suffixDir.makeQualified(remoteFS.getUri(), + remoteFS.getWorkingDirectory()); + + if (!checkExists(remoteFS, suffixDir, APP_DIR_PERMISSIONS)) { + Path userDir = LogAggregationUtils.getRemoteLogUserDir( LogAggregationService.this.remoteRootLogDir, user); - userDir = - userDir.makeQualified(remoteFS.getUri(), + userDir = userDir.makeQualified(remoteFS.getUri(), remoteFS.getWorkingDirectory()); - createDir(remoteFS, userDir, APP_DIR_PERMISSIONS); + + if (!checkExists(remoteFS, userDir, APP_DIR_PERMISSIONS)) { + createDir(remoteFS, userDir, APP_DIR_PERMISSIONS); + } + + createDir(remoteFS, suffixDir, APP_DIR_PERMISSIONS); + } + + createDir(remoteFS, appDir, APP_DIR_PERMISSIONS); + } } catch (IOException e) { - LOG.error("Failed to create user dir [" + userDir - + "] while processing app " + appId); - throw e; - } - try { - suffixDir = - LogAggregationUtils.getRemoteLogSuffixedDir( - LogAggregationService.this.remoteRootLogDir, user, - LogAggregationService.this.remoteRootLogDirSuffix); - suffixDir = - suffixDir.makeQualified(remoteFS.getUri(), - remoteFS.getWorkingDirectory()); - createDir(remoteFS, suffixDir, APP_DIR_PERMISSIONS); - } catch (IOException e) { - LOG.error("Failed to create suffixed user dir [" + suffixDir - + "] while processing app " + appId); - throw e; - } - try { - appDir = - LogAggregationUtils.getRemoteAppLogDir( - LogAggregationService.this.remoteRootLogDir, appId, user, - LogAggregationService.this.remoteRootLogDirSuffix); - appDir = - appDir.makeQualified(remoteFS.getUri(), - remoteFS.getWorkingDirectory()); - createDir(remoteFS, appDir, APP_DIR_PERMISSIONS); - } catch (IOException e) { - LOG.error("Failed to create application log dir [" + appDir - + "] while processing app " + appId); + LOG.error("Failed to setup application log directory for " + + appId, e); throw e; } return null; @@ -294,7 +303,7 @@ public class LogAggregationService extends AbstractService implements eventResponse = new ApplicationEvent(appId, ApplicationEventType.APPLICATION_LOG_HANDLING_INITED); } catch (YarnRuntimeException e) { - LOG.warn("Application failed to init aggregation: " + e.getMessage()); + LOG.warn("Application failed to init aggregation", e); eventResponse = new ApplicationEvent(appId, ApplicationEventType.APPLICATION_LOG_HANDLING_FAILED); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java index 36e196008a4..dd477039431 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java @@ -25,6 +25,7 @@ import static org.mockito.Matchers.anyMap; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.isA; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.reset; @@ -55,8 +56,10 @@ import junit.framework.Assert; import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.UnsupportedFileSystemException; +import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.YarnRuntimeException; @@ -78,6 +81,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.event.Event; import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.event.InlineDispatcher; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; @@ -506,7 +510,63 @@ public class TestLogAggregationService extends BaseContainerManagerTest { assertTrue("The new aggregate file is not successfully created", existsAfter); aNewFile.delete(); //housekeeping } - + + @Test + public void testAppLogDirCreation() throws Exception { + final String logSuffix = "logs"; + this.conf.set(YarnConfiguration.NM_LOG_DIRS, + localLogDir.getAbsolutePath()); + this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, + this.remoteRootLogDir.getAbsolutePath()); + this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX, logSuffix); + + InlineDispatcher dispatcher = new InlineDispatcher(); + dispatcher.init(this.conf); + dispatcher.start(); + + FileSystem fs = FileSystem.get(this.conf); + final FileSystem spyFs = spy(FileSystem.get(this.conf)); + + LogAggregationService aggSvc = new LogAggregationService(dispatcher, + this.context, this.delSrvc, super.dirsHandler) { + @Override + protected FileSystem getFileSystem(Configuration conf) { + return spyFs; + } + }; + + aggSvc.init(this.conf); + aggSvc.start(); + + // start an application and verify user, suffix, and app dirs created + ApplicationId appId = BuilderUtils.newApplicationId(1, 1); + Path userDir = fs.makeQualified(new Path( + remoteRootLogDir.getAbsolutePath(), this.user)); + Path suffixDir = new Path(userDir, logSuffix); + Path appDir = new Path(suffixDir, appId.toString()); + aggSvc.handle(new LogHandlerAppStartedEvent(appId, this.user, null, + ContainerLogsRetentionPolicy.ALL_CONTAINERS, this.acls)); + verify(spyFs).mkdirs(eq(userDir), isA(FsPermission.class)); + verify(spyFs).mkdirs(eq(suffixDir), isA(FsPermission.class)); + verify(spyFs).mkdirs(eq(appDir), isA(FsPermission.class)); + + // start another application and verify only app dir created + ApplicationId appId2 = BuilderUtils.newApplicationId(1, 2); + Path appDir2 = new Path(suffixDir, appId2.toString()); + aggSvc.handle(new LogHandlerAppStartedEvent(appId2, this.user, null, + ContainerLogsRetentionPolicy.ALL_CONTAINERS, this.acls)); + verify(spyFs).mkdirs(eq(appDir2), isA(FsPermission.class)); + + // start another application with the app dir already created and verify + // we do not try to create it again + ApplicationId appId3 = BuilderUtils.newApplicationId(1, 3); + Path appDir3 = new Path(suffixDir, appId3.toString()); + new File(appDir3.toUri().getPath()).mkdir(); + aggSvc.handle(new LogHandlerAppStartedEvent(appId3, this.user, null, + ContainerLogsRetentionPolicy.ALL_CONTAINERS, this.acls)); + verify(spyFs, never()).mkdirs(eq(appDir3), isA(FsPermission.class)); + } + @Test @SuppressWarnings("unchecked") public void testLogAggregationInitAppFailsWithoutKillingNM() throws Exception {