From b22f4db3a0f44476b0a44b1b81f2d1dffcdf57fc Mon Sep 17 00:00:00 2001 From: Li Lu Date: Wed, 24 Feb 2016 13:43:09 -0800 Subject: [PATCH] YARN-4680. TimerTasks leak in ATS V1.5 Writer. (Xuan Gong via gtcarrera9) (cherry picked from commit 9e0f7b8b69ead629f999aa86c8fb7eb581e175d8) --- hadoop-yarn-project/CHANGES.txt | 2 + .../hadoop/yarn/conf/YarnConfiguration.java | 6 + .../api/impl/FileSystemTimelineWriter.java | 159 +++++++++++++++--- .../src/main/resources/yarn-default.xml | 39 +++++ 4 files changed, 180 insertions(+), 26 deletions(-) diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 904cbcd25df..00f7aaa0359 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -647,6 +647,8 @@ Release 2.8.0 - UNRELEASED BUG FIXES + YARN-4680. TimerTasks leak in ATS V1.5 Writer. (Xuan Gong via gtcarrera9) + YARN-3197. Confusing log generated by CapacityScheduler. (Varun Saxena via devaraj) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index f8443919980..52aa953af54 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -1673,6 +1673,12 @@ public class YarnConfiguration extends Configuration { public static final long TIMELINE_SERVICE_CLIENT_FD_RETAIN_SECS_DEFAULT = 5*60; + public static final String + TIMELINE_SERVICE_CLIENT_INTERNAL_TIMERS_TTL_SECS = + TIMELINE_SERVICE_CLIENT_PREFIX + "internal-timers-ttl-secs"; + public static final long + TIMELINE_SERVICE_CLIENT_INTERNAL_TIMERS_TTL_SECS_DEFAULT = 7 * 60; + // mark app-history related configs @Private as application history is going // to be integrated into the timeline service @Private diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/FileSystemTimelineWriter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/FileSystemTimelineWriter.java index 1c295e17e39..aa1f1f8f1ed 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/FileSystemTimelineWriter.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/FileSystemTimelineWriter.java @@ -32,6 +32,9 @@ import java.util.Timer; import java.util.TimerTask; import java.util.Map.Entry; import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; +import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -44,6 +47,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.util.Time; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain; @@ -154,8 +158,14 @@ public class FileSystemTimelineWriter extends TimelineWriter{ YarnConfiguration.TIMELINE_SERVICE_CLIENT_FD_RETAIN_SECS, YarnConfiguration.TIMELINE_SERVICE_CLIENT_FD_RETAIN_SECS_DEFAULT); + long timerTaskTTL = conf.getLong( + YarnConfiguration.TIMELINE_SERVICE_CLIENT_INTERNAL_TIMERS_TTL_SECS, + YarnConfiguration + .TIMELINE_SERVICE_CLIENT_INTERNAL_TIMERS_TTL_SECS_DEFAULT); + logFDsCache = - new LogFDsCache(flushIntervalSecs, cleanIntervalSecs, ttl); + new LogFDsCache(flushIntervalSecs, cleanIntervalSecs, ttl, + timerTaskTTL); this.isAppendSupported = conf.getBoolean(TIMELINE_SERVICE_ENTITYFILE_FS_SUPPORT_APPEND, true); @@ -308,7 +318,7 @@ public class FileSystemTimelineWriter extends TimelineWriter{ public void writeDomain(TimelineDomain domain) throws IOException { getObjectMapper().writeValue(getJsonGenerator(), domain); - updateLastModifiedTime(System.currentTimeMillis()); + updateLastModifiedTime(Time.monotonicNow()); } } @@ -326,7 +336,7 @@ public class FileSystemTimelineWriter extends TimelineWriter{ for (TimelineEntity entity : entities) { getObjectMapper().writeValue(getJsonGenerator(), entity); } - updateLastModifiedTime(System.currentTimeMillis()); + updateLastModifiedTime(Time.monotonicNow()); } } @@ -372,7 +382,7 @@ public class FileSystemTimelineWriter extends TimelineWriter{ this.stream = createLogFileStream(fs, logPath); this.jsonGenerator = new JsonFactory().createJsonGenerator(stream); this.jsonGenerator.setPrettyPrinter(new MinimalPrettyPrinter("\n")); - this.lastModifiedTime = System.currentTimeMillis(); + this.lastModifiedTime = Time.monotonicNow(); } protected boolean writerClosed() { @@ -386,7 +396,7 @@ public class FileSystemTimelineWriter extends TimelineWriter{ if (!isAppendSupported) { logPathToCreate = new Path(logPathToCreate.getParent(), - (logPathToCreate.getName() + "_" + System.currentTimeMillis())); + (logPathToCreate.getName() + "_" + Time.monotonicNow())); } if (!fileSystem.exists(logPathToCreate)) { streamToCreate = fileSystem.create(logPathToCreate, false); @@ -424,10 +434,9 @@ public class FileSystemTimelineWriter extends TimelineWriter{ private Map summanyLogFDs; private Map> entityLogFDs; - private Timer flushTimer; - private FlushTimerTask flushTimerTask; - private Timer cleanInActiveFDsTimer; - private CleanInActiveFDsTask cleanInActiveFDsTask; + private Timer flushTimer = null; + private Timer cleanInActiveFDsTimer = null; + private Timer monitorTaskTimer = null; private final long ttl; private final ReentrantLock domainFDLocker = new ReentrantLock(); private final ReentrantLock summaryTableLocker = new ReentrantLock(); @@ -435,27 +444,40 @@ public class FileSystemTimelineWriter extends TimelineWriter{ private final ReentrantLock summaryTableCopyLocker = new ReentrantLock(); private final ReentrantLock entityTableCopyLocker = new ReentrantLock(); private volatile boolean serviceStopped = false; + private volatile boolean timerTaskStarted = false; + private final ReentrantLock timerTaskLocker = new ReentrantLock(); + private final long flushIntervalSecs; + private final long cleanIntervalSecs; + private final long timerTaskRetainTTL; + private volatile long timeStampOfLastWrite = Time.monotonicNow(); + private final ReadLock timerTasksMonitorReadLock; + private final WriteLock timerTasksMonitorWriteLock; public LogFDsCache(long flushIntervalSecs, long cleanIntervalSecs, - long ttl) { + long ttl, long timerTaskRetainTTL) { domainLogFD = null; summanyLogFDs = new HashMap(); entityLogFDs = new HashMap>(); - this.flushTimer = - new Timer(LogFDsCache.class.getSimpleName() + "FlushTimer", - true); - this.flushTimerTask = new FlushTimerTask(); - this.flushTimer.schedule(flushTimerTask, flushIntervalSecs * 1000, - flushIntervalSecs * 1000); - - this.cleanInActiveFDsTimer = - new Timer(LogFDsCache.class.getSimpleName() + - "cleanInActiveFDsTimer", true); - this.cleanInActiveFDsTask = new CleanInActiveFDsTask(); - this.cleanInActiveFDsTimer.schedule(cleanInActiveFDsTask, - cleanIntervalSecs * 1000, cleanIntervalSecs * 1000); this.ttl = ttl * 1000; + this.flushIntervalSecs = flushIntervalSecs; + this.cleanIntervalSecs = cleanIntervalSecs; + long timerTaskRetainTTLVar = timerTaskRetainTTL * 1000; + if (timerTaskRetainTTLVar > this.ttl) { + this.timerTaskRetainTTL = timerTaskRetainTTLVar; + } else { + this.timerTaskRetainTTL = this.ttl + 2 * 60 * 1000; + LOG.warn("The specific " + YarnConfiguration + .TIMELINE_SERVICE_CLIENT_INTERNAL_TIMERS_TTL_SECS + " : " + + timerTaskRetainTTL + " is invalid, because it is less than or " + + "equal to " + YarnConfiguration + .TIMELINE_SERVICE_CLIENT_FD_RETAIN_SECS + " : " + ttl + ". Use " + + YarnConfiguration.TIMELINE_SERVICE_CLIENT_FD_RETAIN_SECS + " : " + + ttl + " + 120s instead."); + } + ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + this.timerTasksMonitorReadLock = lock.readLock(); + this.timerTasksMonitorWriteLock = lock.writeLock(); } @Override @@ -548,7 +570,7 @@ public class FileSystemTimelineWriter extends TimelineWriter{ } private void cleanInActiveFDs() { - long currentTimeStamp = System.currentTimeMillis(); + long currentTimeStamp = Time.monotonicNow(); try { this.domainFDLocker.lock(); if (domainLogFD != null) { @@ -623,13 +645,55 @@ public class FileSystemTimelineWriter extends TimelineWriter{ } } + private class TimerMonitorTask extends TimerTask { + @Override + public void run() { + try { + timerTasksMonitorWriteLock.lock(); + monitorTimerTasks(); + } finally { + timerTasksMonitorWriteLock.unlock(); + } + } + } + + private void monitorTimerTasks() { + if (Time.monotonicNow() - this.timeStampOfLastWrite + >= this.timerTaskRetainTTL) { + cancelAndCloseTimerTasks(); + + timerTaskStarted = false; + } else { + if (this.monitorTaskTimer != null) { + this.monitorTaskTimer.schedule(new TimerMonitorTask(), + this.timerTaskRetainTTL); + } + } + } + @Override public void close() throws IOException { serviceStopped = true; - flushTimer.cancel(); - cleanInActiveFDsTimer.cancel(); + cancelAndCloseTimerTasks(); + } + + private void cancelAndCloseTimerTasks() { + if (flushTimer != null) { + flushTimer.cancel(); + flushTimer = null; + } + + if (cleanInActiveFDsTimer != null) { + cleanInActiveFDsTimer.cancel(); + cleanInActiveFDsTimer = null; + } + + if (monitorTaskTimer != null) { + monitorTaskTimer.cancel(); + monitorTaskTimer = null; + } try { this.domainFDLocker.lock(); @@ -696,6 +760,7 @@ public class FileSystemTimelineWriter extends TimelineWriter{ public void writeDomainLog(FileSystem fs, Path logPath, ObjectMapper objMapper, TimelineDomain domain, boolean isAppendSupported) throws IOException { + checkAndStartTimeTasks(); try { this.domainFDLocker.lock(); if (this.domainLogFD != null) { @@ -714,6 +779,7 @@ public class FileSystemTimelineWriter extends TimelineWriter{ ObjectMapper objMapper, ApplicationAttemptId appAttemptId, TimelineEntityGroupId groupId, List entitiesToEntity, boolean isAppendSupported) throws IOException{ + checkAndStartTimeTasks(); writeEntityLogs(fs, entityLogPath, objMapper, appAttemptId, groupId, entitiesToEntity, isAppendSupported, this.entityLogFDs); } @@ -788,6 +854,7 @@ public class FileSystemTimelineWriter extends TimelineWriter{ ObjectMapper objMapper, ApplicationAttemptId attemptId, List entities, boolean isAppendSupported) throws IOException { + checkAndStartTimeTasks(); writeSummmaryEntityLogs(fs, logPath, objMapper, attemptId, entities, isAppendSupported, this.summanyLogFDs); } @@ -843,5 +910,45 @@ public class FileSystemTimelineWriter extends TimelineWriter{ summaryTableLocker.unlock(); } } + + private void createAndStartTimerTasks() { + this.flushTimer = + new Timer(LogFDsCache.class.getSimpleName() + "FlushTimer", + true); + this.flushTimer.schedule(new FlushTimerTask(), flushIntervalSecs * 1000, + flushIntervalSecs * 1000); + + this.cleanInActiveFDsTimer = + new Timer(LogFDsCache.class.getSimpleName() + + "cleanInActiveFDsTimer", true); + this.cleanInActiveFDsTimer.schedule(new CleanInActiveFDsTask(), + cleanIntervalSecs * 1000, cleanIntervalSecs * 1000); + + this.monitorTaskTimer = + new Timer(LogFDsCache.class.getSimpleName() + "MonitorTimer", + true); + this.monitorTaskTimer.schedule(new TimerMonitorTask(), + this.timerTaskRetainTTL); + } + + private void checkAndStartTimeTasks() { + try { + this.timerTasksMonitorReadLock.lock(); + this.timeStampOfLastWrite = Time.monotonicNow(); + if(!timerTaskStarted) { + try { + timerTaskLocker.lock(); + if (!timerTaskStarted) { + createAndStartTimerTasks(); + timerTaskStarted = true; + } + } finally { + timerTaskLocker.unlock(); + } + } + } finally { + this.timerTasksMonitorReadLock.unlock(); + } + } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index 1c7f078fd00..c93fb366571 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -2050,6 +2050,45 @@ 10485760 + + yarn.timeline-service.client.fd-flush-interval-secs + + Flush interval for ATS v1.5 writer. This value controls how frequent + the writer will flush the HDFS FSStream for the entity/domain. + + 10 + + + + yarn.timeline-service.client.fd-clean-interval-secs + + Scan interval for ATS v1.5 writer. This value controls how frequent + the writer will scan the HDFS FSStream for the entity/domain. + If the FSStream is stale for a long time, this FSStream will be close. + + 60 + + + + yarn.timeline-service.client.fd-retain-secs + + How long the ATS v1.5 writer will keep a FSStream open. + If this fsstream does not write anything for this configured time, + it will be close. + + 300 + + + + yarn.timeline-service.client.internal-timers-ttl-secs + + How long the internal Timer Tasks can be alive in writer. If there is no + write operation for this configured time, the internal timer tasks will + be close. + + 420 + +