YARN-4680. TimerTasks leak in ATS V1.5 Writer. (Xuan Gong via
gtcarrera9)
(cherry picked from commit 9e0f7b8b69
)
This commit is contained in:
parent
d4bbdd9d28
commit
b22f4db3a0
|
@ -647,6 +647,8 @@ Release 2.8.0 - UNRELEASED
|
||||||
|
|
||||||
BUG FIXES
|
BUG FIXES
|
||||||
|
|
||||||
|
YARN-4680. TimerTasks leak in ATS V1.5 Writer. (Xuan Gong via gtcarrera9)
|
||||||
|
|
||||||
YARN-3197. Confusing log generated by CapacityScheduler. (Varun Saxena
|
YARN-3197. Confusing log generated by CapacityScheduler. (Varun Saxena
|
||||||
via devaraj)
|
via devaraj)
|
||||||
|
|
||||||
|
|
|
@ -1673,6 +1673,12 @@ public class YarnConfiguration extends Configuration {
|
||||||
public static final long TIMELINE_SERVICE_CLIENT_FD_RETAIN_SECS_DEFAULT =
|
public static final long TIMELINE_SERVICE_CLIENT_FD_RETAIN_SECS_DEFAULT =
|
||||||
5*60;
|
5*60;
|
||||||
|
|
||||||
|
public static final String
|
||||||
|
TIMELINE_SERVICE_CLIENT_INTERNAL_TIMERS_TTL_SECS =
|
||||||
|
TIMELINE_SERVICE_CLIENT_PREFIX + "internal-timers-ttl-secs";
|
||||||
|
public static final long
|
||||||
|
TIMELINE_SERVICE_CLIENT_INTERNAL_TIMERS_TTL_SECS_DEFAULT = 7 * 60;
|
||||||
|
|
||||||
// mark app-history related configs @Private as application history is going
|
// mark app-history related configs @Private as application history is going
|
||||||
// to be integrated into the timeline service
|
// to be integrated into the timeline service
|
||||||
@Private
|
@Private
|
||||||
|
|
|
@ -32,6 +32,9 @@ import java.util.Timer;
|
||||||
import java.util.TimerTask;
|
import java.util.TimerTask;
|
||||||
import java.util.Map.Entry;
|
import java.util.Map.Entry;
|
||||||
import java.util.concurrent.locks.ReentrantLock;
|
import java.util.concurrent.locks.ReentrantLock;
|
||||||
|
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||||
|
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
|
||||||
|
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
@ -44,6 +47,7 @@ import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.fs.permission.FsPermission;
|
import org.apache.hadoop.fs.permission.FsPermission;
|
||||||
import org.apache.hadoop.io.IOUtils;
|
import org.apache.hadoop.io.IOUtils;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
|
import org.apache.hadoop.util.Time;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
|
import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
|
||||||
|
@ -154,8 +158,14 @@ public class FileSystemTimelineWriter extends TimelineWriter{
|
||||||
YarnConfiguration.TIMELINE_SERVICE_CLIENT_FD_RETAIN_SECS,
|
YarnConfiguration.TIMELINE_SERVICE_CLIENT_FD_RETAIN_SECS,
|
||||||
YarnConfiguration.TIMELINE_SERVICE_CLIENT_FD_RETAIN_SECS_DEFAULT);
|
YarnConfiguration.TIMELINE_SERVICE_CLIENT_FD_RETAIN_SECS_DEFAULT);
|
||||||
|
|
||||||
|
long timerTaskTTL = conf.getLong(
|
||||||
|
YarnConfiguration.TIMELINE_SERVICE_CLIENT_INTERNAL_TIMERS_TTL_SECS,
|
||||||
|
YarnConfiguration
|
||||||
|
.TIMELINE_SERVICE_CLIENT_INTERNAL_TIMERS_TTL_SECS_DEFAULT);
|
||||||
|
|
||||||
logFDsCache =
|
logFDsCache =
|
||||||
new LogFDsCache(flushIntervalSecs, cleanIntervalSecs, ttl);
|
new LogFDsCache(flushIntervalSecs, cleanIntervalSecs, ttl,
|
||||||
|
timerTaskTTL);
|
||||||
|
|
||||||
this.isAppendSupported =
|
this.isAppendSupported =
|
||||||
conf.getBoolean(TIMELINE_SERVICE_ENTITYFILE_FS_SUPPORT_APPEND, true);
|
conf.getBoolean(TIMELINE_SERVICE_ENTITYFILE_FS_SUPPORT_APPEND, true);
|
||||||
|
@ -308,7 +318,7 @@ public class FileSystemTimelineWriter extends TimelineWriter{
|
||||||
public void writeDomain(TimelineDomain domain)
|
public void writeDomain(TimelineDomain domain)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
getObjectMapper().writeValue(getJsonGenerator(), domain);
|
getObjectMapper().writeValue(getJsonGenerator(), domain);
|
||||||
updateLastModifiedTime(System.currentTimeMillis());
|
updateLastModifiedTime(Time.monotonicNow());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -326,7 +336,7 @@ public class FileSystemTimelineWriter extends TimelineWriter{
|
||||||
for (TimelineEntity entity : entities) {
|
for (TimelineEntity entity : entities) {
|
||||||
getObjectMapper().writeValue(getJsonGenerator(), entity);
|
getObjectMapper().writeValue(getJsonGenerator(), entity);
|
||||||
}
|
}
|
||||||
updateLastModifiedTime(System.currentTimeMillis());
|
updateLastModifiedTime(Time.monotonicNow());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -372,7 +382,7 @@ public class FileSystemTimelineWriter extends TimelineWriter{
|
||||||
this.stream = createLogFileStream(fs, logPath);
|
this.stream = createLogFileStream(fs, logPath);
|
||||||
this.jsonGenerator = new JsonFactory().createJsonGenerator(stream);
|
this.jsonGenerator = new JsonFactory().createJsonGenerator(stream);
|
||||||
this.jsonGenerator.setPrettyPrinter(new MinimalPrettyPrinter("\n"));
|
this.jsonGenerator.setPrettyPrinter(new MinimalPrettyPrinter("\n"));
|
||||||
this.lastModifiedTime = System.currentTimeMillis();
|
this.lastModifiedTime = Time.monotonicNow();
|
||||||
}
|
}
|
||||||
|
|
||||||
protected boolean writerClosed() {
|
protected boolean writerClosed() {
|
||||||
|
@ -386,7 +396,7 @@ public class FileSystemTimelineWriter extends TimelineWriter{
|
||||||
if (!isAppendSupported) {
|
if (!isAppendSupported) {
|
||||||
logPathToCreate =
|
logPathToCreate =
|
||||||
new Path(logPathToCreate.getParent(),
|
new Path(logPathToCreate.getParent(),
|
||||||
(logPathToCreate.getName() + "_" + System.currentTimeMillis()));
|
(logPathToCreate.getName() + "_" + Time.monotonicNow()));
|
||||||
}
|
}
|
||||||
if (!fileSystem.exists(logPathToCreate)) {
|
if (!fileSystem.exists(logPathToCreate)) {
|
||||||
streamToCreate = fileSystem.create(logPathToCreate, false);
|
streamToCreate = fileSystem.create(logPathToCreate, false);
|
||||||
|
@ -424,10 +434,9 @@ public class FileSystemTimelineWriter extends TimelineWriter{
|
||||||
private Map<ApplicationAttemptId, EntityLogFD> summanyLogFDs;
|
private Map<ApplicationAttemptId, EntityLogFD> summanyLogFDs;
|
||||||
private Map<ApplicationAttemptId, HashMap<TimelineEntityGroupId,
|
private Map<ApplicationAttemptId, HashMap<TimelineEntityGroupId,
|
||||||
EntityLogFD>> entityLogFDs;
|
EntityLogFD>> entityLogFDs;
|
||||||
private Timer flushTimer;
|
private Timer flushTimer = null;
|
||||||
private FlushTimerTask flushTimerTask;
|
private Timer cleanInActiveFDsTimer = null;
|
||||||
private Timer cleanInActiveFDsTimer;
|
private Timer monitorTaskTimer = null;
|
||||||
private CleanInActiveFDsTask cleanInActiveFDsTask;
|
|
||||||
private final long ttl;
|
private final long ttl;
|
||||||
private final ReentrantLock domainFDLocker = new ReentrantLock();
|
private final ReentrantLock domainFDLocker = new ReentrantLock();
|
||||||
private final ReentrantLock summaryTableLocker = new ReentrantLock();
|
private final ReentrantLock summaryTableLocker = new ReentrantLock();
|
||||||
|
@ -435,27 +444,40 @@ public class FileSystemTimelineWriter extends TimelineWriter{
|
||||||
private final ReentrantLock summaryTableCopyLocker = new ReentrantLock();
|
private final ReentrantLock summaryTableCopyLocker = new ReentrantLock();
|
||||||
private final ReentrantLock entityTableCopyLocker = new ReentrantLock();
|
private final ReentrantLock entityTableCopyLocker = new ReentrantLock();
|
||||||
private volatile boolean serviceStopped = false;
|
private volatile boolean serviceStopped = false;
|
||||||
|
private volatile boolean timerTaskStarted = false;
|
||||||
|
private final ReentrantLock timerTaskLocker = new ReentrantLock();
|
||||||
|
private final long flushIntervalSecs;
|
||||||
|
private final long cleanIntervalSecs;
|
||||||
|
private final long timerTaskRetainTTL;
|
||||||
|
private volatile long timeStampOfLastWrite = Time.monotonicNow();
|
||||||
|
private final ReadLock timerTasksMonitorReadLock;
|
||||||
|
private final WriteLock timerTasksMonitorWriteLock;
|
||||||
|
|
||||||
public LogFDsCache(long flushIntervalSecs, long cleanIntervalSecs,
|
public LogFDsCache(long flushIntervalSecs, long cleanIntervalSecs,
|
||||||
long ttl) {
|
long ttl, long timerTaskRetainTTL) {
|
||||||
domainLogFD = null;
|
domainLogFD = null;
|
||||||
summanyLogFDs = new HashMap<ApplicationAttemptId, EntityLogFD>();
|
summanyLogFDs = new HashMap<ApplicationAttemptId, EntityLogFD>();
|
||||||
entityLogFDs = new HashMap<ApplicationAttemptId,
|
entityLogFDs = new HashMap<ApplicationAttemptId,
|
||||||
HashMap<TimelineEntityGroupId, EntityLogFD>>();
|
HashMap<TimelineEntityGroupId, EntityLogFD>>();
|
||||||
this.flushTimer =
|
|
||||||
new Timer(LogFDsCache.class.getSimpleName() + "FlushTimer",
|
|
||||||
true);
|
|
||||||
this.flushTimerTask = new FlushTimerTask();
|
|
||||||
this.flushTimer.schedule(flushTimerTask, flushIntervalSecs * 1000,
|
|
||||||
flushIntervalSecs * 1000);
|
|
||||||
|
|
||||||
this.cleanInActiveFDsTimer =
|
|
||||||
new Timer(LogFDsCache.class.getSimpleName() +
|
|
||||||
"cleanInActiveFDsTimer", true);
|
|
||||||
this.cleanInActiveFDsTask = new CleanInActiveFDsTask();
|
|
||||||
this.cleanInActiveFDsTimer.schedule(cleanInActiveFDsTask,
|
|
||||||
cleanIntervalSecs * 1000, cleanIntervalSecs * 1000);
|
|
||||||
this.ttl = ttl * 1000;
|
this.ttl = ttl * 1000;
|
||||||
|
this.flushIntervalSecs = flushIntervalSecs;
|
||||||
|
this.cleanIntervalSecs = cleanIntervalSecs;
|
||||||
|
long timerTaskRetainTTLVar = timerTaskRetainTTL * 1000;
|
||||||
|
if (timerTaskRetainTTLVar > this.ttl) {
|
||||||
|
this.timerTaskRetainTTL = timerTaskRetainTTLVar;
|
||||||
|
} else {
|
||||||
|
this.timerTaskRetainTTL = this.ttl + 2 * 60 * 1000;
|
||||||
|
LOG.warn("The specific " + YarnConfiguration
|
||||||
|
.TIMELINE_SERVICE_CLIENT_INTERNAL_TIMERS_TTL_SECS + " : "
|
||||||
|
+ timerTaskRetainTTL + " is invalid, because it is less than or "
|
||||||
|
+ "equal to " + YarnConfiguration
|
||||||
|
.TIMELINE_SERVICE_CLIENT_FD_RETAIN_SECS + " : " + ttl + ". Use "
|
||||||
|
+ YarnConfiguration.TIMELINE_SERVICE_CLIENT_FD_RETAIN_SECS + " : "
|
||||||
|
+ ttl + " + 120s instead.");
|
||||||
|
}
|
||||||
|
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
|
||||||
|
this.timerTasksMonitorReadLock = lock.readLock();
|
||||||
|
this.timerTasksMonitorWriteLock = lock.writeLock();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -548,7 +570,7 @@ public class FileSystemTimelineWriter extends TimelineWriter{
|
||||||
}
|
}
|
||||||
|
|
||||||
private void cleanInActiveFDs() {
|
private void cleanInActiveFDs() {
|
||||||
long currentTimeStamp = System.currentTimeMillis();
|
long currentTimeStamp = Time.monotonicNow();
|
||||||
try {
|
try {
|
||||||
this.domainFDLocker.lock();
|
this.domainFDLocker.lock();
|
||||||
if (domainLogFD != null) {
|
if (domainLogFD != null) {
|
||||||
|
@ -623,13 +645,55 @@ public class FileSystemTimelineWriter extends TimelineWriter{
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private class TimerMonitorTask extends TimerTask {
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
try {
|
||||||
|
timerTasksMonitorWriteLock.lock();
|
||||||
|
monitorTimerTasks();
|
||||||
|
} finally {
|
||||||
|
timerTasksMonitorWriteLock.unlock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void monitorTimerTasks() {
|
||||||
|
if (Time.monotonicNow() - this.timeStampOfLastWrite
|
||||||
|
>= this.timerTaskRetainTTL) {
|
||||||
|
cancelAndCloseTimerTasks();
|
||||||
|
|
||||||
|
timerTaskStarted = false;
|
||||||
|
} else {
|
||||||
|
if (this.monitorTaskTimer != null) {
|
||||||
|
this.monitorTaskTimer.schedule(new TimerMonitorTask(),
|
||||||
|
this.timerTaskRetainTTL);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void close() throws IOException {
|
public void close() throws IOException {
|
||||||
|
|
||||||
serviceStopped = true;
|
serviceStopped = true;
|
||||||
|
|
||||||
flushTimer.cancel();
|
cancelAndCloseTimerTasks();
|
||||||
cleanInActiveFDsTimer.cancel();
|
}
|
||||||
|
|
||||||
|
private void cancelAndCloseTimerTasks() {
|
||||||
|
if (flushTimer != null) {
|
||||||
|
flushTimer.cancel();
|
||||||
|
flushTimer = null;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (cleanInActiveFDsTimer != null) {
|
||||||
|
cleanInActiveFDsTimer.cancel();
|
||||||
|
cleanInActiveFDsTimer = null;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (monitorTaskTimer != null) {
|
||||||
|
monitorTaskTimer.cancel();
|
||||||
|
monitorTaskTimer = null;
|
||||||
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
this.domainFDLocker.lock();
|
this.domainFDLocker.lock();
|
||||||
|
@ -696,6 +760,7 @@ public class FileSystemTimelineWriter extends TimelineWriter{
|
||||||
public void writeDomainLog(FileSystem fs, Path logPath,
|
public void writeDomainLog(FileSystem fs, Path logPath,
|
||||||
ObjectMapper objMapper, TimelineDomain domain,
|
ObjectMapper objMapper, TimelineDomain domain,
|
||||||
boolean isAppendSupported) throws IOException {
|
boolean isAppendSupported) throws IOException {
|
||||||
|
checkAndStartTimeTasks();
|
||||||
try {
|
try {
|
||||||
this.domainFDLocker.lock();
|
this.domainFDLocker.lock();
|
||||||
if (this.domainLogFD != null) {
|
if (this.domainLogFD != null) {
|
||||||
|
@ -714,6 +779,7 @@ public class FileSystemTimelineWriter extends TimelineWriter{
|
||||||
ObjectMapper objMapper, ApplicationAttemptId appAttemptId,
|
ObjectMapper objMapper, ApplicationAttemptId appAttemptId,
|
||||||
TimelineEntityGroupId groupId, List<TimelineEntity> entitiesToEntity,
|
TimelineEntityGroupId groupId, List<TimelineEntity> entitiesToEntity,
|
||||||
boolean isAppendSupported) throws IOException{
|
boolean isAppendSupported) throws IOException{
|
||||||
|
checkAndStartTimeTasks();
|
||||||
writeEntityLogs(fs, entityLogPath, objMapper, appAttemptId,
|
writeEntityLogs(fs, entityLogPath, objMapper, appAttemptId,
|
||||||
groupId, entitiesToEntity, isAppendSupported, this.entityLogFDs);
|
groupId, entitiesToEntity, isAppendSupported, this.entityLogFDs);
|
||||||
}
|
}
|
||||||
|
@ -788,6 +854,7 @@ public class FileSystemTimelineWriter extends TimelineWriter{
|
||||||
ObjectMapper objMapper, ApplicationAttemptId attemptId,
|
ObjectMapper objMapper, ApplicationAttemptId attemptId,
|
||||||
List<TimelineEntity> entities, boolean isAppendSupported)
|
List<TimelineEntity> entities, boolean isAppendSupported)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
|
checkAndStartTimeTasks();
|
||||||
writeSummmaryEntityLogs(fs, logPath, objMapper, attemptId, entities,
|
writeSummmaryEntityLogs(fs, logPath, objMapper, attemptId, entities,
|
||||||
isAppendSupported, this.summanyLogFDs);
|
isAppendSupported, this.summanyLogFDs);
|
||||||
}
|
}
|
||||||
|
@ -843,5 +910,45 @@ public class FileSystemTimelineWriter extends TimelineWriter{
|
||||||
summaryTableLocker.unlock();
|
summaryTableLocker.unlock();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void createAndStartTimerTasks() {
|
||||||
|
this.flushTimer =
|
||||||
|
new Timer(LogFDsCache.class.getSimpleName() + "FlushTimer",
|
||||||
|
true);
|
||||||
|
this.flushTimer.schedule(new FlushTimerTask(), flushIntervalSecs * 1000,
|
||||||
|
flushIntervalSecs * 1000);
|
||||||
|
|
||||||
|
this.cleanInActiveFDsTimer =
|
||||||
|
new Timer(LogFDsCache.class.getSimpleName()
|
||||||
|
+ "cleanInActiveFDsTimer", true);
|
||||||
|
this.cleanInActiveFDsTimer.schedule(new CleanInActiveFDsTask(),
|
||||||
|
cleanIntervalSecs * 1000, cleanIntervalSecs * 1000);
|
||||||
|
|
||||||
|
this.monitorTaskTimer =
|
||||||
|
new Timer(LogFDsCache.class.getSimpleName() + "MonitorTimer",
|
||||||
|
true);
|
||||||
|
this.monitorTaskTimer.schedule(new TimerMonitorTask(),
|
||||||
|
this.timerTaskRetainTTL);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void checkAndStartTimeTasks() {
|
||||||
|
try {
|
||||||
|
this.timerTasksMonitorReadLock.lock();
|
||||||
|
this.timeStampOfLastWrite = Time.monotonicNow();
|
||||||
|
if(!timerTaskStarted) {
|
||||||
|
try {
|
||||||
|
timerTaskLocker.lock();
|
||||||
|
if (!timerTaskStarted) {
|
||||||
|
createAndStartTimerTasks();
|
||||||
|
timerTaskStarted = true;
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
timerTaskLocker.unlock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
this.timerTasksMonitorReadLock.unlock();
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -2050,6 +2050,45 @@
|
||||||
<value>10485760</value>
|
<value>10485760</value>
|
||||||
</property>
|
</property>
|
||||||
|
|
||||||
|
<property>
|
||||||
|
<name>yarn.timeline-service.client.fd-flush-interval-secs</name>
|
||||||
|
<description>
|
||||||
|
Flush interval for ATS v1.5 writer. This value controls how frequently
|
||||||
|
the writer will flush the HDFS FSStream for the entity/domain.
|
||||||
|
</description>
|
||||||
|
<value>10</value>
|
||||||
|
</property>
|
||||||
|
|
||||||
|
<property>
|
||||||
|
<name>yarn.timeline-service.client.fd-clean-interval-secs</name>
|
||||||
|
<description>
|
||||||
|
Scan interval for ATS v1.5 writer. This value controls how frequently
|
||||||
|
the writer will scan the HDFS FSStream for the entity/domain.
|
||||||
|
If the FSStream is stale for a long time, this FSStream will be closed.
|
||||||
|
</description>
|
||||||
|
<value>60</value>
|
||||||
|
</property>
|
||||||
|
|
||||||
|
<property>
|
||||||
|
<name>yarn.timeline-service.client.fd-retain-secs</name>
|
||||||
|
<description>
|
||||||
|
How long the ATS v1.5 writer will keep a FSStream open.
|
||||||
|
If nothing is written to this FSStream for this configured time,
|
||||||
|
it will be closed.
|
||||||
|
</description>
|
||||||
|
<value>300</value>
|
||||||
|
</property>
|
||||||
|
|
||||||
|
<property>
|
||||||
|
<name>yarn.timeline-service.client.internal-timers-ttl-secs</name>
|
||||||
|
<description>
|
||||||
|
How long the internal Timer Tasks can stay alive in the writer. If there is no
|
||||||
|
write operation for this configured time, the internal timer tasks will
|
||||||
|
be closed.
|
||||||
|
</description>
|
||||||
|
<value>420</value>
|
||||||
|
</property>
|
||||||
|
|
||||||
<!-- Shared Cache Configuration -->
|
<!-- Shared Cache Configuration -->
|
||||||
|
|
||||||
<property>
|
<property>
|
||||||
|
|
Loading…
Reference in New Issue