YARN-4814. ATS 1.5 timelineclient impl call flush after every event write. Contributed by Xuan Gong.

(cherry picked from commit af1d125f9c)
(cherry picked from commit 76602161c0)
This commit is contained in:
Junping Du 2016-03-23 08:57:16 -07:00
parent f362b8a403
commit 762c7d4361
2 changed files with 86 additions and 24 deletions

View File

@ -1679,6 +1679,12 @@ private static void addDeprecatedKeys() {
public static final long
TIMELINE_SERVICE_CLIENT_INTERNAL_TIMERS_TTL_SECS_DEFAULT = 7 * 60;
public static final String
TIMELINE_SERVICE_CLIENT_INTERNAL_ATTEMPT_DIR_CACHE_SIZE =
TIMELINE_SERVICE_CLIENT_PREFIX + "internal-attempt-dir-cache-size";
public static final int
DEFAULT_TIMELINE_SERVICE_CLIENT_INTERNAL_ATTEMPT_DIR_CACHE_SIZE = 1000;
// This is temporary solution. The configuration will be deleted once we have
// the FileSystem API to check whether append operation is supported or not.
public static final String TIMELINE_SERVICE_ENTITYFILE_FS_SUPPORT_APPEND

View File

@ -26,6 +26,7 @@
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
@ -97,6 +98,7 @@ public class FileSystemTimelineWriter extends TimelineWriter{
private long ttl;
private LogFDsCache logFDsCache = null;
private boolean isAppendSupported;
private final AttemptDirCache attemptDirCache;
public FileSystemTimelineWriter(Configuration conf,
UserGroupInformation authUgi, Client client, URI resURI)
@ -158,6 +160,15 @@ public FileSystemTimelineWriter(Configuration conf,
objMapper = createObjectMapper();
int attemptDirCacheSize = conf.getInt(
YarnConfiguration
.TIMELINE_SERVICE_CLIENT_INTERNAL_ATTEMPT_DIR_CACHE_SIZE,
YarnConfiguration
.DEFAULT_TIMELINE_SERVICE_CLIENT_INTERNAL_ATTEMPT_DIR_CACHE_SIZE);
attemptDirCache =
new AttemptDirCache(attemptDirCacheSize, fs, activePath);
if (LOG.isDebugEnabled()) {
StringBuilder debugMSG = new StringBuilder();
debugMSG.append(
@ -199,7 +210,7 @@ public TimelinePutResponse putEntities(
= new ArrayList<TimelineEntity>();
List<TimelineEntity> entitiesToEntityCache
= new ArrayList<TimelineEntity>();
Path attemptDir = createAttemptDir(appAttemptId);
Path attemptDir = attemptDirCache.getAppAttemptDir(appAttemptId);
for (TimelineEntity entity : entities) {
if (summaryEntityTypes.contains(entity.getEntityType())) {
@ -279,32 +290,11 @@ private ObjectMapper createObjectMapper() {
return mapper;
}
private Path createAttemptDir(ApplicationAttemptId appAttemptId)
throws IOException {
Path appDir = createApplicationDir(appAttemptId.getApplicationId());
Path attemptDir = new Path(appDir, appAttemptId.toString());
if (!fs.exists(attemptDir)) {
FileSystem.mkdirs(fs, attemptDir, new FsPermission(
APP_LOG_DIR_PERMISSIONS));
}
return attemptDir;
}
private Path createApplicationDir(ApplicationId appId) throws IOException {
Path appDir =
new Path(activePath, appId.toString());
if (!fs.exists(appDir)) {
FileSystem.mkdirs(fs, appDir, new FsPermission(APP_LOG_DIR_PERMISSIONS));
}
return appDir;
}
private void writeDomain(ApplicationAttemptId appAttemptId,
TimelineDomain domain) throws IOException {
Path domainLogPath =
new Path(createAttemptDir(appAttemptId), DOMAIN_LOG_PREFIX
+ appAttemptId.toString());
new Path(attemptDirCache.getAppAttemptDir(appAttemptId),
DOMAIN_LOG_PREFIX + appAttemptId.toString());
if (LOG.isDebugEnabled()) {
LOG.debug("Writing domains for " + appAttemptId.toString() + " to "
+ domainLogPath);
@ -958,4 +948,70 @@ private void checkAndStartTimeTasks() {
}
}
}
private static class AttemptDirCache {
private final int attemptDirCacheSize;
private final Map<ApplicationAttemptId, Path> attemptDirCache;
private final FileSystem fs;
private final Path activePath;
public AttemptDirCache(int cacheSize, FileSystem fs, Path activePath) {
this.attemptDirCacheSize = cacheSize;
this.attemptDirCache =
new LinkedHashMap<ApplicationAttemptId, Path>(
attemptDirCacheSize, 0.75f, true) {
private static final long serialVersionUID = 1L;
@Override
protected boolean removeEldestEntry(
Map.Entry<ApplicationAttemptId, Path> eldest) {
return size() > attemptDirCacheSize;
}
};
this.fs = fs;
this.activePath = activePath;
}
public Path getAppAttemptDir(ApplicationAttemptId attemptId)
throws IOException {
Path attemptDir = this.attemptDirCache.get(attemptId);
if (attemptDir == null) {
synchronized(this) {
attemptDir = this.attemptDirCache.get(attemptId);
if (attemptDir == null) {
attemptDir = createAttemptDir(attemptId);
attemptDirCache.put(attemptId, attemptDir);
}
}
}
return attemptDir;
}
private Path createAttemptDir(ApplicationAttemptId appAttemptId)
throws IOException {
Path appDir = createApplicationDir(appAttemptId.getApplicationId());
Path attemptDir = new Path(appDir, appAttemptId.toString());
if (!fs.exists(attemptDir)) {
FileSystem.mkdirs(fs, attemptDir, new FsPermission(
APP_LOG_DIR_PERMISSIONS));
if (LOG.isDebugEnabled()) {
LOG.debug("New attempt directory created - " + attemptDir);
}
}
return attemptDir;
}
private Path createApplicationDir(ApplicationId appId) throws IOException {
Path appDir =
new Path(activePath, appId.toString());
if (!fs.exists(appDir)) {
FileSystem.mkdirs(fs, appDir,
new FsPermission(APP_LOG_DIR_PERMISSIONS));
if (LOG.isDebugEnabled()) {
LOG.debug("New app directory created - " + appDir);
}
}
return appDir;
}
}
}