YARN-4851. Metric improvements for ATS v1.5 storage components. Li Lu via junping_du.
This commit is contained in:
parent
c1cc6ac667
commit
06413da72e
|
@ -61,7 +61,7 @@ public class EntityCacheItem {
|
|||
* Set the application logs to this cache item. The entity group should be
|
||||
* associated with this application.
|
||||
*
|
||||
* @param incomingAppLogs
|
||||
* @param incomingAppLogs Application logs this cache item mapped to
|
||||
*/
|
||||
public synchronized void setAppLogs(
|
||||
EntityGroupFSTimelineStore.AppLogs incomingAppLogs) {
|
||||
|
@ -80,18 +80,21 @@ public class EntityCacheItem {
|
|||
* rescan and then load new data. The refresh process is synchronized with
|
||||
* other operations on the same cache item.
|
||||
*
|
||||
* @param groupId
|
||||
* @param aclManager
|
||||
* @param jsonFactory
|
||||
* @param objMapper
|
||||
* @param groupId Group id of the cache
|
||||
* @param aclManager ACL manager for the timeline storage
|
||||
* @param jsonFactory JSON factory for the storage
|
||||
* @param objMapper Object mapper for the storage
|
||||
* @param metrics Metrics to trace the status of the entity group store
|
||||
* @return a {@link org.apache.hadoop.yarn.server.timeline.TimelineStore}
|
||||
* object filled with all entities in the group.
|
||||
* @throws IOException
|
||||
*/
|
||||
public synchronized TimelineStore refreshCache(TimelineEntityGroupId groupId,
|
||||
TimelineACLsManager aclManager, JsonFactory jsonFactory,
|
||||
ObjectMapper objMapper) throws IOException {
|
||||
ObjectMapper objMapper, EntityGroupFSTimelineStoreMetrics metrics)
|
||||
throws IOException {
|
||||
if (needRefresh()) {
|
||||
long startTime = Time.monotonicNow();
|
||||
// If an application is not finished, we only update summary logs (and put
|
||||
// new entities into summary storage).
|
||||
// Otherwise, since the application is done, we can update detail logs.
|
||||
|
@ -106,9 +109,12 @@ public class EntityCacheItem {
|
|||
"LeveldbCache." + groupId);
|
||||
store.init(config);
|
||||
store.start();
|
||||
} else {
|
||||
// Store is not null, the refresh is triggered by stale storage.
|
||||
metrics.incrCacheStaleRefreshes();
|
||||
}
|
||||
List<LogInfo> removeList = new ArrayList<>();
|
||||
try(TimelineDataManager tdm =
|
||||
try (TimelineDataManager tdm =
|
||||
new TimelineDataManager(store, aclManager)) {
|
||||
tdm.init(config);
|
||||
tdm.start();
|
||||
|
@ -133,8 +139,10 @@ public class EntityCacheItem {
|
|||
appLogs.getDetailLogs().removeAll(removeList);
|
||||
}
|
||||
updateRefreshTimeToNow();
|
||||
metrics.addCacheRefreshTime(Time.monotonicNow() - startTime);
|
||||
} else {
|
||||
LOG.debug("Cache new enough, skip refreshing");
|
||||
metrics.incrNoRefreshCacheRead();
|
||||
}
|
||||
return store;
|
||||
}
|
||||
|
@ -142,7 +150,7 @@ public class EntityCacheItem {
|
|||
/**
|
||||
* Release the cache item for the given group id.
|
||||
*
|
||||
* @param groupId
|
||||
* @param groupId the group id that the cache should release
|
||||
*/
|
||||
public synchronized void releaseCache(TimelineEntityGroupId groupId) {
|
||||
try {
|
||||
|
|
|
@ -128,12 +128,17 @@ public class EntityGroupFSTimelineStore extends CompositeService
|
|||
private List<TimelineEntityGroupPlugin> cacheIdPlugins;
|
||||
private Map<TimelineEntityGroupId, EntityCacheItem> cachedLogs;
|
||||
|
||||
@VisibleForTesting
|
||||
@InterfaceAudience.Private
|
||||
EntityGroupFSTimelineStoreMetrics metrics;
|
||||
|
||||
public EntityGroupFSTimelineStore() {
|
||||
super(EntityGroupFSTimelineStore.class.getSimpleName());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void serviceInit(Configuration conf) throws Exception {
|
||||
metrics = EntityGroupFSTimelineStoreMetrics.create();
|
||||
summaryStore = createSummaryStore();
|
||||
addService(summaryStore);
|
||||
|
||||
|
@ -171,6 +176,7 @@ public class EntityGroupFSTimelineStore extends CompositeService
|
|||
if (cacheItem.getAppLogs().isDone()) {
|
||||
appIdLogMap.remove(groupId.getApplicationId());
|
||||
}
|
||||
metrics.incrCacheEvicts();
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
|
@ -316,6 +322,7 @@ public class EntityGroupFSTimelineStore extends CompositeService
|
|||
@InterfaceAudience.Private
|
||||
@VisibleForTesting
|
||||
int scanActiveLogs() throws IOException {
|
||||
long startTime = Time.monotonicNow();
|
||||
RemoteIterator<FileStatus> iter = list(activeRootPath);
|
||||
int logsToScanCount = 0;
|
||||
while (iter.hasNext()) {
|
||||
|
@ -331,6 +338,7 @@ public class EntityGroupFSTimelineStore extends CompositeService
|
|||
LOG.debug("Unable to parse entry {}", name);
|
||||
}
|
||||
}
|
||||
metrics.addActiveLogDirScanTime(Time.monotonicNow() - startTime);
|
||||
return logsToScanCount;
|
||||
}
|
||||
|
||||
|
@ -423,6 +431,7 @@ public class EntityGroupFSTimelineStore extends CompositeService
|
|||
if (!fs.delete(dirpath, true)) {
|
||||
LOG.error("Unable to remove " + dirpath);
|
||||
}
|
||||
metrics.incrLogsDirsCleaned();
|
||||
} catch (IOException e) {
|
||||
LOG.error("Unable to remove " + dirpath, e);
|
||||
}
|
||||
|
@ -588,6 +597,7 @@ public class EntityGroupFSTimelineStore extends CompositeService
|
|||
@VisibleForTesting
|
||||
synchronized void parseSummaryLogs(TimelineDataManager tdm)
|
||||
throws IOException {
|
||||
long startTime = Time.monotonicNow();
|
||||
if (!isDone()) {
|
||||
LOG.debug("Try to parse summary log for log {} in {}",
|
||||
appId, appDirPath);
|
||||
|
@ -605,8 +615,10 @@ public class EntityGroupFSTimelineStore extends CompositeService
|
|||
List<LogInfo> removeList = new ArrayList<LogInfo>();
|
||||
for (LogInfo log : summaryLogs) {
|
||||
if (fs.exists(log.getPath(appDirPath))) {
|
||||
log.parseForStore(tdm, appDirPath, isDone(), jsonFactory,
|
||||
long summaryEntityParsed
|
||||
= log.parseForStore(tdm, appDirPath, isDone(), jsonFactory,
|
||||
objMapper, fs);
|
||||
metrics.incrEntitiesReadToSummary(summaryEntityParsed);
|
||||
} else {
|
||||
// The log may have been removed, remove the log
|
||||
removeList.add(log);
|
||||
|
@ -615,6 +627,7 @@ public class EntityGroupFSTimelineStore extends CompositeService
|
|||
}
|
||||
}
|
||||
summaryLogs.removeAll(removeList);
|
||||
metrics.addSummaryLogReadTime(Time.monotonicNow() - startTime);
|
||||
}
|
||||
|
||||
// scans for new logs and returns the modification timestamp of the
|
||||
|
@ -787,6 +800,7 @@ public class EntityGroupFSTimelineStore extends CompositeService
|
|||
@Override
|
||||
public void run() {
|
||||
LOG.debug("Cleaner starting");
|
||||
long startTime = Time.monotonicNow();
|
||||
try {
|
||||
cleanLogs(doneRootPath, fs, logRetainMillis);
|
||||
} catch (Exception e) {
|
||||
|
@ -796,6 +810,8 @@ public class EntityGroupFSTimelineStore extends CompositeService
|
|||
} else {
|
||||
LOG.error("Error cleaning files", e);
|
||||
}
|
||||
} finally {
|
||||
metrics.addLogCleanTime(Time.monotonicNow() - startTime);
|
||||
}
|
||||
LOG.debug("Cleaner finished");
|
||||
}
|
||||
|
@ -824,11 +840,13 @@ public class EntityGroupFSTimelineStore extends CompositeService
|
|||
if (storeForId != null) {
|
||||
LOG.debug("Adding {} as a store for the query", storeForId.getName());
|
||||
stores.add(storeForId);
|
||||
metrics.incrGetEntityToDetailOps();
|
||||
}
|
||||
}
|
||||
if (stores.size() == 0) {
|
||||
LOG.debug("Using summary store for {}", entityType);
|
||||
stores.add(this.summaryStore);
|
||||
metrics.incrGetEntityToSummaryOps();
|
||||
}
|
||||
return stores;
|
||||
}
|
||||
|
@ -898,7 +916,7 @@ public class EntityGroupFSTimelineStore extends CompositeService
|
|||
AppLogs appLogs = cacheItem.getAppLogs();
|
||||
LOG.debug("try refresh cache {} {}", groupId, appLogs.getAppId());
|
||||
store = cacheItem.refreshCache(groupId, aclManager, jsonFactory,
|
||||
objMapper);
|
||||
objMapper, metrics);
|
||||
} else {
|
||||
LOG.warn("AppLogs for group id {} is null", groupId);
|
||||
}
|
||||
|
|
|
@ -0,0 +1,160 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.yarn.server.timeline;
|
||||
|
||||
import org.apache.hadoop.metrics2.MetricsSystem;
|
||||
import org.apache.hadoop.metrics2.annotation.Metric;
|
||||
import org.apache.hadoop.metrics2.annotation.Metrics;
|
||||
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
||||
import org.apache.hadoop.metrics2.lib.MutableCounterLong;
|
||||
import org.apache.hadoop.metrics2.lib.MutableStat;
|
||||
|
||||
/**
|
||||
* This class tracks metrics for the EntityGroupFSTimelineStore. It tracks
|
||||
* the read and write metrics for timeline server v1.5. It serves as a
|
||||
* complement to {@link TimelineDataManagerMetrics}.
|
||||
*/
|
||||
@Metrics(about="Metrics for EntityGroupFSTimelineStore", context="yarn")
|
||||
public class EntityGroupFSTimelineStoreMetrics {
|
||||
private static final String DEFAULT_VALUE_WITH_SCALE = "TimeMs";
|
||||
|
||||
// General read related metrics
|
||||
@Metric("getEntity calls to summary storage")
|
||||
private MutableCounterLong getEntityToSummaryOps;
|
||||
|
||||
@Metric("getEntity calls to detail storage")
|
||||
private MutableCounterLong getEntityToDetailOps;
|
||||
|
||||
// Summary data related metrics
|
||||
@Metric(value = "summary log read ops and time",
|
||||
valueName = DEFAULT_VALUE_WITH_SCALE)
|
||||
private MutableStat summaryLogRead;
|
||||
|
||||
@Metric("entities read into the summary storage")
|
||||
private MutableCounterLong entitiesReadToSummary;
|
||||
|
||||
// Detail data cache related metrics
|
||||
@Metric("cache storage read that does not require a refresh")
|
||||
private MutableCounterLong noRefreshCacheRead;
|
||||
|
||||
@Metric("cache storage refresh due to the cached storage is stale")
|
||||
private MutableCounterLong cacheStaleRefreshes;
|
||||
|
||||
@Metric("cache storage evicts")
|
||||
private MutableCounterLong cacheEvicts;
|
||||
|
||||
@Metric(value = "cache storage refresh ops and time",
|
||||
valueName = DEFAULT_VALUE_WITH_SCALE)
|
||||
private MutableStat cacheRefresh;
|
||||
|
||||
// Log scanner and cleaner related metrics
|
||||
@Metric(value = "active log scan ops and time",
|
||||
valueName = DEFAULT_VALUE_WITH_SCALE)
|
||||
private MutableStat activeLogDirScan;
|
||||
|
||||
@Metric(value = "log cleaner purging ops and time",
|
||||
valueName = DEFAULT_VALUE_WITH_SCALE)
|
||||
private MutableStat logClean;
|
||||
|
||||
@Metric("log cleaner dirs purged")
|
||||
private MutableCounterLong logsDirsCleaned;
|
||||
|
||||
private static EntityGroupFSTimelineStoreMetrics instance = null;
|
||||
|
||||
EntityGroupFSTimelineStoreMetrics() {
|
||||
}
|
||||
|
||||
public static synchronized EntityGroupFSTimelineStoreMetrics create() {
|
||||
if (instance == null) {
|
||||
MetricsSystem ms = DefaultMetricsSystem.instance();
|
||||
instance = ms.register(new EntityGroupFSTimelineStoreMetrics());
|
||||
}
|
||||
return instance;
|
||||
}
|
||||
|
||||
// Setters
|
||||
// General read related
|
||||
public void incrGetEntityToSummaryOps() {
|
||||
getEntityToSummaryOps.incr();
|
||||
}
|
||||
|
||||
public void incrGetEntityToDetailOps() {
|
||||
getEntityToDetailOps.incr();
|
||||
}
|
||||
|
||||
// Summary data related
|
||||
public void addSummaryLogReadTime(long msec) {
|
||||
summaryLogRead.add(msec);
|
||||
}
|
||||
|
||||
public void incrEntitiesReadToSummary(long delta) {
|
||||
entitiesReadToSummary.incr(delta);
|
||||
}
|
||||
|
||||
// Cache related
|
||||
public void incrNoRefreshCacheRead() {
|
||||
noRefreshCacheRead.incr();
|
||||
}
|
||||
|
||||
public void incrCacheStaleRefreshes() {
|
||||
cacheStaleRefreshes.incr();
|
||||
}
|
||||
|
||||
public void incrCacheEvicts() {
|
||||
cacheEvicts.incr();
|
||||
}
|
||||
|
||||
public void addCacheRefreshTime(long msec) {
|
||||
cacheRefresh.add(msec);
|
||||
}
|
||||
|
||||
// Log scanner and cleaner related
|
||||
public void addActiveLogDirScanTime(long msec) {
|
||||
activeLogDirScan.add(msec);
|
||||
}
|
||||
|
||||
public void addLogCleanTime(long msec) {
|
||||
logClean.add(msec);
|
||||
}
|
||||
|
||||
public void incrLogsDirsCleaned() {
|
||||
logsDirsCleaned.incr();
|
||||
}
|
||||
|
||||
// Getters
|
||||
MutableCounterLong getEntitiesReadToSummary() {
|
||||
return entitiesReadToSummary;
|
||||
}
|
||||
|
||||
MutableCounterLong getLogsDirsCleaned() {
|
||||
return logsDirsCleaned;
|
||||
}
|
||||
|
||||
MutableCounterLong getGetEntityToSummaryOps() {
|
||||
return getEntityToSummaryOps;
|
||||
}
|
||||
|
||||
MutableCounterLong getGetEntityToDetailOps() {
|
||||
return getEntityToDetailOps;
|
||||
}
|
||||
|
||||
MutableStat getCacheRefresh() {
|
||||
return cacheRefresh;
|
||||
}
|
||||
}
|
||||
|
|
@ -98,13 +98,14 @@ abstract class LogInfo {
|
|||
));
|
||||
}
|
||||
|
||||
public void parseForStore(TimelineDataManager tdm, Path appDirPath,
|
||||
public long parseForStore(TimelineDataManager tdm, Path appDirPath,
|
||||
boolean appCompleted, JsonFactory jsonFactory, ObjectMapper objMapper,
|
||||
FileSystem fs) throws IOException {
|
||||
LOG.debug("Parsing for log dir {} on attempt {}", appDirPath,
|
||||
attemptDirName);
|
||||
Path logPath = getPath(appDirPath);
|
||||
FileStatus status = fs.getFileStatus(logPath);
|
||||
long numParsed = 0;
|
||||
if (status != null) {
|
||||
long startTime = Time.monotonicNow();
|
||||
try {
|
||||
|
@ -113,6 +114,7 @@ abstract class LogInfo {
|
|||
objMapper, fs);
|
||||
LOG.info("Parsed {} entities from {} in {} msec",
|
||||
count, logPath, Time.monotonicNow() - startTime);
|
||||
numParsed += count;
|
||||
} catch (RuntimeException e) {
|
||||
// If AppLogs cannot parse this log, it may be corrupted or just empty
|
||||
if (e.getCause() instanceof JsonParseException &&
|
||||
|
@ -125,6 +127,7 @@ abstract class LogInfo {
|
|||
} else {
|
||||
LOG.warn("{} no longer exists. Skip for scanning. ", logPath);
|
||||
}
|
||||
return numParsed;
|
||||
}
|
||||
|
||||
private long parsePath(TimelineDataManager tdm, Path logPath,
|
||||
|
|
|
@ -26,6 +26,8 @@ import org.apache.hadoop.fs.FileSystem;
|
|||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.metrics2.lib.MutableCounterLong;
|
||||
import org.apache.hadoop.metrics2.lib.MutableStat;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
|
@ -177,12 +179,15 @@ public class TestEntityGroupFSTimelineStore extends TimelineStoreTestUtils {
|
|||
@Test
|
||||
public void testParseSummaryLogs() throws Exception {
|
||||
TimelineDataManager tdm = PluginStoreTestUtils.getTdmWithMemStore(config);
|
||||
MutableCounterLong scanned = store.metrics.getEntitiesReadToSummary();
|
||||
long beforeScan = scanned.value();
|
||||
EntityGroupFSTimelineStore.AppLogs appLogs =
|
||||
store.new AppLogs(TEST_APPLICATION_ID, testAppDirPath,
|
||||
AppState.COMPLETED);
|
||||
appLogs.scanForLogs();
|
||||
appLogs.parseSummaryLogs(tdm);
|
||||
PluginStoreTestUtils.verifyTestEntities(tdm);
|
||||
assertEquals(beforeScan + 2L, scanned.value());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -227,6 +232,8 @@ public class TestEntityGroupFSTimelineStore extends TimelineStoreTestUtils {
|
|||
fs.mkdirs(dirPathEmpty);
|
||||
|
||||
// Should retain all logs after this run
|
||||
MutableCounterLong dirsCleaned = store.metrics.getLogsDirsCleaned();
|
||||
long before = dirsCleaned.value();
|
||||
store.cleanLogs(testDoneDirPath, fs, 10000);
|
||||
assertTrue(fs.exists(irrelevantDirPath));
|
||||
assertTrue(fs.exists(irrelevantFilePath));
|
||||
|
@ -256,6 +263,7 @@ public class TestEntityGroupFSTimelineStore extends TimelineStoreTestUtils {
|
|||
// appDirClean and appDirEmpty should be cleaned up
|
||||
assertFalse(fs.exists(appDirClean));
|
||||
assertFalse(fs.exists(appDirEmpty));
|
||||
assertEquals(before + 2L, dirsCleaned.value());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -272,6 +280,12 @@ public class TestEntityGroupFSTimelineStore extends TimelineStoreTestUtils {
|
|||
cacheItem.setAppLogs(appLogs);
|
||||
store.setCachedLogs(
|
||||
EntityGroupPlugInForTest.getStandardTimelineGroupId(), cacheItem);
|
||||
MutableCounterLong detailLogEntityRead =
|
||||
store.metrics.getGetEntityToDetailOps();
|
||||
MutableStat cacheRefresh = store.metrics.getCacheRefresh();
|
||||
long numEntityReadBefore = detailLogEntityRead.value();
|
||||
long cacheRefreshBefore = cacheRefresh.lastStat().numSamples();
|
||||
|
||||
// Generate TDM
|
||||
TimelineDataManager tdm
|
||||
= PluginStoreTestUtils.getTdmWithStore(config, store);
|
||||
|
@ -290,6 +304,9 @@ public class TestEntityGroupFSTimelineStore extends TimelineStoreTestUtils {
|
|||
for (TimelineEntity entity : entities.getEntities()) {
|
||||
assertEquals(entityNew.getStartTime(), entity.getStartTime());
|
||||
}
|
||||
// Verify metrics
|
||||
assertEquals(numEntityReadBefore + 2L, detailLogEntityRead.value());
|
||||
assertEquals(cacheRefreshBefore + 1L, cacheRefresh.lastStat().numSamples());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -298,6 +315,9 @@ public class TestEntityGroupFSTimelineStore extends TimelineStoreTestUtils {
|
|||
EntityGroupFSTimelineStore.AppLogs appLogs =
|
||||
store.new AppLogs(TEST_APPLICATION_ID, testAppDirPath,
|
||||
AppState.COMPLETED);
|
||||
MutableCounterLong summaryLogEntityRead
|
||||
= store.metrics.getGetEntityToSummaryOps();
|
||||
long numEntityReadBefore = summaryLogEntityRead.value();
|
||||
TimelineDataManager tdm
|
||||
= PluginStoreTestUtils.getTdmWithStore(config, store);
|
||||
appLogs.scanForLogs();
|
||||
|
@ -313,6 +333,8 @@ public class TestEntityGroupFSTimelineStore extends TimelineStoreTestUtils {
|
|||
for (TimelineEntity entity : entities.getEntities()) {
|
||||
assertEquals((Long) 123l, entity.getStartTime());
|
||||
}
|
||||
// Verify metrics
|
||||
assertEquals(numEntityReadBefore + 5L, summaryLogEntityRead.value());
|
||||
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue