From 06413da72efed9a50e49efaf7110c220c88a7f4a Mon Sep 17 00:00:00 2001 From: Junping Du Date: Tue, 3 May 2016 04:16:21 -0700 Subject: [PATCH] YARN-4851. Metric improvements for ATS v1.5 storage components. Li Lu via junping_du. --- .../yarn/server/timeline/EntityCacheItem.java | 24 ++- .../timeline/EntityGroupFSTimelineStore.java | 22 ++- .../EntityGroupFSTimelineStoreMetrics.java | 160 ++++++++++++++++++ .../hadoop/yarn/server/timeline/LogInfo.java | 5 +- .../TestEntityGroupFSTimelineStore.java | 22 +++ 5 files changed, 222 insertions(+), 11 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityGroupFSTimelineStoreMetrics.java diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityCacheItem.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityCacheItem.java index 7eec7c393c6..dd2a27d43d6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityCacheItem.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityCacheItem.java @@ -61,7 +61,7 @@ public synchronized EntityGroupFSTimelineStore.AppLogs getAppLogs() { * Set the application logs to this cache item. The entity group should be * associated with this application. * - * @param incomingAppLogs + * @param incomingAppLogs Application logs this cache item mapped to */ public synchronized void setAppLogs( EntityGroupFSTimelineStore.AppLogs incomingAppLogs) { @@ -80,18 +80,21 @@ public synchronized TimelineStore getStore() { * rescan and then load new data. The refresh process is synchronized with * other operations on the same cache item. * - * @param groupId - * @param aclManager - * @param jsonFactory - * @param objMapper + * @param groupId Group id of the cache + * @param aclManager ACL manager for the timeline storage + * @param jsonFactory JSON factory for the storage + * @param objMapper Object mapper for the storage + * @param metrics Metrics to trace the status of the entity group store * @return a {@link org.apache.hadoop.yarn.server.timeline.TimelineStore} * object filled with all entities in the group. * @throws IOException */ public synchronized TimelineStore refreshCache(TimelineEntityGroupId groupId, TimelineACLsManager aclManager, JsonFactory jsonFactory, - ObjectMapper objMapper) throws IOException { + ObjectMapper objMapper, EntityGroupFSTimelineStoreMetrics metrics) + throws IOException { if (needRefresh()) { + long startTime = Time.monotonicNow(); // If an application is not finished, we only update summary logs (and put // new entities into summary storage). // Otherwise, since the application is done, we can update detail logs. @@ -106,9 +109,12 @@ public synchronized TimelineStore refreshCache(TimelineEntityGroupId groupId, "LeveldbCache." + groupId); store.init(config); store.start(); + } else { + // Store is not null, the refresh is triggered by stale storage. + metrics.incrCacheStaleRefreshes(); } List removeList = new ArrayList<>(); - try(TimelineDataManager tdm = + try (TimelineDataManager tdm = new TimelineDataManager(store, aclManager)) { tdm.init(config); tdm.start(); @@ -133,8 +139,10 @@ public synchronized TimelineStore refreshCache(TimelineEntityGroupId groupId, appLogs.getDetailLogs().removeAll(removeList); } updateRefreshTimeToNow(); + metrics.addCacheRefreshTime(Time.monotonicNow() - startTime); } else { LOG.debug("Cache new enough, skip refreshing"); + metrics.incrNoRefreshCacheRead(); } return store; } @@ -142,7 +150,7 @@ public synchronized TimelineStore refreshCache(TimelineEntityGroupId groupId, /** * Release the cache item for the given group id. * - * @param groupId + * @param groupId the group id that the cache should release */ public synchronized void releaseCache(TimelineEntityGroupId groupId) { try { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityGroupFSTimelineStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityGroupFSTimelineStore.java index 18b8951aa0e..edd430c2647 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityGroupFSTimelineStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityGroupFSTimelineStore.java @@ -128,12 +128,17 @@ public class EntityGroupFSTimelineStore extends CompositeService private List cacheIdPlugins; private Map cachedLogs; + @VisibleForTesting + @InterfaceAudience.Private + EntityGroupFSTimelineStoreMetrics metrics; + public EntityGroupFSTimelineStore() { super(EntityGroupFSTimelineStore.class.getSimpleName()); } @Override protected void serviceInit(Configuration conf) throws Exception { + metrics = EntityGroupFSTimelineStoreMetrics.create(); summaryStore = createSummaryStore(); addService(summaryStore); @@ -171,6 +176,7 @@ protected boolean removeEldestEntry( if (cacheItem.getAppLogs().isDone()) { appIdLogMap.remove(groupId.getApplicationId()); } + metrics.incrCacheEvicts(); return true; } return false; @@ -316,6 +322,7 @@ protected void serviceStop() throws Exception { @InterfaceAudience.Private @VisibleForTesting int scanActiveLogs() throws IOException { + long startTime = Time.monotonicNow(); RemoteIterator iter = list(activeRootPath); int logsToScanCount = 0; while (iter.hasNext()) { @@ -331,6 +338,7 @@ int scanActiveLogs() throws IOException { LOG.debug("Unable to parse entry {}", name); } } + metrics.addActiveLogDirScanTime(Time.monotonicNow() - startTime); return logsToScanCount; } @@ -423,6 +431,7 @@ void cleanLogs(Path dirpath, FileSystem fs, long retainMillis) if (!fs.delete(dirpath, true)) { LOG.error("Unable to remove " + dirpath); } + metrics.incrLogsDirsCleaned(); } catch (IOException e) { LOG.error("Unable to remove " + dirpath, e); } @@ -588,6 +597,7 @@ public synchronized void parseSummaryLogs() throws IOException { @VisibleForTesting synchronized void parseSummaryLogs(TimelineDataManager tdm) throws IOException { + long startTime = Time.monotonicNow(); if (!isDone()) { LOG.debug("Try to parse summary log for log {} in {}", appId, appDirPath); @@ -605,8 +615,10 @@ synchronized void parseSummaryLogs(TimelineDataManager tdm) List removeList = new ArrayList(); for (LogInfo log : summaryLogs) { if (fs.exists(log.getPath(appDirPath))) { - log.parseForStore(tdm, appDirPath, isDone(), jsonFactory, + long summaryEntityParsed + = log.parseForStore(tdm, appDirPath, isDone(), jsonFactory, objMapper, fs); + metrics.incrEntitiesReadToSummary(summaryEntityParsed); } else { // The log may have been removed, remove the log removeList.add(log); @@ -615,6 +627,7 @@ synchronized void parseSummaryLogs(TimelineDataManager tdm) } } summaryLogs.removeAll(removeList); + metrics.addSummaryLogReadTime(Time.monotonicNow() - startTime); } // scans for new logs and returns the modification timestamp of the @@ -787,6 +800,7 @@ private class EntityLogCleaner implements Runnable { @Override public void run() { LOG.debug("Cleaner starting"); + long startTime = Time.monotonicNow(); try { cleanLogs(doneRootPath, fs, logRetainMillis); } catch (Exception e) { @@ -796,6 +810,8 @@ public void run() { } else { LOG.error("Error cleaning files", e); } + } finally { + metrics.addLogCleanTime(Time.monotonicNow() - startTime); } LOG.debug("Cleaner finished"); } @@ -824,11 +840,13 @@ private List getTimelineStoresFromCacheIds( if (storeForId != null) { LOG.debug("Adding {} as a store for the query", storeForId.getName()); stores.add(storeForId); + metrics.incrGetEntityToDetailOps(); } } if (stores.size() == 0) { LOG.debug("Using summary store for {}", entityType); stores.add(this.summaryStore); + metrics.incrGetEntityToSummaryOps(); } return stores; } @@ -898,7 +916,7 @@ private TimelineStore getCachedStore(TimelineEntityGroupId groupId) AppLogs appLogs = cacheItem.getAppLogs(); LOG.debug("try refresh cache {} {}", groupId, appLogs.getAppId()); store = cacheItem.refreshCache(groupId, aclManager, jsonFactory, - objMapper); + objMapper, metrics); } else { LOG.warn("AppLogs for group id {} is null", groupId); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityGroupFSTimelineStoreMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityGroupFSTimelineStoreMetrics.java new file mode 100644 index 00000000000..de2ef3ba6f2 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityGroupFSTimelineStoreMetrics.java @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.timeline; + +import org.apache.hadoop.metrics2.MetricsSystem; +import org.apache.hadoop.metrics2.annotation.Metric; +import org.apache.hadoop.metrics2.annotation.Metrics; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; +import org.apache.hadoop.metrics2.lib.MutableCounterLong; +import org.apache.hadoop.metrics2.lib.MutableStat; + +/** + * This class tracks metrics for the EntityGroupFSTimelineStore. It tracks + * the read and write metrics for timeline server v1.5. It serves as a + * complement to {@link TimelineDataManagerMetrics}. + */ +@Metrics(about="Metrics for EntityGroupFSTimelineStore", context="yarn") +public class EntityGroupFSTimelineStoreMetrics { + private static final String DEFAULT_VALUE_WITH_SCALE = "TimeMs"; + + // General read related metrics + @Metric("getEntity calls to summary storage") + private MutableCounterLong getEntityToSummaryOps; + + @Metric("getEntity calls to detail storage") + private MutableCounterLong getEntityToDetailOps; + + // Summary data related metrics + @Metric(value = "summary log read ops and time", + valueName = DEFAULT_VALUE_WITH_SCALE) + private MutableStat summaryLogRead; + + @Metric("entities read into the summary storage") + private MutableCounterLong entitiesReadToSummary; + + // Detail data cache related metrics + @Metric("cache storage read that does not require a refresh") + private MutableCounterLong noRefreshCacheRead; + + @Metric("cache storage refresh due to the cached storage is stale") + private MutableCounterLong cacheStaleRefreshes; + + @Metric("cache storage evicts") + private MutableCounterLong cacheEvicts; + + @Metric(value = "cache storage refresh ops and time", + valueName = DEFAULT_VALUE_WITH_SCALE) + private MutableStat cacheRefresh; + + // Log scanner and cleaner related metrics + @Metric(value = "active log scan ops and time", + valueName = DEFAULT_VALUE_WITH_SCALE) + private MutableStat activeLogDirScan; + + @Metric(value = "log cleaner purging ops and time", + valueName = DEFAULT_VALUE_WITH_SCALE) + private MutableStat logClean; + + @Metric("log cleaner dirs purged") + private MutableCounterLong logsDirsCleaned; + + private static EntityGroupFSTimelineStoreMetrics instance = null; + + EntityGroupFSTimelineStoreMetrics() { + } + + public static synchronized EntityGroupFSTimelineStoreMetrics create() { + if (instance == null) { + MetricsSystem ms = DefaultMetricsSystem.instance(); + instance = ms.register(new EntityGroupFSTimelineStoreMetrics()); + } + return instance; + } + + // Setters + // General read related + public void incrGetEntityToSummaryOps() { + getEntityToSummaryOps.incr(); + } + + public void incrGetEntityToDetailOps() { + getEntityToDetailOps.incr(); + } + + // Summary data related + public void addSummaryLogReadTime(long msec) { + summaryLogRead.add(msec); + } + + public void incrEntitiesReadToSummary(long delta) { + entitiesReadToSummary.incr(delta); + } + + // Cache related + public void incrNoRefreshCacheRead() { + noRefreshCacheRead.incr(); + } + + public void incrCacheStaleRefreshes() { + cacheStaleRefreshes.incr(); + } + + public void incrCacheEvicts() { + cacheEvicts.incr(); + } + + public void addCacheRefreshTime(long msec) { + cacheRefresh.add(msec); + } + + // Log scanner and cleaner related + public void addActiveLogDirScanTime(long msec) { + activeLogDirScan.add(msec); + } + + public void addLogCleanTime(long msec) { + logClean.add(msec); + } + + public void incrLogsDirsCleaned() { + logsDirsCleaned.incr(); + } + + // Getters + MutableCounterLong getEntitiesReadToSummary() { + return entitiesReadToSummary; + } + + MutableCounterLong getLogsDirsCleaned() { + return logsDirsCleaned; + } + + MutableCounterLong getGetEntityToSummaryOps() { + return getEntityToSummaryOps; + } + + MutableCounterLong getGetEntityToDetailOps() { + return getEntityToDetailOps; + } + + MutableStat getCacheRefresh() { + return cacheRefresh; + } +} + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/LogInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/LogInfo.java index bc8017564f5..96bca8d1f90 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/LogInfo.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/LogInfo.java @@ -98,13 +98,14 @@ boolean matchesGroupId(String groupId){ )); } - public void parseForStore(TimelineDataManager tdm, Path appDirPath, + public long parseForStore(TimelineDataManager tdm, Path appDirPath, boolean appCompleted, JsonFactory jsonFactory, ObjectMapper objMapper, FileSystem fs) throws IOException { LOG.debug("Parsing for log dir {} on attempt {}", appDirPath, attemptDirName); Path logPath = getPath(appDirPath); FileStatus status = fs.getFileStatus(logPath); + long numParsed = 0; if (status != null) { long startTime = Time.monotonicNow(); try { @@ -113,6 +114,7 @@ public void parseForStore(TimelineDataManager tdm, Path appDirPath, objMapper, fs); LOG.info("Parsed {} entities from {} in {} msec", count, logPath, Time.monotonicNow() - startTime); + numParsed += count; } catch (RuntimeException e) { // If AppLogs cannot parse this log, it may be corrupted or just empty if (e.getCause() instanceof JsonParseException && @@ -125,6 +127,7 @@ public void parseForStore(TimelineDataManager tdm, Path appDirPath, } else { LOG.warn("{} no longer exists. Skip for scanning. ", logPath); } + return numParsed; } private long parsePath(TimelineDataManager tdm, Path logPath, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/TestEntityGroupFSTimelineStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/TestEntityGroupFSTimelineStore.java index 4e491fce43c..7a8ff2f9e89 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/TestEntityGroupFSTimelineStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/TestEntityGroupFSTimelineStore.java @@ -26,6 +26,8 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.metrics2.lib.MutableCounterLong; +import org.apache.hadoop.metrics2.lib.MutableStat; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -177,12 +179,15 @@ store.new AppLogs(TEST_APPLICATION_ID, testAppDirPath, @Test public void testParseSummaryLogs() throws Exception { TimelineDataManager tdm = PluginStoreTestUtils.getTdmWithMemStore(config); + MutableCounterLong scanned = store.metrics.getEntitiesReadToSummary(); + long beforeScan = scanned.value(); EntityGroupFSTimelineStore.AppLogs appLogs = store.new AppLogs(TEST_APPLICATION_ID, testAppDirPath, AppState.COMPLETED); appLogs.scanForLogs(); appLogs.parseSummaryLogs(tdm); PluginStoreTestUtils.verifyTestEntities(tdm); + assertEquals(beforeScan + 2L, scanned.value()); } @Test @@ -227,6 +232,8 @@ public void testCleanLogs() throws Exception { fs.mkdirs(dirPathEmpty); // Should retain all logs after this run + MutableCounterLong dirsCleaned = store.metrics.getLogsDirsCleaned(); + long before = dirsCleaned.value(); store.cleanLogs(testDoneDirPath, fs, 10000); assertTrue(fs.exists(irrelevantDirPath)); assertTrue(fs.exists(irrelevantFilePath)); @@ -256,6 +263,7 @@ public void testCleanLogs() throws Exception { // appDirClean and appDirEmpty should be cleaned up assertFalse(fs.exists(appDirClean)); assertFalse(fs.exists(appDirEmpty)); + assertEquals(before + 2L, dirsCleaned.value()); } @Test @@ -272,6 +280,12 @@ store.new AppLogs(TEST_APPLICATION_ID, testAppDirPath, cacheItem.setAppLogs(appLogs); store.setCachedLogs( EntityGroupPlugInForTest.getStandardTimelineGroupId(), cacheItem); + MutableCounterLong detailLogEntityRead = + store.metrics.getGetEntityToDetailOps(); + MutableStat cacheRefresh = store.metrics.getCacheRefresh(); + long numEntityReadBefore = detailLogEntityRead.value(); + long cacheRefreshBefore = cacheRefresh.lastStat().numSamples(); + // Generate TDM TimelineDataManager tdm = PluginStoreTestUtils.getTdmWithStore(config, store); @@ -290,6 +304,9 @@ store.new AppLogs(TEST_APPLICATION_ID, testAppDirPath, for (TimelineEntity entity : entities.getEntities()) { assertEquals(entityNew.getStartTime(), entity.getStartTime()); } + // Verify metrics + assertEquals(numEntityReadBefore + 2L, detailLogEntityRead.value()); + assertEquals(cacheRefreshBefore + 1L, cacheRefresh.lastStat().numSamples()); } @Test @@ -298,6 +315,9 @@ public void testSummaryRead() throws Exception { EntityGroupFSTimelineStore.AppLogs appLogs = store.new AppLogs(TEST_APPLICATION_ID, testAppDirPath, AppState.COMPLETED); + MutableCounterLong summaryLogEntityRead + = store.metrics.getGetEntityToSummaryOps(); + long numEntityReadBefore = summaryLogEntityRead.value(); TimelineDataManager tdm = PluginStoreTestUtils.getTdmWithStore(config, store); appLogs.scanForLogs(); @@ -313,6 +333,8 @@ store.new AppLogs(TEST_APPLICATION_ID, testAppDirPath, for (TimelineEntity entity : entities.getEntities()) { assertEquals((Long) 123l, entity.getStartTime()); } + // Verify metrics + assertEquals(numEntityReadBefore + 5L, summaryLogEntityRead.value()); }