YARN-6888. Refactor AppLevelTimelineCollector such that RM does not have aggregator threads created. Contributed by Vrushali C.
(cherry picked from commit 20dd6d18b7787e67ef96f3b6b92ea8415a8650fd)
This commit is contained in:
parent
4014390ef4
commit
4bea8af723
|
@ -18,7 +18,6 @@
|
|||
|
||||
package org.apache.hadoop.yarn.server.timelineservice.collector;
|
||||
|
||||
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
|
@ -26,19 +25,10 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
|||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
|
||||
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
|
||||
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
|
||||
import java.util.HashSet;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ScheduledThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/**
|
||||
* Service that handles writes to the timeline service and writes them to the
|
||||
* backing storage for a given YARN application.
|
||||
|
@ -50,15 +40,8 @@ import java.util.concurrent.TimeUnit;
|
|||
public class AppLevelTimelineCollector extends TimelineCollector {
|
||||
private static final Log LOG = LogFactory.getLog(TimelineCollector.class);
|
||||
|
||||
private final static int AGGREGATION_EXECUTOR_NUM_THREADS = 1;
|
||||
private final static int AGGREGATION_EXECUTOR_EXEC_INTERVAL_SECS = 15;
|
||||
private static Set<String> entityTypesSkipAggregation
|
||||
= initializeSkipSet();
|
||||
|
||||
private final ApplicationId appId;
|
||||
private final TimelineCollectorContext context;
|
||||
private ScheduledThreadPoolExecutor appAggregationExecutor;
|
||||
private AppLevelAggregator appAggregator;
|
||||
private UserGroupInformation currentUser;
|
||||
|
||||
public AppLevelTimelineCollector(ApplicationId appId) {
|
||||
|
@ -68,12 +51,8 @@ public class AppLevelTimelineCollector extends TimelineCollector {
|
|||
context = new TimelineCollectorContext();
|
||||
}
|
||||
|
||||
private static Set<String> initializeSkipSet() {
|
||||
Set<String> result = new HashSet<>();
|
||||
result.add(TimelineEntityType.YARN_APPLICATION.toString());
|
||||
result.add(TimelineEntityType.YARN_FLOW_RUN.toString());
|
||||
result.add(TimelineEntityType.YARN_FLOW_ACTIVITY.toString());
|
||||
return result;
|
||||
public UserGroupInformation getCurrentUser() {
|
||||
return currentUser;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -91,29 +70,11 @@ public class AppLevelTimelineCollector extends TimelineCollector {
|
|||
|
||||
@Override
|
||||
protected void serviceStart() throws Exception {
|
||||
// Launch the aggregation thread
|
||||
appAggregationExecutor = new ScheduledThreadPoolExecutor(
|
||||
AppLevelTimelineCollector.AGGREGATION_EXECUTOR_NUM_THREADS,
|
||||
new ThreadFactoryBuilder()
|
||||
.setNameFormat("TimelineCollector Aggregation thread #%d")
|
||||
.build());
|
||||
appAggregator = new AppLevelAggregator();
|
||||
appAggregationExecutor.scheduleAtFixedRate(appAggregator,
|
||||
AppLevelTimelineCollector.AGGREGATION_EXECUTOR_EXEC_INTERVAL_SECS,
|
||||
AppLevelTimelineCollector.AGGREGATION_EXECUTOR_EXEC_INTERVAL_SECS,
|
||||
TimeUnit.SECONDS);
|
||||
super.serviceStart();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void serviceStop() throws Exception {
|
||||
appAggregationExecutor.shutdown();
|
||||
if (!appAggregationExecutor.awaitTermination(10, TimeUnit.SECONDS)) {
|
||||
LOG.info("App-level aggregator shutdown timed out, shutdown now. ");
|
||||
appAggregationExecutor.shutdownNow();
|
||||
}
|
||||
// Perform one round of aggregation after the aggregation executor is done.
|
||||
appAggregator.aggregate();
|
||||
super.serviceStop();
|
||||
}
|
||||
|
||||
|
@ -122,48 +83,4 @@ public class AppLevelTimelineCollector extends TimelineCollector {
|
|||
return context;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Set<String> getEntityTypesSkipAggregation() {
|
||||
return entityTypesSkipAggregation;
|
||||
}
|
||||
|
||||
private class AppLevelAggregator implements Runnable {
|
||||
|
||||
private void aggregate() {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("App-level real-time aggregating");
|
||||
}
|
||||
if (!isReadyToAggregate()) {
|
||||
LOG.warn("App-level collector is not ready, skip aggregation. ");
|
||||
return;
|
||||
}
|
||||
try {
|
||||
TimelineCollectorContext currContext = getTimelineEntityContext();
|
||||
Map<String, AggregationStatusTable> aggregationGroups
|
||||
= getAggregationGroups();
|
||||
if (aggregationGroups == null
|
||||
|| aggregationGroups.isEmpty()) {
|
||||
LOG.debug("App-level collector is empty, skip aggregation. ");
|
||||
return;
|
||||
}
|
||||
TimelineEntity resultEntity = TimelineCollector.aggregateWithoutGroupId(
|
||||
aggregationGroups, currContext.getAppId(),
|
||||
TimelineEntityType.YARN_APPLICATION.toString());
|
||||
TimelineEntities entities = new TimelineEntities();
|
||||
entities.addEntity(resultEntity);
|
||||
putEntitiesAsync(entities, currentUser);
|
||||
} catch (Exception e) {
|
||||
LOG.error("Error aggregating timeline metrics", e);
|
||||
}
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("App-level real-time aggregation complete");
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
aggregate();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -0,0 +1,150 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.timelineservice.collector;
|
||||
|
||||
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
|
||||
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
|
||||
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
|
||||
import java.util.HashSet;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ScheduledThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/**
|
||||
* Service that handles aggregations for applications
|
||||
* and makes use of {@link AppLevelTimelineCollector} class for
|
||||
* writes to Timeline Service.
|
||||
*
|
||||
* App-related lifecycle management is handled by this service.
|
||||
*/
|
||||
@Private
|
||||
@Unstable
|
||||
public class AppLevelTimelineCollectorWithAgg
|
||||
extends AppLevelTimelineCollector {
|
||||
private static final Log LOG = LogFactory.getLog(TimelineCollector.class);
|
||||
|
||||
private final static int AGGREGATION_EXECUTOR_NUM_THREADS = 1;
|
||||
private final static int AGGREGATION_EXECUTOR_EXEC_INTERVAL_SECS = 15;
|
||||
private static Set<String> entityTypesSkipAggregation
|
||||
= initializeSkipSet();
|
||||
|
||||
private ScheduledThreadPoolExecutor appAggregationExecutor;
|
||||
private AppLevelAggregator appAggregator;
|
||||
|
||||
public AppLevelTimelineCollectorWithAgg(ApplicationId appId) {
|
||||
super(appId);
|
||||
}
|
||||
|
||||
private static Set<String> initializeSkipSet() {
|
||||
Set<String> result = new HashSet<>();
|
||||
result.add(TimelineEntityType.YARN_APPLICATION.toString());
|
||||
result.add(TimelineEntityType.YARN_FLOW_RUN.toString());
|
||||
result.add(TimelineEntityType.YARN_FLOW_ACTIVITY.toString());
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void serviceInit(Configuration conf) throws Exception {
|
||||
super.serviceInit(conf);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void serviceStart() throws Exception {
|
||||
// Launch the aggregation thread
|
||||
appAggregationExecutor = new ScheduledThreadPoolExecutor(
|
||||
AppLevelTimelineCollectorWithAgg.AGGREGATION_EXECUTOR_NUM_THREADS,
|
||||
new ThreadFactoryBuilder()
|
||||
.setNameFormat("TimelineCollector Aggregation thread #%d")
|
||||
.build());
|
||||
appAggregator = new AppLevelAggregator();
|
||||
appAggregationExecutor.scheduleAtFixedRate(appAggregator,
|
||||
AppLevelTimelineCollectorWithAgg.
|
||||
AGGREGATION_EXECUTOR_EXEC_INTERVAL_SECS,
|
||||
AppLevelTimelineCollectorWithAgg.
|
||||
AGGREGATION_EXECUTOR_EXEC_INTERVAL_SECS,
|
||||
TimeUnit.SECONDS);
|
||||
super.serviceStart();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void serviceStop() throws Exception {
|
||||
appAggregationExecutor.shutdown();
|
||||
if (!appAggregationExecutor.awaitTermination(10, TimeUnit.SECONDS)) {
|
||||
LOG.info("App-level aggregator shutdown timed out, shutdown now. ");
|
||||
appAggregationExecutor.shutdownNow();
|
||||
}
|
||||
// Perform one round of aggregation after the aggregation executor is done.
|
||||
appAggregator.aggregate();
|
||||
super.serviceStop();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Set<String> getEntityTypesSkipAggregation() {
|
||||
return entityTypesSkipAggregation;
|
||||
}
|
||||
|
||||
private class AppLevelAggregator implements Runnable {
|
||||
|
||||
private void aggregate() {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("App-level real-time aggregating");
|
||||
}
|
||||
if (!isReadyToAggregate()) {
|
||||
LOG.warn("App-level collector is not ready, skip aggregation. ");
|
||||
return;
|
||||
}
|
||||
try {
|
||||
TimelineCollectorContext currContext = getTimelineEntityContext();
|
||||
Map<String, AggregationStatusTable> aggregationGroups
|
||||
= getAggregationGroups();
|
||||
if (aggregationGroups == null
|
||||
|| aggregationGroups.isEmpty()) {
|
||||
LOG.debug("App-level collector is empty, skip aggregation. ");
|
||||
return;
|
||||
}
|
||||
TimelineEntity resultEntity = TimelineCollector.aggregateWithoutGroupId(
|
||||
aggregationGroups, currContext.getAppId(),
|
||||
TimelineEntityType.YARN_APPLICATION.toString());
|
||||
TimelineEntities entities = new TimelineEntities();
|
||||
entities.addEntity(resultEntity);
|
||||
putEntitiesAsync(entities, getCurrentUser());
|
||||
} catch (Exception e) {
|
||||
LOG.error("Error aggregating timeline metrics", e);
|
||||
}
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("App-level real-time aggregation complete");
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
aggregate();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -118,7 +118,7 @@ public class PerNodeTimelineCollectorsAuxService extends AuxiliaryService {
|
|||
*/
|
||||
public boolean addApplication(ApplicationId appId) {
|
||||
AppLevelTimelineCollector collector =
|
||||
new AppLevelTimelineCollector(appId);
|
||||
new AppLevelTimelineCollectorWithAgg(appId);
|
||||
return (collectorManager.putIfAbsent(appId, collector)
|
||||
== collector);
|
||||
}
|
||||
|
|
|
@ -95,7 +95,7 @@ public class TestNMTimelineCollectorManager {
|
|||
Callable<Boolean> task = new Callable<Boolean>() {
|
||||
public Boolean call() {
|
||||
AppLevelTimelineCollector collector =
|
||||
new AppLevelTimelineCollector(appId);
|
||||
new AppLevelTimelineCollectorWithAgg(appId);
|
||||
return (collectorManager.putIfAbsent(appId, collector) == collector);
|
||||
}
|
||||
};
|
||||
|
@ -126,7 +126,7 @@ public class TestNMTimelineCollectorManager {
|
|||
Callable<Boolean> task = new Callable<Boolean>() {
|
||||
public Boolean call() {
|
||||
AppLevelTimelineCollector collector =
|
||||
new AppLevelTimelineCollector(appId);
|
||||
new AppLevelTimelineCollectorWithAgg(appId);
|
||||
boolean successPut =
|
||||
(collectorManager.putIfAbsent(appId, collector) == collector);
|
||||
return successPut && collectorManager.remove(appId);
|
||||
|
|
Loading…
Reference in New Issue