diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollector.java index 0b05309b432..c481dbe6724 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollector.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollector.java @@ -18,7 +18,6 @@ package org.apache.hadoop.yarn.server.timelineservice.collector; -import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -26,19 +25,10 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; -import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; -import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType; import org.apache.hadoop.yarn.conf.YarnConfiguration; import com.google.common.base.Preconditions; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.TimeUnit; - /** * Service that handles writes to the timeline service and writes them to the * backing storage for a given YARN application. @@ -50,15 +40,8 @@ import java.util.concurrent.TimeUnit; public class AppLevelTimelineCollector extends TimelineCollector { private static final Log LOG = LogFactory.getLog(TimelineCollector.class); - private final static int AGGREGATION_EXECUTOR_NUM_THREADS = 1; - private final static int AGGREGATION_EXECUTOR_EXEC_INTERVAL_SECS = 15; - private static Set entityTypesSkipAggregation - = initializeSkipSet(); - private final ApplicationId appId; private final TimelineCollectorContext context; - private ScheduledThreadPoolExecutor appAggregationExecutor; - private AppLevelAggregator appAggregator; private UserGroupInformation currentUser; public AppLevelTimelineCollector(ApplicationId appId) { @@ -68,12 +51,8 @@ public class AppLevelTimelineCollector extends TimelineCollector { context = new TimelineCollectorContext(); } - private static Set initializeSkipSet() { - Set result = new HashSet<>(); - result.add(TimelineEntityType.YARN_APPLICATION.toString()); - result.add(TimelineEntityType.YARN_FLOW_RUN.toString()); - result.add(TimelineEntityType.YARN_FLOW_ACTIVITY.toString()); - return result; + public UserGroupInformation getCurrentUser() { + return currentUser; } @Override @@ -91,29 +70,11 @@ public class AppLevelTimelineCollector extends TimelineCollector { @Override protected void serviceStart() throws Exception { - // Launch the aggregation thread - appAggregationExecutor = new ScheduledThreadPoolExecutor( - AppLevelTimelineCollector.AGGREGATION_EXECUTOR_NUM_THREADS, - new ThreadFactoryBuilder() - .setNameFormat("TimelineCollector Aggregation thread #%d") - .build()); - appAggregator = new AppLevelAggregator(); - appAggregationExecutor.scheduleAtFixedRate(appAggregator, - AppLevelTimelineCollector.AGGREGATION_EXECUTOR_EXEC_INTERVAL_SECS, - AppLevelTimelineCollector.AGGREGATION_EXECUTOR_EXEC_INTERVAL_SECS, - TimeUnit.SECONDS); super.serviceStart(); } @Override protected void serviceStop() throws Exception { - appAggregationExecutor.shutdown(); - if (!appAggregationExecutor.awaitTermination(10, TimeUnit.SECONDS)) { - LOG.info("App-level aggregator shutdown timed out, shutdown now. "); - appAggregationExecutor.shutdownNow(); - } - // Perform one round of aggregation after the aggregation executor is done. - appAggregator.aggregate(); super.serviceStop(); } @@ -122,48 +83,4 @@ public class AppLevelTimelineCollector extends TimelineCollector { return context; } - @Override - protected Set getEntityTypesSkipAggregation() { - return entityTypesSkipAggregation; - } - - private class AppLevelAggregator implements Runnable { - - private void aggregate() { - if (LOG.isDebugEnabled()) { - LOG.debug("App-level real-time aggregating"); - } - if (!isReadyToAggregate()) { - LOG.warn("App-level collector is not ready, skip aggregation. "); - return; - } - try { - TimelineCollectorContext currContext = getTimelineEntityContext(); - Map aggregationGroups - = getAggregationGroups(); - if (aggregationGroups == null - || aggregationGroups.isEmpty()) { - LOG.debug("App-level collector is empty, skip aggregation. "); - return; - } - TimelineEntity resultEntity = TimelineCollector.aggregateWithoutGroupId( - aggregationGroups, currContext.getAppId(), - TimelineEntityType.YARN_APPLICATION.toString()); - TimelineEntities entities = new TimelineEntities(); - entities.addEntity(resultEntity); - putEntitiesAsync(entities, currentUser); - } catch (Exception e) { - LOG.error("Error aggregating timeline metrics", e); - } - if (LOG.isDebugEnabled()) { - LOG.debug("App-level real-time aggregation complete"); - } - } - - @Override - public void run() { - aggregate(); - } - } - } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollectorWithAgg.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollectorWithAgg.java new file mode 100644 index 00000000000..ac91275d2eb --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollectorWithAgg.java @@ -0,0 +1,150 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.timelineservice.collector; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType; +import org.apache.hadoop.conf.Configuration; + +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +/** + * Service that handles aggregations for applications + * and makes use of {@link AppLevelTimelineCollector} class for + * writes to Timeline Service. + * + * App-related lifecycle management is handled by this service. + */ +@Private +@Unstable +public class AppLevelTimelineCollectorWithAgg + extends AppLevelTimelineCollector { + private static final Log LOG = LogFactory.getLog(TimelineCollector.class); + + private final static int AGGREGATION_EXECUTOR_NUM_THREADS = 1; + private final static int AGGREGATION_EXECUTOR_EXEC_INTERVAL_SECS = 15; + private static Set entityTypesSkipAggregation + = initializeSkipSet(); + + private ScheduledThreadPoolExecutor appAggregationExecutor; + private AppLevelAggregator appAggregator; + + public AppLevelTimelineCollectorWithAgg(ApplicationId appId) { + super(appId); + } + + private static Set initializeSkipSet() { + Set result = new HashSet<>(); + result.add(TimelineEntityType.YARN_APPLICATION.toString()); + result.add(TimelineEntityType.YARN_FLOW_RUN.toString()); + result.add(TimelineEntityType.YARN_FLOW_ACTIVITY.toString()); + return result; + } + + @Override + protected void serviceInit(Configuration conf) throws Exception { + super.serviceInit(conf); + } + + @Override + protected void serviceStart() throws Exception { + // Launch the aggregation thread + appAggregationExecutor = new ScheduledThreadPoolExecutor( + AppLevelTimelineCollectorWithAgg.AGGREGATION_EXECUTOR_NUM_THREADS, + new ThreadFactoryBuilder() + .setNameFormat("TimelineCollector Aggregation thread #%d") + .build()); + appAggregator = new AppLevelAggregator(); + appAggregationExecutor.scheduleAtFixedRate(appAggregator, + AppLevelTimelineCollectorWithAgg. + AGGREGATION_EXECUTOR_EXEC_INTERVAL_SECS, + AppLevelTimelineCollectorWithAgg. + AGGREGATION_EXECUTOR_EXEC_INTERVAL_SECS, + TimeUnit.SECONDS); + super.serviceStart(); + } + + @Override + protected void serviceStop() throws Exception { + appAggregationExecutor.shutdown(); + if (!appAggregationExecutor.awaitTermination(10, TimeUnit.SECONDS)) { + LOG.info("App-level aggregator shutdown timed out, shutdown now. "); + appAggregationExecutor.shutdownNow(); + } + // Perform one round of aggregation after the aggregation executor is done. + appAggregator.aggregate(); + super.serviceStop(); + } + + @Override + protected Set getEntityTypesSkipAggregation() { + return entityTypesSkipAggregation; + } + + private class AppLevelAggregator implements Runnable { + + private void aggregate() { + if (LOG.isDebugEnabled()) { + LOG.debug("App-level real-time aggregating"); + } + if (!isReadyToAggregate()) { + LOG.warn("App-level collector is not ready, skip aggregation. "); + return; + } + try { + TimelineCollectorContext currContext = getTimelineEntityContext(); + Map aggregationGroups + = getAggregationGroups(); + if (aggregationGroups == null + || aggregationGroups.isEmpty()) { + LOG.debug("App-level collector is empty, skip aggregation. "); + return; + } + TimelineEntity resultEntity = TimelineCollector.aggregateWithoutGroupId( + aggregationGroups, currContext.getAppId(), + TimelineEntityType.YARN_APPLICATION.toString()); + TimelineEntities entities = new TimelineEntities(); + entities.addEntity(resultEntity); + putEntitiesAsync(entities, getCurrentUser()); + } catch (Exception e) { + LOG.error("Error aggregating timeline metrics", e); + } + if (LOG.isDebugEnabled()) { + LOG.debug("App-level real-time aggregation complete"); + } + } + + @Override + public void run() { + aggregate(); + } + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/PerNodeTimelineCollectorsAuxService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/PerNodeTimelineCollectorsAuxService.java index 266bd04d2eb..93e5666d342 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/PerNodeTimelineCollectorsAuxService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/PerNodeTimelineCollectorsAuxService.java @@ -118,7 +118,7 @@ public class PerNodeTimelineCollectorsAuxService extends AuxiliaryService { */ public boolean addApplication(ApplicationId appId) { AppLevelTimelineCollector collector = - new AppLevelTimelineCollector(appId); + new AppLevelTimelineCollectorWithAgg(appId); return (collectorManager.putIfAbsent(appId, collector) == collector); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestNMTimelineCollectorManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestNMTimelineCollectorManager.java index 7bc89c582c1..a59f8c1e687 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestNMTimelineCollectorManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestNMTimelineCollectorManager.java @@ -95,7 +95,7 @@ public class TestNMTimelineCollectorManager { Callable task = new Callable() { public Boolean call() { AppLevelTimelineCollector collector = - new AppLevelTimelineCollector(appId); + new AppLevelTimelineCollectorWithAgg(appId); return (collectorManager.putIfAbsent(appId, collector) == collector); } }; @@ -126,7 +126,7 @@ public class TestNMTimelineCollectorManager { Callable task = new Callable() { public Boolean call() { AppLevelTimelineCollector collector = - new AppLevelTimelineCollector(appId); + new AppLevelTimelineCollectorWithAgg(appId); boolean successPut = (collectorManager.putIfAbsent(appId, collector) == collector); return successPut && collectorManager.remove(appId);