YARN-5314. Fixed a ConcurrentModificationException in ATS v1.5 EntityGroupFSTimelineStore. Contributed by Li Lu.

This commit is contained in:
Vinod Kumar Vavilapalli 2016-07-07 22:37:29 -07:00
parent 9d46a49c74
commit 673e5e02fe
4 changed files with 34 additions and 43 deletions

View File

@ -47,7 +47,6 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.VersionInfo;
import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
import org.apache.hadoop.yarn.api.records.timeline.TimelineDomains;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
@ -61,7 +60,6 @@ import org.apache.hadoop.yarn.server.timeline.NameValuePair;
import org.apache.hadoop.yarn.server.timeline.TimelineDataManager;
import org.apache.hadoop.yarn.server.timeline.TimelineReader.Field;
import org.apache.hadoop.yarn.api.records.timeline.TimelineAbout;
import org.apache.hadoop.yarn.util.YarnVersionInfo;
import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
import org.apache.hadoop.yarn.webapp.BadRequestException;
import org.apache.hadoop.yarn.webapp.ForbiddenException;

View File

@ -19,19 +19,13 @@ package org.apache.hadoop.yarn.server.timeline;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId;
import org.apache.hadoop.yarn.server.timeline.security.TimelineACLsManager;
import org.codehaus.jackson.JsonFactory;
import org.codehaus.jackson.map.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
/**
@ -47,15 +41,12 @@ public class EntityCacheItem {
private EntityGroupFSTimelineStore.AppLogs appLogs;
private long lastRefresh;
private Configuration config;
private FileSystem fs;
private int refCount = 0;
private static AtomicInteger activeStores = new AtomicInteger(0);
public EntityCacheItem(TimelineEntityGroupId gId, Configuration config,
FileSystem fs) {
public EntityCacheItem(TimelineEntityGroupId gId, Configuration config) {
this.groupId = gId;
this.config = config;
this.fs = fs;
}
/**
@ -97,15 +88,12 @@ public class EntityCacheItem {
* other operations on the same cache item.
*
* @param aclManager ACL manager for the timeline storage
* @param jsonFactory JSON factory for the storage
* @param objMapper Object mapper for the storage
* @param metrics Metrics to trace the status of the entity group store
* @return a {@link org.apache.hadoop.yarn.server.timeline.TimelineStore}
* object filled with all entities in the group.
* @throws IOException
*/
public synchronized TimelineStore refreshCache(TimelineACLsManager aclManager,
JsonFactory jsonFactory, ObjectMapper objMapper,
EntityGroupFSTimelineStoreMetrics metrics) throws IOException {
if (needRefresh()) {
long startTime = Time.monotonicNow();
@ -128,31 +116,14 @@ public class EntityCacheItem {
// Store is not null, the refresh is triggered by stale storage.
metrics.incrCacheStaleRefreshes();
}
List<LogInfo> removeList = new ArrayList<>();
try (TimelineDataManager tdm =
new TimelineDataManager(store, aclManager)) {
tdm.init(config);
tdm.start();
for (LogInfo log : appLogs.getDetailLogs()) {
LOG.debug("Try refresh logs for {}", log.getFilename());
// Only refresh the log that matches the cache id
if (log.matchesGroupId(groupId)) {
Path appDirPath = appLogs.getAppDirPath();
if (fs.exists(log.getPath(appDirPath))) {
LOG.debug("Refresh logs for cache id {}", groupId);
log.parseForStore(tdm, appDirPath, appLogs.isDone(),
jsonFactory, objMapper, fs);
} else {
// The log may have been removed, remove the log
removeList.add(log);
LOG.info("File {} no longer exists, removing it from log list",
log.getPath(appDirPath));
// Load data from appLogs to tdm
appLogs.loadDetailLog(tdm, groupId);
}
}
}
}
appLogs.getDetailLogs().removeAll(removeList);
}
updateRefreshTimeToNow();
metrics.addCacheRefreshTime(Time.monotonicNow() - startTime);
} else {

View File

@ -230,7 +230,6 @@ public class EntityGroupFSTimelineStore extends CompositeService
List<TimelineEntityGroupPlugin> pluginList
= new LinkedList<TimelineEntityGroupPlugin>();
Exception caught = null;
ClassLoader customClassLoader = null;
if (pluginClasspath != null && pluginClasspath.length() > 0) {
try {
@ -775,8 +774,8 @@ public class EntityGroupFSTimelineStore extends CompositeService
summaryLogs.add(log);
}
private void addDetailLog(String attemptDirName, String filename,
String owner) {
private synchronized void addDetailLog(String attemptDirName,
String filename, String owner) {
for (LogInfo log : detailLogs) {
if (log.getFilename().equals(filename)
&& log.getAttemptDirName().equals(attemptDirName)) {
@ -786,6 +785,30 @@ public class EntityGroupFSTimelineStore extends CompositeService
detailLogs.add(new EntityLogInfo(attemptDirName, filename, owner));
}
synchronized void loadDetailLog(TimelineDataManager tdm,
TimelineEntityGroupId groupId) throws IOException {
List<LogInfo> removeList = new ArrayList<>();
for (LogInfo log : detailLogs) {
LOG.debug("Try refresh logs for {}", log.getFilename());
// Only refresh the log that matches the cache id
if (log.matchesGroupId(groupId)) {
Path dirPath = getAppDirPath();
if (fs.exists(log.getPath(dirPath))) {
LOG.debug("Refresh logs for cache id {}", groupId);
log.parseForStore(tdm, dirPath, isDone(),
jsonFactory, objMapper, fs);
} else {
// The log may have been removed, remove the log
removeList.add(log);
LOG.info(
"File {} no longer exists, removing it from log list",
log.getPath(dirPath));
}
}
}
detailLogs.removeAll(removeList);
}
public synchronized void moveToDone() throws IOException {
Path doneAppPath = getDoneAppPath(appId);
if (!doneAppPath.equals(appDirPath)) {
@ -974,7 +997,7 @@ public class EntityGroupFSTimelineStore extends CompositeService
cacheItem = this.cachedLogs.get(groupId);
if (cacheItem == null) {
LOG.debug("Set up new cache item for id {}", groupId);
cacheItem = new EntityCacheItem(groupId, getConfig(), fs);
cacheItem = new EntityCacheItem(groupId, getConfig());
AppLogs appLogs = getAndSetAppLogs(groupId.getApplicationId());
if (appLogs != null) {
LOG.debug("Set applogs {} for group id {}", appLogs, groupId);
@ -994,8 +1017,7 @@ public class EntityGroupFSTimelineStore extends CompositeService
// Add the reference by the store
cacheItem.incrRefs();
cacheItems.add(cacheItem);
store = cacheItem.refreshCache(aclManager, jsonFactory, objMapper,
metrics);
store = cacheItem.refreshCache(aclManager, metrics);
} else {
LOG.warn("AppLogs for group id {} is null", groupId);
}

View File

@ -354,7 +354,7 @@ public class TestEntityGroupFSTimelineStore extends TimelineStoreTestUtils {
AppState.COMPLETED);
EntityCacheItem cacheItem = new EntityCacheItem(
EntityGroupPlugInForTest.getStandardTimelineGroupId(mainTestAppId),
config, fs);
config);
cacheItem.setAppLogs(appLogs);
store.setCachedLogs(
EntityGroupPlugInForTest.getStandardTimelineGroupId(mainTestAppId),
@ -406,7 +406,7 @@ public class TestEntityGroupFSTimelineStore extends TimelineStoreTestUtils {
AppState.COMPLETED);
final EntityCacheItem cacheItem = new EntityCacheItem(
EntityGroupPlugInForTest.getStandardTimelineGroupId(mainTestAppId),
config, fs);
config);
cacheItem.setAppLogs(appLogs);
store.setCachedLogs(
@ -442,7 +442,7 @@ public class TestEntityGroupFSTimelineStore extends TimelineStoreTestUtils {
AppState.COMPLETED);
EntityCacheItem item = new EntityCacheItem(
EntityGroupPlugInForTest.getStandardTimelineGroupId(appId),
config, fs);
config);
item.setAppLogs(currAppLog);
store.setCachedLogs(
EntityGroupPlugInForTest.getStandardTimelineGroupId(appId),