From e4a62a233d1ac6d0e3ca8b39ab16cdf71b313e56 Mon Sep 17 00:00:00 2001 From: Vinod Kumar Vavilapalli Date: Thu, 7 Jul 2016 22:37:29 -0700 Subject: [PATCH] YARN-5314. Fixed a ConcurrentModificationException in ATS v1.5 EntityGroupFSTimelineStore. Contributed by Li Lu. (cherry picked from commit 673e5e02feba9171498a518c06ae70639c5f8854) --- .../timeline/webapp/TimelineWebServices.java | 2 -- .../yarn/server/timeline/EntityCacheItem.java | 35 ++----------------- .../timeline/EntityGroupFSTimelineStore.java | 33 ++++++++++++++--- .../TestEntityGroupFSTimelineStore.java | 6 ++-- 4 files changed, 34 insertions(+), 42 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/webapp/TimelineWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/webapp/TimelineWebServices.java index e1e684b1e80..5c801326a9d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/webapp/TimelineWebServices.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/webapp/TimelineWebServices.java @@ -47,7 +47,6 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.StringUtils; -import org.apache.hadoop.util.VersionInfo; import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain; import org.apache.hadoop.yarn.api.records.timeline.TimelineDomains; import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities; @@ -61,7 +60,6 @@ import org.apache.hadoop.yarn.server.timeline.NameValuePair; import 
org.apache.hadoop.yarn.server.timeline.TimelineDataManager; import org.apache.hadoop.yarn.server.timeline.TimelineReader.Field; import org.apache.hadoop.yarn.api.records.timeline.TimelineAbout; -import org.apache.hadoop.yarn.util.YarnVersionInfo; import org.apache.hadoop.yarn.util.timeline.TimelineUtils; import org.apache.hadoop.yarn.webapp.BadRequestException; import org.apache.hadoop.yarn.webapp.ForbiddenException; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityCacheItem.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityCacheItem.java index 1566ae254a9..2b6e0235b3b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityCacheItem.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityCacheItem.java @@ -19,19 +19,13 @@ package org.apache.hadoop.yarn.server.timeline; import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.util.Time; import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId; import org.apache.hadoop.yarn.server.timeline.security.TimelineACLsManager; -import org.codehaus.jackson.JsonFactory; -import org.codehaus.jackson.map.ObjectMapper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; -import java.util.ArrayList; -import java.util.List; import java.util.concurrent.atomic.AtomicInteger; /** @@ -47,15 +41,12 @@ public class EntityCacheItem { private 
EntityGroupFSTimelineStore.AppLogs appLogs; private long lastRefresh; private Configuration config; - private FileSystem fs; private int refCount = 0; private static AtomicInteger activeStores = new AtomicInteger(0); - public EntityCacheItem(TimelineEntityGroupId gId, Configuration config, - FileSystem fs) { + public EntityCacheItem(TimelineEntityGroupId gId, Configuration config) { this.groupId = gId; this.config = config; - this.fs = fs; } /** @@ -97,15 +88,12 @@ public class EntityCacheItem { * other operations on the same cache item. * * @param aclManager ACL manager for the timeline storage - * @param jsonFactory JSON factory for the storage - * @param objMapper Object mapper for the storage * @param metrics Metrics to trace the status of the entity group store * @return a {@link org.apache.hadoop.yarn.server.timeline.TimelineStore} * object filled with all entities in the group. * @throws IOException */ public synchronized TimelineStore refreshCache(TimelineACLsManager aclManager, - JsonFactory jsonFactory, ObjectMapper objMapper, EntityGroupFSTimelineStoreMetrics metrics) throws IOException { if (needRefresh()) { long startTime = Time.monotonicNow(); @@ -128,30 +116,13 @@ public class EntityCacheItem { // Store is not null, the refresh is triggered by stale storage. 
metrics.incrCacheStaleRefreshes(); } - List<LogInfo> removeList = new ArrayList<>(); try (TimelineDataManager tdm = new TimelineDataManager(store, aclManager)) { tdm.init(config); tdm.start(); - for (LogInfo log : appLogs.getDetailLogs()) { - LOG.debug("Try refresh logs for {}", log.getFilename()); - // Only refresh the log that matches the cache id - if (log.matchesGroupId(groupId)) { - Path appDirPath = appLogs.getAppDirPath(); - if (fs.exists(log.getPath(appDirPath))) { - LOG.debug("Refresh logs for cache id {}", groupId); - log.parseForStore(tdm, appDirPath, appLogs.isDone(), - jsonFactory, objMapper, fs); - } else { - // The log may have been removed, remove the log - removeList.add(log); - LOG.info("File {} no longer exists, removing it from log list", - log.getPath(appDirPath)); - } - } - } + // Load data from appLogs to tdm + appLogs.loadDetailLog(tdm, groupId); } - appLogs.getDetailLogs().removeAll(removeList); } updateRefreshTimeToNow(); metrics.addCacheRefreshTime(Time.monotonicNow() - startTime); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityGroupFSTimelineStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityGroupFSTimelineStore.java index 958b54e623b..c54bce8642d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityGroupFSTimelineStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityGroupFSTimelineStore.java @@ -716,8 +716,8 @@ public class EntityGroupFSTimelineStore extends CompositeService summaryLogs.add(log); } - private void addDetailLog(String attemptDirName, String filename, - String owner) { + private 
synchronized void addDetailLog(String attemptDirName, + String filename, String owner) { for (LogInfo log : detailLogs) { if (log.getFilename().equals(filename) && log.getAttemptDirName().equals(attemptDirName)) { @@ -727,6 +727,30 @@ public class EntityGroupFSTimelineStore extends CompositeService detailLogs.add(new EntityLogInfo(attemptDirName, filename, owner)); } + synchronized void loadDetailLog(TimelineDataManager tdm, + TimelineEntityGroupId groupId) throws IOException { + List<LogInfo> removeList = new ArrayList<>(); + for (LogInfo log : detailLogs) { + LOG.debug("Try refresh logs for {}", log.getFilename()); + // Only refresh the log that matches the cache id + if (log.matchesGroupId(groupId)) { + Path dirPath = getAppDirPath(); + if (fs.exists(log.getPath(dirPath))) { + LOG.debug("Refresh logs for cache id {}", groupId); + log.parseForStore(tdm, dirPath, isDone(), + jsonFactory, objMapper, fs); + } else { + // The log may have been removed, remove the log + removeList.add(log); + LOG.info( + "File {} no longer exists, removing it from log list", + log.getPath(dirPath)); + } + } + } + detailLogs.removeAll(removeList); + } + public synchronized void moveToDone() throws IOException { Path doneAppPath = getDoneAppPath(appId); if (!doneAppPath.equals(appDirPath)) { @@ -915,7 +939,7 @@ public class EntityGroupFSTimelineStore extends CompositeService cacheItem = this.cachedLogs.get(groupId); if (cacheItem == null) { LOG.debug("Set up new cache item for id {}", groupId); - cacheItem = new EntityCacheItem(groupId, getConfig(), fs); + cacheItem = new EntityCacheItem(groupId, getConfig()); AppLogs appLogs = getAndSetAppLogs(groupId.getApplicationId()); if (appLogs != null) { LOG.debug("Set applogs {} for group id {}", appLogs, groupId); @@ -935,8 +959,7 @@ public class EntityGroupFSTimelineStore extends CompositeService // Add the reference by the store cacheItem.incrRefs(); cacheItems.add(cacheItem); - store = cacheItem.refreshCache(aclManager, jsonFactory, objMapper, - 
metrics); + store = cacheItem.refreshCache(aclManager, metrics); } else { LOG.warn("AppLogs for group id {} is null", groupId); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/TestEntityGroupFSTimelineStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/TestEntityGroupFSTimelineStore.java index 1c12f36192b..90afad54841 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/TestEntityGroupFSTimelineStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/TestEntityGroupFSTimelineStore.java @@ -308,7 +308,7 @@ public class TestEntityGroupFSTimelineStore extends TimelineStoreTestUtils { AppState.COMPLETED); EntityCacheItem cacheItem = new EntityCacheItem( EntityGroupPlugInForTest.getStandardTimelineGroupId(mainTestAppId), - config, fs); + config); cacheItem.setAppLogs(appLogs); store.setCachedLogs( EntityGroupPlugInForTest.getStandardTimelineGroupId(mainTestAppId), @@ -360,7 +360,7 @@ public class TestEntityGroupFSTimelineStore extends TimelineStoreTestUtils { AppState.COMPLETED); final EntityCacheItem cacheItem = new EntityCacheItem( EntityGroupPlugInForTest.getStandardTimelineGroupId(mainTestAppId), - config, fs); + config); cacheItem.setAppLogs(appLogs); store.setCachedLogs( @@ -396,7 +396,7 @@ public class TestEntityGroupFSTimelineStore extends TimelineStoreTestUtils { AppState.COMPLETED); EntityCacheItem item = new EntityCacheItem( EntityGroupPlugInForTest.getStandardTimelineGroupId(appId), - config, fs); + config); item.setAppLogs(currAppLog); store.setCachedLogs( EntityGroupPlugInForTest.getStandardTimelineGroupId(appId),