YARN-5747. Application timeline metric aggregation in timeline v2 will lose last round aggregation when an application finishes (Li Lu via Varun Saxena)

This commit is contained in:
Varun Saxena 2016-10-22 01:17:11 +05:30
parent 3344f0ace5
commit 1d7fc52578
1 changed files with 11 additions and 3 deletions

View File

@ -58,6 +58,7 @@ public class AppLevelTimelineCollector extends TimelineCollector {
private final ApplicationId appId; private final ApplicationId appId;
private final TimelineCollectorContext context; private final TimelineCollectorContext context;
private ScheduledThreadPoolExecutor appAggregationExecutor; private ScheduledThreadPoolExecutor appAggregationExecutor;
private AppLevelAggregator appAggregator;
public AppLevelTimelineCollector(ApplicationId appId) { public AppLevelTimelineCollector(ApplicationId appId) {
super(AppLevelTimelineCollector.class.getName() + " - " + appId.toString()); super(AppLevelTimelineCollector.class.getName() + " - " + appId.toString());
@ -94,7 +95,8 @@ public class AppLevelTimelineCollector extends TimelineCollector {
new ThreadFactoryBuilder() new ThreadFactoryBuilder()
.setNameFormat("TimelineCollector Aggregation thread #%d") .setNameFormat("TimelineCollector Aggregation thread #%d")
.build()); .build());
appAggregationExecutor.scheduleAtFixedRate(new AppLevelAggregator(), appAggregator = new AppLevelAggregator();
appAggregationExecutor.scheduleAtFixedRate(appAggregator,
AppLevelTimelineCollector.AGGREGATION_EXECUTOR_EXEC_INTERVAL_SECS, AppLevelTimelineCollector.AGGREGATION_EXECUTOR_EXEC_INTERVAL_SECS,
AppLevelTimelineCollector.AGGREGATION_EXECUTOR_EXEC_INTERVAL_SECS, AppLevelTimelineCollector.AGGREGATION_EXECUTOR_EXEC_INTERVAL_SECS,
TimeUnit.SECONDS); TimeUnit.SECONDS);
@ -108,6 +110,8 @@ public class AppLevelTimelineCollector extends TimelineCollector {
LOG.info("App-level aggregator shutdown timed out, shutdown now. "); LOG.info("App-level aggregator shutdown timed out, shutdown now. ");
appAggregationExecutor.shutdownNow(); appAggregationExecutor.shutdownNow();
} }
// Perform one round of aggregation after the aggregation executor is done.
appAggregator.aggregate();
super.serviceStop(); super.serviceStop();
} }
@ -123,8 +127,7 @@ public class AppLevelTimelineCollector extends TimelineCollector {
private class AppLevelAggregator implements Runnable { private class AppLevelAggregator implements Runnable {
@Override private void aggregate() {
public void run() {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("App-level real-time aggregating"); LOG.debug("App-level real-time aggregating");
} }
@ -156,6 +159,11 @@ public class AppLevelTimelineCollector extends TimelineCollector {
LOG.debug("App-level real-time aggregation complete"); LOG.debug("App-level real-time aggregation complete");
} }
} }
@Override
public void run() {
aggregate();
}
} }
} }