diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollector.java index d2762699613..e62a43695c4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollector.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollector.java @@ -58,6 +58,7 @@ public class AppLevelTimelineCollector extends TimelineCollector { private final ApplicationId appId; private final TimelineCollectorContext context; private ScheduledThreadPoolExecutor appAggregationExecutor; + private AppLevelAggregator appAggregator; public AppLevelTimelineCollector(ApplicationId appId) { super(AppLevelTimelineCollector.class.getName() + " - " + appId.toString()); @@ -94,7 +95,8 @@ public class AppLevelTimelineCollector extends TimelineCollector { new ThreadFactoryBuilder() .setNameFormat("TimelineCollector Aggregation thread #%d") .build()); - appAggregationExecutor.scheduleAtFixedRate(new AppLevelAggregator(), + appAggregator = new AppLevelAggregator(); + appAggregationExecutor.scheduleAtFixedRate(appAggregator, AppLevelTimelineCollector.AGGREGATION_EXECUTOR_EXEC_INTERVAL_SECS, AppLevelTimelineCollector.AGGREGATION_EXECUTOR_EXEC_INTERVAL_SECS, TimeUnit.SECONDS); @@ -108,6 +110,8 @@ public class AppLevelTimelineCollector extends TimelineCollector { LOG.info("App-level aggregator shutdown timed out, shutdown now. "); appAggregationExecutor.shutdownNow(); } + // Perform one round of aggregation after the aggregation executor is done. + appAggregator.aggregate(); super.serviceStop(); } @@ -123,8 +127,7 @@ public class AppLevelTimelineCollector extends TimelineCollector { private class AppLevelAggregator implements Runnable { - @Override - public void run() { + private void aggregate() { if (LOG.isDebugEnabled()) { LOG.debug("App-level real-time aggregating"); } @@ -156,6 +159,11 @@ public class AppLevelTimelineCollector extends TimelineCollector { LOG.debug("App-level real-time aggregation complete"); } } + + @Override + public void run() { + aggregate(); + } } }