YARN-4851. Metric improvements for ATS v1.5 storage components. Li Lu via junping_du.

(cherry picked from commit 06413da72e)
This commit is contained in:
Junping Du 2016-05-03 04:16:21 -07:00
parent a50dc05a60
commit 47f67ae447
5 changed files with 222 additions and 11 deletions

View File

@ -61,7 +61,7 @@ public class EntityCacheItem {
* Set the application logs to this cache item. The entity group should be * Set the application logs to this cache item. The entity group should be
* associated with this application. * associated with this application.
* *
* @param incomingAppLogs * @param incomingAppLogs Application logs this cache item mapped to
*/ */
public synchronized void setAppLogs( public synchronized void setAppLogs(
EntityGroupFSTimelineStore.AppLogs incomingAppLogs) { EntityGroupFSTimelineStore.AppLogs incomingAppLogs) {
@ -80,18 +80,21 @@ public class EntityCacheItem {
* rescan and then load new data. The refresh process is synchronized with * rescan and then load new data. The refresh process is synchronized with
* other operations on the same cache item. * other operations on the same cache item.
* *
* @param groupId * @param groupId Group id of the cache
* @param aclManager * @param aclManager ACL manager for the timeline storage
* @param jsonFactory * @param jsonFactory JSON factory for the storage
* @param objMapper * @param objMapper Object mapper for the storage
* @param metrics Metrics to trace the status of the entity group store
* @return a {@link org.apache.hadoop.yarn.server.timeline.TimelineStore} * @return a {@link org.apache.hadoop.yarn.server.timeline.TimelineStore}
* object filled with all entities in the group. * object filled with all entities in the group.
* @throws IOException * @throws IOException
*/ */
public synchronized TimelineStore refreshCache(TimelineEntityGroupId groupId, public synchronized TimelineStore refreshCache(TimelineEntityGroupId groupId,
TimelineACLsManager aclManager, JsonFactory jsonFactory, TimelineACLsManager aclManager, JsonFactory jsonFactory,
ObjectMapper objMapper) throws IOException { ObjectMapper objMapper, EntityGroupFSTimelineStoreMetrics metrics)
throws IOException {
if (needRefresh()) { if (needRefresh()) {
long startTime = Time.monotonicNow();
// If an application is not finished, we only update summary logs (and put // If an application is not finished, we only update summary logs (and put
// new entities into summary storage). // new entities into summary storage).
// Otherwise, since the application is done, we can update detail logs. // Otherwise, since the application is done, we can update detail logs.
@ -106,6 +109,9 @@ public class EntityCacheItem {
"LeveldbCache." + groupId); "LeveldbCache." + groupId);
store.init(config); store.init(config);
store.start(); store.start();
} else {
// Store is not null, the refresh is triggered by stale storage.
metrics.incrCacheStaleRefreshes();
} }
List<LogInfo> removeList = new ArrayList<>(); List<LogInfo> removeList = new ArrayList<>();
try (TimelineDataManager tdm = try (TimelineDataManager tdm =
@ -133,8 +139,10 @@ public class EntityCacheItem {
appLogs.getDetailLogs().removeAll(removeList); appLogs.getDetailLogs().removeAll(removeList);
} }
updateRefreshTimeToNow(); updateRefreshTimeToNow();
metrics.addCacheRefreshTime(Time.monotonicNow() - startTime);
} else { } else {
LOG.debug("Cache new enough, skip refreshing"); LOG.debug("Cache new enough, skip refreshing");
metrics.incrNoRefreshCacheRead();
} }
return store; return store;
} }
@ -142,7 +150,7 @@ public class EntityCacheItem {
/** /**
* Release the cache item for the given group id. * Release the cache item for the given group id.
* *
* @param groupId * @param groupId the group id that the cache should release
*/ */
public synchronized void releaseCache(TimelineEntityGroupId groupId) { public synchronized void releaseCache(TimelineEntityGroupId groupId) {
try { try {

View File

@ -128,12 +128,17 @@ public class EntityGroupFSTimelineStore extends CompositeService
private List<TimelineEntityGroupPlugin> cacheIdPlugins; private List<TimelineEntityGroupPlugin> cacheIdPlugins;
private Map<TimelineEntityGroupId, EntityCacheItem> cachedLogs; private Map<TimelineEntityGroupId, EntityCacheItem> cachedLogs;
@VisibleForTesting
@InterfaceAudience.Private
EntityGroupFSTimelineStoreMetrics metrics;
public EntityGroupFSTimelineStore() { public EntityGroupFSTimelineStore() {
super(EntityGroupFSTimelineStore.class.getSimpleName()); super(EntityGroupFSTimelineStore.class.getSimpleName());
} }
@Override @Override
protected void serviceInit(Configuration conf) throws Exception { protected void serviceInit(Configuration conf) throws Exception {
metrics = EntityGroupFSTimelineStoreMetrics.create();
summaryStore = createSummaryStore(); summaryStore = createSummaryStore();
addService(summaryStore); addService(summaryStore);
@ -171,6 +176,7 @@ public class EntityGroupFSTimelineStore extends CompositeService
if (cacheItem.getAppLogs().isDone()) { if (cacheItem.getAppLogs().isDone()) {
appIdLogMap.remove(groupId.getApplicationId()); appIdLogMap.remove(groupId.getApplicationId());
} }
metrics.incrCacheEvicts();
return true; return true;
} }
return false; return false;
@ -316,6 +322,7 @@ public class EntityGroupFSTimelineStore extends CompositeService
@InterfaceAudience.Private @InterfaceAudience.Private
@VisibleForTesting @VisibleForTesting
int scanActiveLogs() throws IOException { int scanActiveLogs() throws IOException {
long startTime = Time.monotonicNow();
RemoteIterator<FileStatus> iter = list(activeRootPath); RemoteIterator<FileStatus> iter = list(activeRootPath);
int logsToScanCount = 0; int logsToScanCount = 0;
while (iter.hasNext()) { while (iter.hasNext()) {
@ -331,6 +338,7 @@ public class EntityGroupFSTimelineStore extends CompositeService
LOG.debug("Unable to parse entry {}", name); LOG.debug("Unable to parse entry {}", name);
} }
} }
metrics.addActiveLogDirScanTime(Time.monotonicNow() - startTime);
return logsToScanCount; return logsToScanCount;
} }
@ -423,6 +431,7 @@ public class EntityGroupFSTimelineStore extends CompositeService
if (!fs.delete(dirpath, true)) { if (!fs.delete(dirpath, true)) {
LOG.error("Unable to remove " + dirpath); LOG.error("Unable to remove " + dirpath);
} }
metrics.incrLogsDirsCleaned();
} catch (IOException e) { } catch (IOException e) {
LOG.error("Unable to remove " + dirpath, e); LOG.error("Unable to remove " + dirpath, e);
} }
@ -588,6 +597,7 @@ public class EntityGroupFSTimelineStore extends CompositeService
@VisibleForTesting @VisibleForTesting
synchronized void parseSummaryLogs(TimelineDataManager tdm) synchronized void parseSummaryLogs(TimelineDataManager tdm)
throws IOException { throws IOException {
long startTime = Time.monotonicNow();
if (!isDone()) { if (!isDone()) {
LOG.debug("Try to parse summary log for log {} in {}", LOG.debug("Try to parse summary log for log {} in {}",
appId, appDirPath); appId, appDirPath);
@ -605,8 +615,10 @@ public class EntityGroupFSTimelineStore extends CompositeService
List<LogInfo> removeList = new ArrayList<LogInfo>(); List<LogInfo> removeList = new ArrayList<LogInfo>();
for (LogInfo log : summaryLogs) { for (LogInfo log : summaryLogs) {
if (fs.exists(log.getPath(appDirPath))) { if (fs.exists(log.getPath(appDirPath))) {
log.parseForStore(tdm, appDirPath, isDone(), jsonFactory, long summaryEntityParsed
= log.parseForStore(tdm, appDirPath, isDone(), jsonFactory,
objMapper, fs); objMapper, fs);
metrics.incrEntitiesReadToSummary(summaryEntityParsed);
} else { } else {
// The log may have been removed, remove the log // The log may have been removed, remove the log
removeList.add(log); removeList.add(log);
@ -615,6 +627,7 @@ public class EntityGroupFSTimelineStore extends CompositeService
} }
} }
summaryLogs.removeAll(removeList); summaryLogs.removeAll(removeList);
metrics.addSummaryLogReadTime(Time.monotonicNow() - startTime);
} }
// scans for new logs and returns the modification timestamp of the // scans for new logs and returns the modification timestamp of the
@ -787,6 +800,7 @@ public class EntityGroupFSTimelineStore extends CompositeService
@Override @Override
public void run() { public void run() {
LOG.debug("Cleaner starting"); LOG.debug("Cleaner starting");
long startTime = Time.monotonicNow();
try { try {
cleanLogs(doneRootPath, fs, logRetainMillis); cleanLogs(doneRootPath, fs, logRetainMillis);
} catch (Exception e) { } catch (Exception e) {
@ -796,6 +810,8 @@ public class EntityGroupFSTimelineStore extends CompositeService
} else { } else {
LOG.error("Error cleaning files", e); LOG.error("Error cleaning files", e);
} }
} finally {
metrics.addLogCleanTime(Time.monotonicNow() - startTime);
} }
LOG.debug("Cleaner finished"); LOG.debug("Cleaner finished");
} }
@ -824,11 +840,13 @@ public class EntityGroupFSTimelineStore extends CompositeService
if (storeForId != null) { if (storeForId != null) {
LOG.debug("Adding {} as a store for the query", storeForId.getName()); LOG.debug("Adding {} as a store for the query", storeForId.getName());
stores.add(storeForId); stores.add(storeForId);
metrics.incrGetEntityToDetailOps();
} }
} }
if (stores.size() == 0) { if (stores.size() == 0) {
LOG.debug("Using summary store for {}", entityType); LOG.debug("Using summary store for {}", entityType);
stores.add(this.summaryStore); stores.add(this.summaryStore);
metrics.incrGetEntityToSummaryOps();
} }
return stores; return stores;
} }
@ -898,7 +916,7 @@ public class EntityGroupFSTimelineStore extends CompositeService
AppLogs appLogs = cacheItem.getAppLogs(); AppLogs appLogs = cacheItem.getAppLogs();
LOG.debug("try refresh cache {} {}", groupId, appLogs.getAppId()); LOG.debug("try refresh cache {} {}", groupId, appLogs.getAppId());
store = cacheItem.refreshCache(groupId, aclManager, jsonFactory, store = cacheItem.refreshCache(groupId, aclManager, jsonFactory,
objMapper); objMapper, metrics);
} else { } else {
LOG.warn("AppLogs for group id {} is null", groupId); LOG.warn("AppLogs for group id {} is null", groupId);
} }

View File

@ -0,0 +1,160 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.timeline;
import org.apache.hadoop.metrics2.MetricsSystem;
import org.apache.hadoop.metrics2.annotation.Metric;
import org.apache.hadoop.metrics2.annotation.Metrics;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.lib.MutableCounterLong;
import org.apache.hadoop.metrics2.lib.MutableStat;
/**
* This class tracks metrics for the EntityGroupFSTimelineStore. It tracks
* the read and write metrics for timeline server v1.5. It serves as a
* complement to {@link TimelineDataManagerMetrics}.
*/
@Metrics(about="Metrics for EntityGroupFSTimelineStore", context="yarn")
public class EntityGroupFSTimelineStoreMetrics {
private static final String DEFAULT_VALUE_WITH_SCALE = "TimeMs";
// General read related metrics
@Metric("getEntity calls to summary storage")
private MutableCounterLong getEntityToSummaryOps;
@Metric("getEntity calls to detail storage")
private MutableCounterLong getEntityToDetailOps;
// Summary data related metrics
@Metric(value = "summary log read ops and time",
valueName = DEFAULT_VALUE_WITH_SCALE)
private MutableStat summaryLogRead;
@Metric("entities read into the summary storage")
private MutableCounterLong entitiesReadToSummary;
// Detail data cache related metrics
@Metric("cache storage read that does not require a refresh")
private MutableCounterLong noRefreshCacheRead;
@Metric("cache storage refresh due to the cached storage is stale")
private MutableCounterLong cacheStaleRefreshes;
@Metric("cache storage evicts")
private MutableCounterLong cacheEvicts;
@Metric(value = "cache storage refresh ops and time",
valueName = DEFAULT_VALUE_WITH_SCALE)
private MutableStat cacheRefresh;
// Log scanner and cleaner related metrics
@Metric(value = "active log scan ops and time",
valueName = DEFAULT_VALUE_WITH_SCALE)
private MutableStat activeLogDirScan;
@Metric(value = "log cleaner purging ops and time",
valueName = DEFAULT_VALUE_WITH_SCALE)
private MutableStat logClean;
@Metric("log cleaner dirs purged")
private MutableCounterLong logsDirsCleaned;
private static EntityGroupFSTimelineStoreMetrics instance = null;
EntityGroupFSTimelineStoreMetrics() {
}
public static synchronized EntityGroupFSTimelineStoreMetrics create() {
if (instance == null) {
MetricsSystem ms = DefaultMetricsSystem.instance();
instance = ms.register(new EntityGroupFSTimelineStoreMetrics());
}
return instance;
}
// Setters
// General read related
public void incrGetEntityToSummaryOps() {
getEntityToSummaryOps.incr();
}
public void incrGetEntityToDetailOps() {
getEntityToDetailOps.incr();
}
// Summary data related
public void addSummaryLogReadTime(long msec) {
summaryLogRead.add(msec);
}
public void incrEntitiesReadToSummary(long delta) {
entitiesReadToSummary.incr(delta);
}
// Cache related
public void incrNoRefreshCacheRead() {
noRefreshCacheRead.incr();
}
public void incrCacheStaleRefreshes() {
cacheStaleRefreshes.incr();
}
public void incrCacheEvicts() {
cacheEvicts.incr();
}
public void addCacheRefreshTime(long msec) {
cacheRefresh.add(msec);
}
// Log scanner and cleaner related
public void addActiveLogDirScanTime(long msec) {
activeLogDirScan.add(msec);
}
public void addLogCleanTime(long msec) {
logClean.add(msec);
}
public void incrLogsDirsCleaned() {
logsDirsCleaned.incr();
}
// Getters
MutableCounterLong getEntitiesReadToSummary() {
return entitiesReadToSummary;
}
MutableCounterLong getLogsDirsCleaned() {
return logsDirsCleaned;
}
MutableCounterLong getGetEntityToSummaryOps() {
return getEntityToSummaryOps;
}
MutableCounterLong getGetEntityToDetailOps() {
return getEntityToDetailOps;
}
MutableStat getCacheRefresh() {
return cacheRefresh;
}
}

View File

@ -98,13 +98,14 @@ abstract class LogInfo {
)); ));
} }
public void parseForStore(TimelineDataManager tdm, Path appDirPath, public long parseForStore(TimelineDataManager tdm, Path appDirPath,
boolean appCompleted, JsonFactory jsonFactory, ObjectMapper objMapper, boolean appCompleted, JsonFactory jsonFactory, ObjectMapper objMapper,
FileSystem fs) throws IOException { FileSystem fs) throws IOException {
LOG.debug("Parsing for log dir {} on attempt {}", appDirPath, LOG.debug("Parsing for log dir {} on attempt {}", appDirPath,
attemptDirName); attemptDirName);
Path logPath = getPath(appDirPath); Path logPath = getPath(appDirPath);
FileStatus status = fs.getFileStatus(logPath); FileStatus status = fs.getFileStatus(logPath);
long numParsed = 0;
if (status != null) { if (status != null) {
long startTime = Time.monotonicNow(); long startTime = Time.monotonicNow();
try { try {
@ -113,6 +114,7 @@ abstract class LogInfo {
objMapper, fs); objMapper, fs);
LOG.info("Parsed {} entities from {} in {} msec", LOG.info("Parsed {} entities from {} in {} msec",
count, logPath, Time.monotonicNow() - startTime); count, logPath, Time.monotonicNow() - startTime);
numParsed += count;
} catch (RuntimeException e) { } catch (RuntimeException e) {
// If AppLogs cannot parse this log, it may be corrupted or just empty // If AppLogs cannot parse this log, it may be corrupted or just empty
if (e.getCause() instanceof JsonParseException && if (e.getCause() instanceof JsonParseException &&
@ -125,6 +127,7 @@ abstract class LogInfo {
} else { } else {
LOG.warn("{} no longer exists. Skip for scanning. ", logPath); LOG.warn("{} no longer exists. Skip for scanning. ", logPath);
} }
return numParsed;
} }
private long parsePath(TimelineDataManager tdm, Path logPath, private long parsePath(TimelineDataManager tdm, Path logPath,

View File

@ -26,6 +26,8 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.metrics2.lib.MutableCounterLong;
import org.apache.hadoop.metrics2.lib.MutableStat;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
@ -177,12 +179,15 @@ public class TestEntityGroupFSTimelineStore extends TimelineStoreTestUtils {
@Test @Test
public void testParseSummaryLogs() throws Exception { public void testParseSummaryLogs() throws Exception {
TimelineDataManager tdm = PluginStoreTestUtils.getTdmWithMemStore(config); TimelineDataManager tdm = PluginStoreTestUtils.getTdmWithMemStore(config);
MutableCounterLong scanned = store.metrics.getEntitiesReadToSummary();
long beforeScan = scanned.value();
EntityGroupFSTimelineStore.AppLogs appLogs = EntityGroupFSTimelineStore.AppLogs appLogs =
store.new AppLogs(TEST_APPLICATION_ID, testAppDirPath, store.new AppLogs(TEST_APPLICATION_ID, testAppDirPath,
AppState.COMPLETED); AppState.COMPLETED);
appLogs.scanForLogs(); appLogs.scanForLogs();
appLogs.parseSummaryLogs(tdm); appLogs.parseSummaryLogs(tdm);
PluginStoreTestUtils.verifyTestEntities(tdm); PluginStoreTestUtils.verifyTestEntities(tdm);
assertEquals(beforeScan + 2L, scanned.value());
} }
@Test @Test
@ -227,6 +232,8 @@ public class TestEntityGroupFSTimelineStore extends TimelineStoreTestUtils {
fs.mkdirs(dirPathEmpty); fs.mkdirs(dirPathEmpty);
// Should retain all logs after this run // Should retain all logs after this run
MutableCounterLong dirsCleaned = store.metrics.getLogsDirsCleaned();
long before = dirsCleaned.value();
store.cleanLogs(testDoneDirPath, fs, 10000); store.cleanLogs(testDoneDirPath, fs, 10000);
assertTrue(fs.exists(irrelevantDirPath)); assertTrue(fs.exists(irrelevantDirPath));
assertTrue(fs.exists(irrelevantFilePath)); assertTrue(fs.exists(irrelevantFilePath));
@ -256,6 +263,7 @@ public class TestEntityGroupFSTimelineStore extends TimelineStoreTestUtils {
// appDirClean and appDirEmpty should be cleaned up // appDirClean and appDirEmpty should be cleaned up
assertFalse(fs.exists(appDirClean)); assertFalse(fs.exists(appDirClean));
assertFalse(fs.exists(appDirEmpty)); assertFalse(fs.exists(appDirEmpty));
assertEquals(before + 2L, dirsCleaned.value());
} }
@Test @Test
@ -272,6 +280,12 @@ public class TestEntityGroupFSTimelineStore extends TimelineStoreTestUtils {
cacheItem.setAppLogs(appLogs); cacheItem.setAppLogs(appLogs);
store.setCachedLogs( store.setCachedLogs(
EntityGroupPlugInForTest.getStandardTimelineGroupId(), cacheItem); EntityGroupPlugInForTest.getStandardTimelineGroupId(), cacheItem);
MutableCounterLong detailLogEntityRead =
store.metrics.getGetEntityToDetailOps();
MutableStat cacheRefresh = store.metrics.getCacheRefresh();
long numEntityReadBefore = detailLogEntityRead.value();
long cacheRefreshBefore = cacheRefresh.lastStat().numSamples();
// Generate TDM // Generate TDM
TimelineDataManager tdm TimelineDataManager tdm
= PluginStoreTestUtils.getTdmWithStore(config, store); = PluginStoreTestUtils.getTdmWithStore(config, store);
@ -290,6 +304,9 @@ public class TestEntityGroupFSTimelineStore extends TimelineStoreTestUtils {
for (TimelineEntity entity : entities.getEntities()) { for (TimelineEntity entity : entities.getEntities()) {
assertEquals(entityNew.getStartTime(), entity.getStartTime()); assertEquals(entityNew.getStartTime(), entity.getStartTime());
} }
// Verify metrics
assertEquals(numEntityReadBefore + 2L, detailLogEntityRead.value());
assertEquals(cacheRefreshBefore + 1L, cacheRefresh.lastStat().numSamples());
} }
@Test @Test
@ -298,6 +315,9 @@ public class TestEntityGroupFSTimelineStore extends TimelineStoreTestUtils {
EntityGroupFSTimelineStore.AppLogs appLogs = EntityGroupFSTimelineStore.AppLogs appLogs =
store.new AppLogs(TEST_APPLICATION_ID, testAppDirPath, store.new AppLogs(TEST_APPLICATION_ID, testAppDirPath,
AppState.COMPLETED); AppState.COMPLETED);
MutableCounterLong summaryLogEntityRead
= store.metrics.getGetEntityToSummaryOps();
long numEntityReadBefore = summaryLogEntityRead.value();
TimelineDataManager tdm TimelineDataManager tdm
= PluginStoreTestUtils.getTdmWithStore(config, store); = PluginStoreTestUtils.getTdmWithStore(config, store);
appLogs.scanForLogs(); appLogs.scanForLogs();
@ -313,6 +333,8 @@ public class TestEntityGroupFSTimelineStore extends TimelineStoreTestUtils {
for (TimelineEntity entity : entities.getEntities()) { for (TimelineEntity entity : entities.getEntities()) {
assertEquals((Long) 123l, entity.getStartTime()); assertEquals((Long) 123l, entity.getStartTime());
} }
// Verify metrics
assertEquals(numEntityReadBefore + 5L, summaryLogEntityRead.value());
} }