diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 5e1c6fab5b9..8018d1c8df9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -1679,6 +1679,12 @@ public class YarnConfiguration extends Configuration { public static final long TIMELINE_SERVICE_CLIENT_INTERNAL_TIMERS_TTL_SECS_DEFAULT = 7 * 60; + public static final String + TIMELINE_SERVICE_CLIENT_INTERNAL_ATTEMPT_DIR_CACHE_SIZE = + TIMELINE_SERVICE_CLIENT_PREFIX + "internal-attempt-dir-cache-size"; + public static final int + DEFAULT_TIMELINE_SERVICE_CLIENT_INTERNAL_ATTEMPT_DIR_CACHE_SIZE = 1000; + // This is temporary solution. The configuration will be deleted once we have // the FileSystem API to check whether append operation is supported or not. public static final String TIMELINE_SERVICE_ENTITYFILE_FS_SUPPORT_APPEND diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/FileSystemTimelineWriter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/FileSystemTimelineWriter.java index b471b3b59b3..35d9970ba40 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/FileSystemTimelineWriter.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/FileSystemTimelineWriter.java @@ -26,6 +26,7 @@ import java.net.URI; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Set; @@ -97,6 +98,7 @@ public class FileSystemTimelineWriter extends TimelineWriter{ private long ttl; private LogFDsCache logFDsCache = null; private boolean isAppendSupported; + private final AttemptDirCache attemptDirCache; public FileSystemTimelineWriter(Configuration conf, UserGroupInformation authUgi, Client client, URI resURI) @@ -158,6 +160,15 @@ public class FileSystemTimelineWriter extends TimelineWriter{ objMapper = createObjectMapper(); + int attemptDirCacheSize = conf.getInt( + YarnConfiguration + .TIMELINE_SERVICE_CLIENT_INTERNAL_ATTEMPT_DIR_CACHE_SIZE, + YarnConfiguration + .DEFAULT_TIMELINE_SERVICE_CLIENT_INTERNAL_ATTEMPT_DIR_CACHE_SIZE); + + attemptDirCache = + new AttemptDirCache(attemptDirCacheSize, fs, activePath); + if (LOG.isDebugEnabled()) { StringBuilder debugMSG = new StringBuilder(); debugMSG.append( @@ -199,7 +210,7 @@ public class FileSystemTimelineWriter extends TimelineWriter{ = new ArrayList(); List entitiesToEntityCache = new ArrayList(); - Path attemptDir = createAttemptDir(appAttemptId); + Path attemptDir = attemptDirCache.getAppAttemptDir(appAttemptId); for (TimelineEntity entity : entities) { if (summaryEntityTypes.contains(entity.getEntityType())) { @@ -279,32 +290,11 @@ public class FileSystemTimelineWriter extends TimelineWriter{ return mapper; } - private Path createAttemptDir(ApplicationAttemptId appAttemptId) - throws IOException { - Path appDir = createApplicationDir(appAttemptId.getApplicationId()); - - Path attemptDir = new Path(appDir, appAttemptId.toString()); - if (!fs.exists(attemptDir)) { - FileSystem.mkdirs(fs, attemptDir, new FsPermission( - APP_LOG_DIR_PERMISSIONS)); - } - return attemptDir; - } - - private Path createApplicationDir(ApplicationId appId) throws IOException { - Path appDir = - new Path(activePath, appId.toString()); - if (!fs.exists(appDir)) { - FileSystem.mkdirs(fs, appDir, new FsPermission(APP_LOG_DIR_PERMISSIONS)); - } - return appDir; - } - private void writeDomain(ApplicationAttemptId appAttemptId, TimelineDomain domain) throws IOException { Path domainLogPath = - new Path(createAttemptDir(appAttemptId), DOMAIN_LOG_PREFIX - + appAttemptId.toString()); + new Path(attemptDirCache.getAppAttemptDir(appAttemptId), + DOMAIN_LOG_PREFIX + appAttemptId.toString()); if (LOG.isDebugEnabled()) { LOG.debug("Writing domains for " + appAttemptId.toString() + " to " + domainLogPath); @@ -958,4 +948,70 @@ public class FileSystemTimelineWriter extends TimelineWriter{ } } } + + private static class AttemptDirCache { + private final int attemptDirCacheSize; + private final Map attemptDirCache; + private final FileSystem fs; + private final Path activePath; + + public AttemptDirCache(int cacheSize, FileSystem fs, Path activePath) { + this.attemptDirCacheSize = cacheSize; + this.attemptDirCache = + new LinkedHashMap( + attemptDirCacheSize, 0.75f, true) { + private static final long serialVersionUID = 1L; + @Override + protected boolean removeEldestEntry( + Map.Entry eldest) { + return size() > attemptDirCacheSize; + } + }; + this.fs = fs; + this.activePath = activePath; + } + + public Path getAppAttemptDir(ApplicationAttemptId attemptId) + throws IOException { + Path attemptDir = this.attemptDirCache.get(attemptId); + if (attemptDir == null) { + synchronized(this) { + attemptDir = this.attemptDirCache.get(attemptId); + if (attemptDir == null) { + attemptDir = createAttemptDir(attemptId); + attemptDirCache.put(attemptId, attemptDir); + } + } + } + return attemptDir; + } + + private Path createAttemptDir(ApplicationAttemptId appAttemptId) + throws IOException { + Path appDir = createApplicationDir(appAttemptId.getApplicationId()); + + Path attemptDir = new Path(appDir, appAttemptId.toString()); + if (!fs.exists(attemptDir)) { + FileSystem.mkdirs(fs, attemptDir, new FsPermission( + APP_LOG_DIR_PERMISSIONS)); + if (LOG.isDebugEnabled()) { + LOG.debug("New attempt directory created - " + attemptDir); + } + } + return attemptDir; + } + + private Path createApplicationDir(ApplicationId appId) throws IOException { + Path appDir = + new Path(activePath, appId.toString()); + if (!fs.exists(appDir)) { + FileSystem.mkdirs(fs, appDir, + new FsPermission(APP_LOG_DIR_PERMISSIONS)); + if (LOG.isDebugEnabled()) { + LOG.debug("New app directory created - " + appDir); + } + } + return appDir; + } + } }