diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityCacheItem.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityCacheItem.java
index dd2a27d43d6..1566ae254a9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityCacheItem.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityCacheItem.java
@@ -16,6 +16,8 @@
  */
 package org.apache.hadoop.yarn.server.timeline;

+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -30,6 +32,7 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;

 /**
  * Cache item for timeline server v1.5 reader cache. Each cache item has a
@@ -40,12 +43,17 @@ public class EntityCacheItem {
       = LoggerFactory.getLogger(EntityCacheItem.class);

   private TimelineStore store;
+  private TimelineEntityGroupId groupId;
   private EntityGroupFSTimelineStore.AppLogs appLogs;
   private long lastRefresh;
   private Configuration config;
   private FileSystem fs;
+  private int refCount = 0;
+  private static AtomicInteger activeStores = new AtomicInteger(0);

-  public EntityCacheItem(Configuration config, FileSystem fs) {
+  public EntityCacheItem(TimelineEntityGroupId gId, Configuration config,
+      FileSystem fs) {
+    this.groupId = gId;
     this.config = config;
     this.fs = fs;
   }
@@ -70,17 +78,24 @@ public synchronized void setAppLogs(

   /**
    * @return The timeline store, either loaded or unloaded, of this cache item.
+   * This method will not prevent the store from being reclaimed.
    */
   public synchronized TimelineStore getStore() {
     return store;
   }

+  /**
+   * @return The number of currently active stores across all cache items.
+   */
+  public static int getActiveStores() {
+    return activeStores.get();
+  }
+
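Reviewer note on the two counters introduced above: `refCount` tracks readers of a single cache item and is only mutated under the item's monitor (all of its mutators below are `synchronized`), while `activeStores` counts loaded stores across every item and therefore needs to be an `AtomicInteger`. A minimal, self-contained sketch of this split (a hypothetical class, not part of the patch):

```java
import java.util.concurrent.atomic.AtomicInteger;

// Sketch only: per-instance count guarded by the instance monitor,
// class-wide count shared by all instances via an atomic.
class RefCountedResource {
  private static final AtomicInteger GLOBAL_ACTIVE = new AtomicInteger(0);
  private int refCount = 0;    // guarded by "this"
  private boolean loaded = false;

  synchronized void acquire() {
    refCount++;
    if (!loaded) {             // first reader loads the backing store
      GLOBAL_ACTIVE.getAndIncrement();
      loaded = true;
    }
  }

  synchronized boolean release() {
    if (--refCount > 0) {
      return false;            // still referenced; keep the store loaded
    }
    if (loaded) {
      GLOBAL_ACTIVE.getAndDecrement();
      loaded = false;
    }
    refCount = 0;
    return true;
  }

  static int globalActive() {
    return GLOBAL_ACTIVE.get();
  }
}
```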
   /**
    * Refresh this cache item if it needs refresh. This will enforce an appLogs
    * rescan and then load new data. The refresh process is synchronized with
    * other operations on the same cache item.
    *
-   * @param groupId Group id of the cache
    * @param aclManager ACL manager for the timeline storage
    * @param jsonFactory JSON factory for the storage
    * @param objMapper Object mapper for the storage
@@ -89,10 +104,9 @@ public synchronized TimelineStore getStore() {
    *         object filled with all entities in the group.
    * @throws IOException
    */
-  public synchronized TimelineStore refreshCache(TimelineEntityGroupId groupId,
-      TimelineACLsManager aclManager, JsonFactory jsonFactory,
-      ObjectMapper objMapper, EntityGroupFSTimelineStoreMetrics metrics)
-      throws IOException {
+  public synchronized TimelineStore refreshCache(TimelineACLsManager aclManager,
+      JsonFactory jsonFactory, ObjectMapper objMapper,
+      EntityGroupFSTimelineStoreMetrics metrics) throws IOException {
     if (needRefresh()) {
       long startTime = Time.monotonicNow();
       // If an application is not finished, we only update summary logs (and put
@@ -105,6 +119,7 @@ public synchronized TimelineStore refreshCache(TimelineEntityGroupId groupId,
       }
       if (!appLogs.getDetailLogs().isEmpty()) {
         if (store == null) {
+          activeStores.getAndIncrement();
           store = new LevelDBCacheTimelineStore(groupId.toString(),
               "LeveldbCache." + groupId);
           store.init(config);
@@ -148,11 +163,35 @@ public synchronized TimelineStore refreshCache(TimelineEntityGroupId groupId,
   }

   /**
-   * Release the cache item for the given group id.
-   *
-   * @param groupId the group id that the cache should release
+   * Increase the number of references to this cache item by 1.
    */
-  public synchronized void releaseCache(TimelineEntityGroupId groupId) {
+  public synchronized void incrRefs() {
+    refCount++;
+  }
+
+  /**
+   * Unregister a reader. Try to release the cache if the number of readers
+   * of the current cache item reaches 0.
+   *
+   * @return true if the cache has been released, otherwise false
+   */
+  public synchronized boolean tryRelease() {
+    refCount--;
+    // Only reclaim the storage if there is no reader.
+    if (refCount > 0) {
+      LOG.debug("{} references left for cached group {}, skipping the release",
+          refCount, groupId);
+      return false;
+    }
+    forceRelease();
+    return true;
+  }
+
+  /**
+   * Force releasing the cache item for the given group id, even if there
+   * are active references.
+   */
+  public synchronized void forceRelease() {
     try {
       if (store != null) {
         store.close();
@@ -161,12 +200,21 @@ public synchronized void releaseCache(TimelineEntityGroupId groupId) {
       LOG.warn("Error closing timeline store", e);
     }
     store = null;
+    activeStores.getAndDecrement();
+    refCount = 0;
     // reset offsets so next time logs are re-parsed
     for (LogInfo log : appLogs.getDetailLogs()) {
       if (log.getFilename().contains(groupId.toString())) {
         log.setOffset(0);
       }
     }
+    LOG.debug("Cache for group {} released.", groupId);
+  }
+
+  @InterfaceAudience.Private
+  @VisibleForTesting
+  synchronized int getRefCount() {
+    return refCount;
+  }

   private boolean needRefresh() {
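The release protocol above replaces the old unconditional `releaseCache(groupId)`: every `incrRefs()` must be paired with exactly one `tryRelease()`, and `forceRelease()` is reserved for eviction under store pressure. A hypothetical lifecycle of one item, assuming `groupId`, `conf`, `fs`, and `appLogs` are in scope:

```java
// Hypothetical lifecycle of one EntityCacheItem (names from the patch).
EntityCacheItem item = new EntityCacheItem(groupId, conf, fs);
item.setAppLogs(appLogs);

item.incrRefs();                       // the cache map itself holds one ref
item.incrRefs();                       // a reader pins the item for a query

boolean released = item.tryRelease();  // eviction: refCount 2 -> 1
assert !released;                      // the reader keeps the store open

released = item.tryRelease();          // reader done: refCount 1 -> 0
assert released;                       // store closed, log offsets reset
```

Note that `forceRelease()` also resets the detail-log offsets, so a later `refreshCache()` re-parses the group's logs from the beginning.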
", groupId); + } + + @InterfaceAudience.Private + @VisibleForTesting + synchronized int getRefCount() { + return refCount; } private boolean needRefresh() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityGroupFSTimelineStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityGroupFSTimelineStore.java index edd430c2647..231ca7241af 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityGroupFSTimelineStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityGroupFSTimelineStore.java @@ -108,6 +108,10 @@ public class EntityGroupFSTimelineStore extends CompositeService + "%04d" + Path.SEPARATOR // app num / 1,000,000 + "%03d" + Path.SEPARATOR // (app num / 1000) % 1000 + "%s" + Path.SEPARATOR; // full app id + // Indicates when to force release a cache item even if there are active + // readers. Enlarge this factor may increase memory usage for the reader since + // there may be more cache items "hanging" in memory but not in cache. + private static final int CACHE_ITEM_OVERFLOW_FACTOR = 2; private YarnClient yarnClient; private TimelineStore summaryStore; @@ -172,7 +176,15 @@ protected boolean removeEldestEntry( TimelineEntityGroupId groupId = eldest.getKey(); LOG.debug("Evicting {} due to space limitations", groupId); EntityCacheItem cacheItem = eldest.getValue(); - cacheItem.releaseCache(groupId); + int activeStores = EntityCacheItem.getActiveStores(); + if (activeStores > appCacheMaxSize * CACHE_ITEM_OVERFLOW_FACTOR) { + LOG.debug("Force release cache {} since {} stores are active", + groupId, activeStores); + cacheItem.forceRelease(); + } else { + LOG.debug("Try release cache {}", groupId); + cacheItem.tryRelease(); + } if (cacheItem.getAppLogs().isDone()) { appIdLogMap.remove(groupId.getApplicationId()); } @@ -826,17 +838,19 @@ void setFs(FileSystem incomingFs) { @InterfaceAudience.Private @VisibleForTesting void setCachedLogs(TimelineEntityGroupId groupId, EntityCacheItem cacheItem) { + cacheItem.incrRefs(); cachedLogs.put(groupId, cacheItem); } private List getTimelineStoresFromCacheIds( - Set groupIds, String entityType) + Set groupIds, String entityType, + List cacheItems) throws IOException { List stores = new LinkedList(); // For now we just handle one store in a context. We return the first // non-null storage for the group ids. 
@@ -826,17 +838,19 @@ void setFs(FileSystem incomingFs) {
   @InterfaceAudience.Private
   @VisibleForTesting
   void setCachedLogs(TimelineEntityGroupId groupId, EntityCacheItem cacheItem) {
+    cacheItem.incrRefs();
     cachedLogs.put(groupId, cacheItem);
   }

   private List<TimelineStore> getTimelineStoresFromCacheIds(
-      Set<TimelineEntityGroupId> groupIds, String entityType)
+      Set<TimelineEntityGroupId> groupIds, String entityType,
+      List<EntityCacheItem> cacheItems)
       throws IOException {
     List<TimelineStore> stores = new LinkedList<TimelineStore>();
     // For now we just handle one store in a context. We return the first
     // non-null store for the group ids.
     for (TimelineEntityGroupId groupId : groupIds) {
-      TimelineStore storeForId = getCachedStore(groupId);
+      TimelineStore storeForId = getCachedStore(groupId, cacheItems);
       if (storeForId != null) {
         LOG.debug("Adding {} as a store for the query", storeForId.getName());
         stores.add(storeForId);
@@ -851,8 +865,9 @@ private List<TimelineStore> getTimelineStoresFromCacheIds(
     return stores;
   }

-  private List<TimelineStore> getTimelineStoresForRead(String entityId,
-      String entityType) throws IOException {
+  protected List<TimelineStore> getTimelineStoresForRead(String entityId,
+      String entityType, List<EntityCacheItem> cacheItems)
+      throws IOException {
     Set<TimelineEntityGroupId> groupIds = new HashSet<TimelineEntityGroupId>();
     for (TimelineEntityGroupPlugin cacheIdPlugin : cacheIdPlugins) {
       LOG.debug("Trying plugin {} for id {} and type {}",
@@ -871,12 +886,12 @@ private List<TimelineStore> getTimelineStoresForRead(String entityId,
           cacheIdPlugin.getClass().getName());
       }
     }
-    return getTimelineStoresFromCacheIds(groupIds, entityType);
+    return getTimelineStoresFromCacheIds(groupIds, entityType, cacheItems);
   }

   private List<TimelineStore> getTimelineStoresForRead(String entityType,
-      NameValuePair primaryFilter, Collection<NameValuePair> secondaryFilters)
-      throws IOException {
+      NameValuePair primaryFilter, Collection<NameValuePair> secondaryFilters,
+      List<EntityCacheItem> cacheItems) throws IOException {
     Set<TimelineEntityGroupId> groupIds = new HashSet<TimelineEntityGroupId>();
     for (TimelineEntityGroupPlugin cacheIdPlugin : cacheIdPlugins) {
       Set<TimelineEntityGroupId> idsFromPlugin =
@@ -888,24 +903,26 @@ private List<TimelineStore> getTimelineStoresForRead(String entityType,
       groupIds.addAll(idsFromPlugin);
       }
     }
-    return getTimelineStoresFromCacheIds(groupIds, entityType);
+    return getTimelineStoresFromCacheIds(groupIds, entityType, cacheItems);
   }
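Every store returned through these methods corresponds to a cache item whose reference count was bumped inside `getCachedStore`, so each read path owns exactly one release per collected item. A sketch of that contract, as it would look inside a read method of `EntityGroupFSTimelineStore` (`doRead` is a placeholder; the patch itself releases on each return path rather than in a `finally` block):

```java
// Fragment (assumes the surrounding EntityGroupFSTimelineStore instance).
List<EntityCacheItem> relatedCacheItems = new ArrayList<>();
List<TimelineStore> stores =
    getTimelineStoresForRead(entityId, entityType, relatedCacheItems);
try {
  for (TimelineStore store : stores) {
    doRead(store);   // placeholder for the per-store read call
  }
} finally {
  // Exactly one tryRelease() per incrRefs() taken for this read.
  tryReleaseCacheItems(relatedCacheItems);
}
```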
   // find a cached timeline store or null if it cannot be located
-  private TimelineStore getCachedStore(TimelineEntityGroupId groupId)
-      throws IOException {
+  private TimelineStore getCachedStore(TimelineEntityGroupId groupId,
+      List<EntityCacheItem> cacheItems) throws IOException {
     EntityCacheItem cacheItem;
     synchronized (this.cachedLogs) {
       // Note that the content in the cache log storage may be stale.
       cacheItem = this.cachedLogs.get(groupId);
       if (cacheItem == null) {
         LOG.debug("Set up new cache item for id {}", groupId);
-        cacheItem = new EntityCacheItem(getConfig(), fs);
+        cacheItem = new EntityCacheItem(groupId, getConfig(), fs);
         AppLogs appLogs = getAndSetAppLogs(groupId.getApplicationId());
         if (appLogs != null) {
           LOG.debug("Set applogs {} for group id {}", appLogs, groupId);
           cacheItem.setAppLogs(appLogs);
           this.cachedLogs.put(groupId, cacheItem);
+          // Add the reference held by the cache itself
+          cacheItem.incrRefs();
         } else {
           LOG.warn("AppLogs for groupId {} is set to null!", groupId);
         }
@@ -915,30 +932,43 @@ private TimelineStore getCachedStore(TimelineEntityGroupId groupId)
     if (cacheItem.getAppLogs() != null) {
       AppLogs appLogs = cacheItem.getAppLogs();
       LOG.debug("try refresh cache {} {}", groupId, appLogs.getAppId());
-      store = cacheItem.refreshCache(groupId, aclManager, jsonFactory,
-          objMapper, metrics);
+      // Add the reference held by the current reader
+      cacheItem.incrRefs();
+      cacheItems.add(cacheItem);
+      store = cacheItem.refreshCache(aclManager, jsonFactory, objMapper,
+          metrics);
     } else {
       LOG.warn("AppLogs for group id {} is null", groupId);
     }
     return store;
   }

+  protected void tryReleaseCacheItems(List<EntityCacheItem> relatedCacheItems) {
+    for (EntityCacheItem item : relatedCacheItems) {
+      item.tryRelease();
+    }
+  }
+
   @Override
   public TimelineEntities getEntities(String entityType, Long limit,
       Long windowStart, Long windowEnd, String fromId, Long fromTs,
       NameValuePair primaryFilter, Collection<NameValuePair> secondaryFilters,
       EnumSet<Field> fieldsToRetrieve, CheckAcl checkAcl) throws IOException {
     LOG.debug("getEntities type={} primary={}", entityType, primaryFilter);
+    List<EntityCacheItem> relatedCacheItems = new ArrayList<>();
     List<TimelineStore> stores = getTimelineStoresForRead(entityType,
-        primaryFilter, secondaryFilters);
+        primaryFilter, secondaryFilters, relatedCacheItems);
     TimelineEntities returnEntities = new TimelineEntities();
     for (TimelineStore store : stores) {
       LOG.debug("Try timeline store {} for the request", store.getName());
-      returnEntities.addEntities(
-          store.getEntities(entityType, limit, windowStart, windowEnd, fromId,
-              fromTs, primaryFilter, secondaryFilters, fieldsToRetrieve,
-              checkAcl).getEntities());
+      TimelineEntities entities = store.getEntities(entityType, limit,
+          windowStart, windowEnd, fromId, fromTs, primaryFilter,
+          secondaryFilters, fieldsToRetrieve, checkAcl);
+      if (entities != null) {
+        returnEntities.addEntities(entities.getEntities());
+      }
     }
+    tryReleaseCacheItems(relatedCacheItems);
     return returnEntities;
   }

@@ -946,17 +976,21 @@ public TimelineEntities getEntities(String entityType, Long limit,
   public TimelineEntity getEntity(String entityId, String entityType,
       EnumSet<Field> fieldsToRetrieve) throws IOException {
     LOG.debug("getEntity type={} id={}", entityType, entityId);
-    List<TimelineStore> stores = getTimelineStoresForRead(entityId, entityType);
+    List<EntityCacheItem> relatedCacheItems = new ArrayList<>();
+    List<TimelineStore> stores = getTimelineStoresForRead(entityId, entityType,
+        relatedCacheItems);
     for (TimelineStore store : stores) {
       LOG.debug("Try timeline store {}:{} for the request",
           store.getName(), store.toString());
       TimelineEntity e =
           store.getEntity(entityId, entityType, fieldsToRetrieve);
       if (e != null) {
+        tryReleaseCacheItems(relatedCacheItems);
         return e;
       }
     }
     LOG.debug("getEntity: Found nothing");
+    tryReleaseCacheItems(relatedCacheItems);
     return null;
   }
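For reviewers, here is a schematic interleaving of the race this closes, with a reader thread R and the evictor E operating on the same cache item; this is a trace of the counting semantics, not code from the patch:

```java
// R: getCachedStore(...)            -> incrRefs(): cache ref + reader ref = 2
// E: cachedLogs.removeEldestEntry() -> tryRelease(): 2 -> 1, store stays open
// R: store.getEntity(...)           -> reads from a still-open LevelDB store
// R: tryReleaseCacheItems(...)      -> tryRelease(): 1 -> 0, store closed
//
// Before this patch, E called releaseCache() unconditionally and could close
// the store while R was still reading from it.
```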
@@ -966,10 +1000,11 @@ public TimelineEvents getEntityTimelines(String entityType,
       Long windowEnd, Set<String> eventTypes) throws IOException {
     LOG.debug("getEntityTimelines type={} ids={}", entityType, entityIds);
     TimelineEvents returnEvents = new TimelineEvents();
+    List<EntityCacheItem> relatedCacheItems = new ArrayList<>();
     for (String entityId : entityIds) {
       LOG.debug("getEntityTimeline type={} id={}", entityType, entityId);
       List<TimelineStore> stores
-          = getTimelineStoresForRead(entityId, entityType);
+          = getTimelineStoresForRead(entityId, entityType, relatedCacheItems);
       for (TimelineStore store : stores) {
         LOG.debug("Try timeline store {}:{} for the request",
             store.getName(), store.toString());
@@ -978,9 +1013,12 @@ public TimelineEvents getEntityTimelines(String entityType,
         TimelineEvents events =
             store.getEntityTimelines(entityType, entityIdSet, limit,
                 windowStart, windowEnd, eventTypes);
-        returnEvents.addEvents(events.getAllEvents());
+        if (events != null) {
+          returnEvents.addEvents(events.getAllEvents());
+        }
       }
     }
+    tryReleaseCacheItems(relatedCacheItems);
     return returnEvents;
   }

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/EntityGroupPlugInForTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/EntityGroupPlugInForTest.java
index 71e26cbf548..db241a891d2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/EntityGroupPlugInForTest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/EntityGroupPlugInForTest.java
@@ -18,7 +18,9 @@
 package org.apache.hadoop.yarn.server.timeline;

 import com.google.common.collect.Sets;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId;
+import org.apache.hadoop.yarn.util.ConverterUtils;

 import java.util.Collection;
 import java.util.Set;
@@ -26,31 +28,32 @@

 class EntityGroupPlugInForTest extends TimelineEntityGroupPlugin {

-  private static TimelineEntityGroupId timelineEntityGroupId
-      = TimelineEntityGroupId.newInstance(
-          TestEntityGroupFSTimelineStore.TEST_APPLICATION_ID, "test");
+  static final String APP_ID_FILTER_NAME = "appid";

   @Override
   public Set<TimelineEntityGroupId> getTimelineEntityGroupId(String entityType,
       NameValuePair primaryFilter, Collection<NameValuePair> secondaryFilters) {
-    return Sets.newHashSet(timelineEntityGroupId);
+    ApplicationId appId
+        = ConverterUtils.toApplicationId(primaryFilter.getValue().toString());
+    return Sets.newHashSet(getStandardTimelineGroupId(appId));
   }

   @Override
   public Set<TimelineEntityGroupId> getTimelineEntityGroupId(String entityId,
       String entityType) {
-    return Sets.newHashSet(timelineEntityGroupId);
+    ApplicationId appId = ConverterUtils.toApplicationId(entityId);
+    return Sets.newHashSet(getStandardTimelineGroupId(appId));
   }

   @Override
   public Set<TimelineEntityGroupId> getTimelineEntityGroupId(String entityType,
       SortedSet<String> entityIds, Set<String> eventTypes) {
-    return Sets.newHashSet(timelineEntityGroupId);
+    return Sets.newHashSet();
   }

-  static TimelineEntityGroupId getStandardTimelineGroupId() {
-    return timelineEntityGroupId;
+  static TimelineEntityGroupId getStandardTimelineGroupId(ApplicationId appId) {
+    return TimelineEntityGroupId.newInstance(appId, "test");
   }
 }
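The reworked test plugin maps each entity to a per-application group instead of a single static group, which is what lets the cache tests below fill the cache with distinct items. A sketch of the mapping (the example id string is arbitrary):

```java
// One distinct timeline group per application, all named "test".
ApplicationId appId =
    ConverterUtils.toApplicationId("application_1234_0003");
TimelineEntityGroupId groupId =
    TimelineEntityGroupId.newInstance(appId, "test");
// Entities are located by parsing the ApplicationId back out of the entity
// id (or out of the "appid" primary filter) at query time.
```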
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/TestEntityGroupFSTimelineStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/TestEntityGroupFSTimelineStore.java
index 7a8ff2f9e89..d6baab67370 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/TestEntityGroupFSTimelineStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/TestEntityGroupFSTimelineStore.java
@@ -44,8 +44,17 @@
 import org.junit.rules.TestName;

 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.FutureTask;

 import static org.apache.hadoop.yarn.server.timeline.EntityGroupFSTimelineStore.AppState;
 import static org.junit.Assert.assertEquals;
@@ -53,24 +62,15 @@
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;

 public class TestEntityGroupFSTimelineStore extends TimelineStoreTestUtils {

-  private static final String SAMPLE_APP_NAME = "1234_5678";
+  private static final String SAMPLE_APP_PREFIX_CACHE_TEST = "1234_000";
+  private static final int CACHE_TEST_CACHE_SIZE = 5;

-  static final ApplicationId TEST_APPLICATION_ID
-      = ConverterUtils.toApplicationId(
-          ConverterUtils.APPLICATION_PREFIX + "_" + SAMPLE_APP_NAME);
-
-  private static final String TEST_APP_DIR_NAME
-      = TEST_APPLICATION_ID.toString();
-  private static final String TEST_ATTEMPT_DIR_NAME
-      = ApplicationAttemptId.appAttemptIdStrPrefix + SAMPLE_APP_NAME + "_1";
   private static final String TEST_SUMMARY_LOG_FILE_NAME
       = EntityGroupFSTimelineStore.SUMMARY_LOG_PREFIX + "test";
-  private static final String TEST_ENTITY_LOG_FILE_NAME
-      = EntityGroupFSTimelineStore.ENTITY_LOG_PREFIX
-          + EntityGroupPlugInForTest.getStandardTimelineGroupId();
   private static final String TEST_DOMAIN_LOG_FILE_NAME
       = EntityGroupFSTimelineStore.DOMAIN_LOG_PREFIX + "test";

@@ -78,9 +78,6 @@ public class TestEntityGroupFSTimelineStore extends TimelineStoreTestUtils {
       = new Path(System.getProperty("test.build.data",
           System.getProperty("java.io.tmpdir")),
           TestEntityGroupFSTimelineStore.class.getSimpleName());
-  private static Path testAppDirPath;
-  private static Path testAttemptDirPath;
-  private static Path testDoneDirPath;

   private static Configuration config = new YarnConfiguration();
   private static MiniDFSCluster hdfsCluster;
@@ -88,7 +85,14 @@ public class TestEntityGroupFSTimelineStore extends TimelineStoreTestUtils {
   private static FileContext fc;
   private static FileContextTestHelper fileContextTestHelper =
       new FileContextTestHelper("/tmp/TestEntityGroupFSTimelineStore");
-  private EntityGroupFSTimelineStore store;
+
+  private static List<ApplicationId> sampleAppIds;
+  private static ApplicationId mainTestAppId;
+  private static Path mainTestAppDirPath;
+  private static Path testDoneDirPath;
+  private static String mainEntityLogFileName;
+
+  private EntityGroupFSTimelineStoreForTest store;
   private TimelineEntity entityNew;

   @Rule
@@ -101,23 +105,44 @@ public static void setupClass() throws Exception {
         YarnConfiguration
             .TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_SUMMARY_ENTITY_TYPES,
         "YARN_APPLICATION,YARN_APPLICATION_ATTEMPT,YARN_CONTAINER");
+    config.setInt(
+        YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_APP_CACHE_SIZE,
+        CACHE_TEST_CACHE_SIZE);
     config.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, TEST_ROOT_DIR.toString());
     HdfsConfiguration hdfsConfig = new HdfsConfiguration();
     hdfsCluster
         = new MiniDFSCluster.Builder(hdfsConfig).numDataNodes(1).build();
     fs = hdfsCluster.getFileSystem();
     fc = FileContext.getFileContext(hdfsCluster.getURI(0), config);
-    testAppDirPath = getTestRootPath(TEST_APPLICATION_ID.toString());
-    testAttemptDirPath = new Path(testAppDirPath, TEST_ATTEMPT_DIR_NAME);
-    testDoneDirPath = getTestRootPath("done");
-    config.set(YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_DONE_DIR,
-        testDoneDirPath.toString());
+    sampleAppIds = new ArrayList<>(CACHE_TEST_CACHE_SIZE + 1);
+    for (int i = 0; i < CACHE_TEST_CACHE_SIZE + 1; i++) {
+      ApplicationId appId = ConverterUtils.toApplicationId(
+          ConverterUtils.APPLICATION_PREFIX + "_" + SAMPLE_APP_PREFIX_CACHE_TEST
+              + i);
+      sampleAppIds.add(appId);
+    }
+    // Among all sample applicationIds, choose the first one for most of the
+    // tests.
+    mainTestAppId = sampleAppIds.get(0);
+    mainTestAppDirPath = getTestRootPath(mainTestAppId.toString());
+    mainEntityLogFileName = EntityGroupFSTimelineStore.ENTITY_LOG_PREFIX
+        + EntityGroupPlugInForTest.getStandardTimelineGroupId(mainTestAppId);
+
+    testDoneDirPath = getTestRootPath("done");
+    config.set(YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_DONE_DIR,
+        testDoneDirPath.toString());
   }

   @Before
   public void setup() throws Exception {
-    createTestFiles();
-    store = new EntityGroupFSTimelineStore();
+    for (ApplicationId appId : sampleAppIds) {
+      Path attemptDirPath = new Path(getTestRootPath(appId.toString()),
+          getAttemptDirName(appId));
+      createTestFiles(appId, attemptDirPath);
+    }
+
+    store = new EntityGroupFSTimelineStoreForTest();
     if (currTestName.getMethodName().contains("Plugin")) {
       config.set(YarnConfiguration.TIMELINE_SERVICE_ENTITY_GROUP_PLUGIN_CLASSES,
           EntityGroupPlugInForTest.class.getName());
@@ -130,7 +155,9 @@ public void setup() throws Exception {
   @After
   public void tearDown() throws Exception {
     store.stop();
-    fs.delete(testAppDirPath, true);
+    for (ApplicationId appId : sampleAppIds) {
+      fs.delete(getTestRootPath(appId.toString()), true);
+    }
   }

   @AfterClass
@@ -144,7 +171,7 @@ public static void tearDownClass() throws Exception {
   @Test
   public void testAppLogsScanLogs() throws Exception {
     EntityGroupFSTimelineStore.AppLogs appLogs =
-        store.new AppLogs(TEST_APPLICATION_ID, testAppDirPath,
+        store.new AppLogs(mainTestAppId, mainTestAppDirPath,
             AppState.COMPLETED);
     appLogs.scanForLogs();
     List<LogInfo> summaryLogs = appLogs.getSummaryLogs();
@@ -160,14 +187,14 @@ store.new AppLogs(TEST_APPLICATION_ID, testAppDirPath,

     for (LogInfo log : detailLogs) {
       String fileName = log.getFilename();
-      assertEquals(fileName, TEST_ENTITY_LOG_FILE_NAME);
+      assertEquals(fileName, mainEntityLogFileName);
     }
   }

   @Test
   public void testMoveToDone() throws Exception {
     EntityGroupFSTimelineStore.AppLogs appLogs =
-        store.new AppLogs(TEST_APPLICATION_ID, testAppDirPath,
+        store.new AppLogs(mainTestAppId, mainTestAppDirPath,
             AppState.COMPLETED);
     Path pathBefore = appLogs.getAppDirPath();
     appLogs.moveToDone();
@@ -182,7 +209,7 @@ public void testParseSummaryLogs() throws Exception {
     MutableCounterLong scanned = store.metrics.getEntitiesReadToSummary();
     long beforeScan = scanned.value();
     EntityGroupFSTimelineStore.AppLogs appLogs =
-        store.new AppLogs(TEST_APPLICATION_ID, testAppDirPath,
+        store.new AppLogs(mainTestAppId, mainTestAppDirPath,
             AppState.COMPLETED);
     appLogs.scanForLogs();
     appLogs.parseSummaryLogs(tdm);
@@ -194,6 +221,9 @@ store.new AppLogs(TEST_APPLICATION_ID, testAppDirPath,
   public void testCleanLogs() throws Exception {
     // Create test dirs and files
     // Irrelevant file, should not be reclaimed
+    String appDirName = mainTestAppId.toString();
+    String attemptDirName = ApplicationAttemptId.appAttemptIdStrPrefix
+        + appDirName + "_1";
     Path irrelevantFilePath = new Path(
         testDoneDirPath, "irrelevant.log");
     FSDataOutputStream stream = fs.create(irrelevantFilePath);
@@ -204,29 +234,29 @@ public void testCleanLogs() throws Exception {
     Path doneAppHomeDir = new Path(new Path(testDoneDirPath, "0000"), "001");
     // First application, untouched after creation
-    Path appDirClean = new Path(doneAppHomeDir, TEST_APP_DIR_NAME);
-    Path attemptDirClean = new Path(appDirClean, TEST_ATTEMPT_DIR_NAME);
+    Path appDirClean = new Path(doneAppHomeDir, appDirName);
+    Path attemptDirClean = new Path(appDirClean, attemptDirName);
     fs.mkdirs(attemptDirClean);
     Path filePath = new Path(attemptDirClean, "test.log");
     stream = fs.create(filePath);
     stream.close();
     // Second application, one file touched after creation
-    Path appDirHoldByFile = new Path(doneAppHomeDir, TEST_APP_DIR_NAME + "1");
+    Path appDirHoldByFile = new Path(doneAppHomeDir, appDirName + "1");
     Path attemptDirHoldByFile
-        = new Path(appDirHoldByFile, TEST_ATTEMPT_DIR_NAME);
+        = new Path(appDirHoldByFile, attemptDirName);
     fs.mkdirs(attemptDirHoldByFile);
     Path filePathHold = new Path(attemptDirHoldByFile, "test1.log");
     stream = fs.create(filePathHold);
     stream.close();
     // Third application, one dir touched after creation
-    Path appDirHoldByDir = new Path(doneAppHomeDir, TEST_APP_DIR_NAME + "2");
-    Path attemptDirHoldByDir = new Path(appDirHoldByDir, TEST_ATTEMPT_DIR_NAME);
+    Path appDirHoldByDir = new Path(doneAppHomeDir, appDirName + "2");
+    Path attemptDirHoldByDir = new Path(appDirHoldByDir, attemptDirName);
     fs.mkdirs(attemptDirHoldByDir);
     Path dirPathHold = new Path(attemptDirHoldByDir, "hold");
     fs.mkdirs(dirPathHold);
     // Fourth application, empty dirs
-    Path appDirEmpty = new Path(doneAppHomeDir, TEST_APP_DIR_NAME + "3");
-    Path attemptDirEmpty = new Path(appDirEmpty, TEST_ATTEMPT_DIR_NAME);
+    Path appDirEmpty = new Path(doneAppHomeDir, appDirName + "3");
+    Path attemptDirEmpty = new Path(appDirEmpty, attemptDirName);
     fs.mkdirs(attemptDirEmpty);
     Path dirPathEmpty = new Path(attemptDirEmpty, "empty");
     fs.mkdirs(dirPathEmpty);
@@ -274,12 +304,15 @@ public void testPluginRead() throws Exception {
         YarnConfiguration.TIMELINE_SERVICE_ENTITY_GROUP_PLUGIN_CLASSES));
     // Load data and cache item, prepare timeline store by making a cache item
     EntityGroupFSTimelineStore.AppLogs appLogs =
-        store.new AppLogs(TEST_APPLICATION_ID, testAppDirPath,
+        store.new AppLogs(mainTestAppId, mainTestAppDirPath,
             AppState.COMPLETED);
-    EntityCacheItem cacheItem = new EntityCacheItem(config, fs);
+    EntityCacheItem cacheItem = new EntityCacheItem(
+        EntityGroupPlugInForTest.getStandardTimelineGroupId(mainTestAppId),
+        config, fs);
     cacheItem.setAppLogs(appLogs);
     store.setCachedLogs(
-        EntityGroupPlugInForTest.getStandardTimelineGroupId(), cacheItem);
+        EntityGroupPlugInForTest.getStandardTimelineGroupId(mainTestAppId),
+        cacheItem);
     MutableCounterLong detailLogEntityRead =
         store.metrics.getGetEntityToDetailOps();
     MutableStat cacheRefresh =
         store.metrics.getCacheRefresh();
@@ -291,16 +324,20 @@ store.new AppLogs(TEST_APPLICATION_ID, testAppDirPath,
         = PluginStoreTestUtils.getTdmWithStore(config, store);

     // Verify single entity read
-    TimelineEntity entity3 = tdm.getEntity("type_3", "id_3",
+    TimelineEntity entity3 = tdm.getEntity("type_3", mainTestAppId.toString(),
         EnumSet.allOf(TimelineReader.Field.class),
         UserGroupInformation.getLoginUser());
     assertNotNull(entity3);
     assertEquals(entityNew.getStartTime(), entity3.getStartTime());
+    assertEquals(1, cacheItem.getRefCount());
+    assertEquals(1, EntityCacheItem.getActiveStores());
     // Verify multiple entities read
-    TimelineEntities entities = tdm.getEntities("type_3", null, null, null,
-        null, null, null, null, EnumSet.allOf(TimelineReader.Field.class),
+    NameValuePair primaryFilter = new NameValuePair(
+        EntityGroupPlugInForTest.APP_ID_FILTER_NAME, mainTestAppId.toString());
+    TimelineEntities entities = tdm.getEntities("type_3", primaryFilter, null,
+        null, null, null, null, null, EnumSet.allOf(TimelineReader.Field.class),
         UserGroupInformation.getLoginUser());
-    assertEquals(entities.getEntities().size(), 1);
+    assertEquals(1, entities.getEntities().size());
     for (TimelineEntity entity : entities.getEntities()) {
       assertEquals(entityNew.getStartTime(), entity.getStartTime());
     }
@@ -309,11 +346,79 @@ store.new AppLogs(TEST_APPLICATION_ID, testAppDirPath,
     assertEquals(cacheRefreshBefore + 1L, cacheRefresh.lastStat().numSamples());
   }

+  @Test(timeout = 90000L)
+  public void testMultiplePluginRead() throws Exception {
+    Thread mainThread = Thread.currentThread();
+    mainThread.setName("testMain");
+    // Verify precondition
+    assertEquals(EntityGroupPlugInForTest.class.getName(),
+        store.getConfig().get(
+            YarnConfiguration.TIMELINE_SERVICE_ENTITY_GROUP_PLUGIN_CLASSES));
+    // Prepare timeline store by making cache items
+    EntityGroupFSTimelineStore.AppLogs appLogs =
+        store.new AppLogs(mainTestAppId, mainTestAppDirPath,
+            AppState.COMPLETED);
+    final EntityCacheItem cacheItem = new EntityCacheItem(
+        EntityGroupPlugInForTest.getStandardTimelineGroupId(mainTestAppId),
+        config, fs);
+
+    cacheItem.setAppLogs(appLogs);
+    store.setCachedLogs(
+        EntityGroupPlugInForTest.getStandardTimelineGroupId(mainTestAppId),
+        cacheItem);
+
+    // Launch the blocking read call in a future
+    ExecutorService threadExecutor = Executors.newSingleThreadExecutor();
+    FutureTask<TimelineEntity> blockingReader =
+        new FutureTask<>(new Callable<TimelineEntity>() {
+          public TimelineEntity call() throws Exception {
+            Thread currThread = Thread.currentThread();
+            currThread.setName("blockingReader");
+            return store.getEntityBlocking(mainTestAppId.toString(), "type_3",
+                EnumSet.allOf(TimelineReader.Field.class));
+          }});
+    threadExecutor.execute(blockingReader);
+    try {
+      while (!store.testCacheReferenced) {
+        Thread.sleep(300);
+      }
+    } catch (InterruptedException e) {
+      fail("Interrupted: " + e);
+    }
+    // Try to refill the cache after the first cache item is referenced
+    for (ApplicationId appId : sampleAppIds) {
+      // Skip the first appId since it's already in cache
+      if (appId.equals(mainTestAppId)) {
+        continue;
+      }
+      EntityGroupFSTimelineStore.AppLogs currAppLog =
+          store.new AppLogs(appId, getTestRootPath(appId.toString()),
+              AppState.COMPLETED);
+      EntityCacheItem item = new EntityCacheItem(
+          EntityGroupPlugInForTest.getStandardTimelineGroupId(appId),
+          config, fs);
+      item.setAppLogs(currAppLog);
+      store.setCachedLogs(
+          EntityGroupPlugInForTest.getStandardTimelineGroupId(appId),
+          item);
+    }
+    // At this time, the cache item of the blocking reader should have been
+    // evicted, leaving only the reader's reference.
+    assertEquals(1, cacheItem.getRefCount());
+    store.testCanProceed = true;
+    TimelineEntity entity3 = blockingReader.get();
+
+    assertNotNull(entity3);
+    assertEquals(entityNew.getStartTime(), entity3.getStartTime());
+    assertEquals(0, cacheItem.getRefCount());
+
+    threadExecutor.shutdownNow();
+  }
+
   @Test
   public void testSummaryRead() throws Exception {
     // Load data
     EntityGroupFSTimelineStore.AppLogs appLogs =
-        store.new AppLogs(TEST_APPLICATION_ID, testAppDirPath,
+        store.new AppLogs(mainTestAppId, mainTestAppDirPath,
             AppState.COMPLETED);
     MutableCounterLong summaryLogEntityRead =
         store.metrics.getGetEntityToSummaryOps();
@@ -331,28 +436,32 @@ store.new AppLogs(TEST_APPLICATION_ID, testAppDirPath,
         UserGroupInformation.getLoginUser());
     assertEquals(entities.getEntities().size(), 1);
     for (TimelineEntity entity : entities.getEntities()) {
-      assertEquals((Long) 123l, entity.getStartTime());
+      assertEquals((Long) 123L, entity.getStartTime());
     }
     // Verify metrics
     assertEquals(numEntityReadBefore + 5L, summaryLogEntityRead.value());
   }

-  private void createTestFiles() throws IOException {
+  private void createTestFiles(ApplicationId appId, Path attemptDirPath)
+      throws IOException {
     TimelineEntities entities = PluginStoreTestUtils.generateTestEntities();
     PluginStoreTestUtils.writeEntities(entities,
-        new Path(testAttemptDirPath, TEST_SUMMARY_LOG_FILE_NAME), fs);
-
+        new Path(attemptDirPath, TEST_SUMMARY_LOG_FILE_NAME), fs);
+    Map<String, Set<Object>> primaryFilters = new HashMap<>();
+    Set<Object> appSet = new HashSet<Object>();
+    appSet.add(appId.toString());
+    primaryFilters.put(EntityGroupPlugInForTest.APP_ID_FILTER_NAME, appSet);
     entityNew = PluginStoreTestUtils
-        .createEntity("id_3", "type_3", 789l, null, null,
-            null, null, "domain_id_1");
+        .createEntity(appId.toString(), "type_3", 789L, null, null,
+            primaryFilters, null, "domain_id_1");
     TimelineEntities entityList = new TimelineEntities();
     entityList.addEntity(entityNew);
     PluginStoreTestUtils.writeEntities(entityList,
-        new Path(testAttemptDirPath, TEST_ENTITY_LOG_FILE_NAME), fs);
+        new Path(attemptDirPath, mainEntityLogFileName), fs);
     FSDataOutputStream out = fs.create(
-        new Path(testAttemptDirPath, TEST_DOMAIN_LOG_FILE_NAME));
+        new Path(attemptDirPath, TEST_DOMAIN_LOG_FILE_NAME));
     out.close();
   }

@@ -360,4 +469,41 @@ private static Path getTestRootPath(String pathString) {
     return fileContextTestHelper.getTestRootPath(fc, pathString);
   }

+  private static String getAttemptDirName(ApplicationId appId) {
+    return ApplicationAttemptId.appAttemptIdStrPrefix + appId.toString() + "_1";
+  }
+
+  private static class EntityGroupFSTimelineStoreForTest
+      extends EntityGroupFSTimelineStore {
+    // Flags used for the concurrent testing environment
+    private volatile boolean testCanProceed = false;
+    private volatile boolean testCacheReferenced = false;
+
+    TimelineEntity getEntityBlocking(String entityId, String entityType,
+        EnumSet<TimelineReader.Field> fieldsToRetrieve) throws IOException {
+      List<EntityCacheItem> relatedCacheItems = new ArrayList<>();
+      List<TimelineStore> stores = getTimelineStoresForRead(entityId,
+          entityType, relatedCacheItems);
+
+      testCacheReferenced = true;
+      try {
+        while (!testCanProceed) {
+          Thread.sleep(1000);
+        }
+      } catch (InterruptedException e) {
+        fail("Interrupted " + e);
+      }
+
+      for (TimelineStore store : stores) {
+        TimelineEntity e =
+            store.getEntity(entityId, entityType, fieldsToRetrieve);
+        if (e != null) {
+          tryReleaseCacheItems(relatedCacheItems);
+          return e;
+        }
+      }
+      tryReleaseCacheItems(relatedCacheItems);
+      return null;
+    }
+  }
 }
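Taken together, `testMultiplePluginRead` pins one item, overflows a cache of size `CACHE_TEST_CACHE_SIZE`, and asserts that the pinned item survives eviction until its reader finishes. A condensed, standalone restatement of the invariant being asserted, using only the patch's counting semantics (a stand-in `main`, not the real store):

```java
// Standalone restatement of the invariant testMultiplePluginRead checks.
public class RefCountInvariantSketch {
  public static void main(String[] args) {
    int refCount = 0;
    refCount++;            // cache inserts the item (setCachedLogs -> incrRefs)
    refCount++;            // blocking reader pins it (getCachedStore -> incrRefs)
    refCount--;            // eviction calls tryRelease(): 2 -> 1, store survives
    assert refCount == 1;  // asserted while the reader is still blocked
    refCount--;            // reader finishes: tryReleaseCacheItems -> 0, closed
    assert refCount == 0;  // asserted after blockingReader.get() returns
    System.out.println("invariant holds");
  }
}
```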