diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index cb0ac073699..0eade8ee8e6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -2136,6 +2136,17 @@ 10485760 + + yarn.timeline-service.entity-group-fs-store.app-cache-size + + Size of the reader cache for ATS v1.5 reader. This value controls how many + entity groups the ATS v1.5 server should cache. If the number of active + read entity groups is greater than the number of caches items, some reads + may return empty data. This value must be greater than 0. + + 10 + + yarn.timeline-service.client.fd-flush-interval-secs diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityCacheItem.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityCacheItem.java index 2b6e0235b3b..7ed7c4a2846 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityCacheItem.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityCacheItem.java @@ -16,8 +16,6 @@ */ package org.apache.hadoop.yarn.server.timeline; -import com.google.common.annotations.VisibleForTesting; -import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.util.Time; import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId; @@ -26,7 +24,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; -import java.util.concurrent.atomic.AtomicInteger; /** * Cache item for timeline server v1.5 reader cache. Each cache item has a @@ -41,8 +38,6 @@ public class EntityCacheItem { private EntityGroupFSTimelineStore.AppLogs appLogs; private long lastRefresh; private Configuration config; - private int refCount = 0; - private static AtomicInteger activeStores = new AtomicInteger(0); public EntityCacheItem(TimelineEntityGroupId gId, Configuration config) { this.groupId = gId; @@ -75,13 +70,6 @@ public class EntityCacheItem { return store; } - /** - * @return The number of currently active stores in all CacheItems. - */ - public static int getActiveStores() { - return activeStores.get(); - } - /** * Refresh this cache item if it needs refresh. This will enforce an appLogs * rescan and then load new data. The refresh process is synchronized with @@ -107,7 +95,6 @@ public class EntityCacheItem { } if (!appLogs.getDetailLogs().isEmpty()) { if (store == null) { - activeStores.getAndIncrement(); store = new LevelDBCacheTimelineStore(groupId.toString(), "LeveldbCache." + groupId); store.init(config); @@ -133,31 +120,6 @@ public class EntityCacheItem { return store; } - /** - * Increase the number of references to this cache item by 1. - */ - public synchronized void incrRefs() { - refCount++; - } - - /** - * Unregister a reader. Try to release the cache if the reader to current - * cache reaches 0. - * - * @return true if the cache has been released, otherwise false - */ - public synchronized boolean tryRelease() { - refCount--; - // Only reclaim the storage if there is no reader. - if (refCount > 0) { - LOG.debug("{} references left for cached group {}, skipping the release", - refCount, groupId); - return false; - } - forceRelease(); - return true; - } - /** * Force releasing the cache item for the given group id, even though there * may be active references. @@ -171,8 +133,6 @@ public class EntityCacheItem { LOG.warn("Error closing timeline store", e); } store = null; - activeStores.getAndDecrement(); - refCount = 0; // reset offsets so next time logs are re-parsed for (LogInfo log : appLogs.getDetailLogs()) { if (log.getFilename().contains(groupId.toString())) { @@ -182,12 +142,6 @@ public class EntityCacheItem { LOG.debug("Cache for group {} released. ", groupId); } - @InterfaceAudience.Private - @VisibleForTesting - synchronized int getRefCount() { - return refCount; - } - private boolean needRefresh() { return (Time.monotonicNow() - lastRefresh > 10000); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityGroupFSTimelineStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityGroupFSTimelineStore.java index 9593a317460..a9d3c1dce5d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityGroupFSTimelineStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityGroupFSTimelineStore.java @@ -181,15 +181,8 @@ public class EntityGroupFSTimelineStore extends CompositeService TimelineEntityGroupId groupId = eldest.getKey(); LOG.debug("Evicting {} due to space limitations", groupId); EntityCacheItem cacheItem = eldest.getValue(); - int activeStores = EntityCacheItem.getActiveStores(); - if (activeStores > appCacheMaxSize * CACHE_ITEM_OVERFLOW_FACTOR) { - LOG.debug("Force release cache {} since {} stores are active", - groupId, activeStores); - cacheItem.forceRelease(); - } else { - LOG.debug("Try release cache {}", groupId); - cacheItem.tryRelease(); - } + LOG.debug("Force release cache {}.", groupId); + cacheItem.forceRelease(); if (cacheItem.getAppLogs().isDone()) { appIdLogMap.remove(groupId.getApplicationId()); } @@ -920,7 +913,6 @@ public class EntityGroupFSTimelineStore extends CompositeService @InterfaceAudience.Private @VisibleForTesting void setCachedLogs(TimelineEntityGroupId groupId, EntityCacheItem cacheItem) { - cacheItem.incrRefs(); cachedLogs.put(groupId, cacheItem); } @@ -1003,8 +995,6 @@ public class EntityGroupFSTimelineStore extends CompositeService LOG.debug("Set applogs {} for group id {}", appLogs, groupId); cacheItem.setAppLogs(appLogs); this.cachedLogs.put(groupId, cacheItem); - // Add the reference by the cache - cacheItem.incrRefs(); } else { LOG.warn("AppLogs for groupId {} is set to null!", groupId); } @@ -1014,8 +1004,6 @@ public class EntityGroupFSTimelineStore extends CompositeService if (cacheItem.getAppLogs() != null) { AppLogs appLogs = cacheItem.getAppLogs(); LOG.debug("try refresh cache {} {}", groupId, appLogs.getAppId()); - // Add the reference by the store - cacheItem.incrRefs(); cacheItems.add(cacheItem); store = cacheItem.refreshCache(aclManager, metrics); } else { @@ -1024,12 +1012,6 @@ public class EntityGroupFSTimelineStore extends CompositeService return store; } - protected void tryReleaseCacheItems(List relatedCacheItems) { - for (EntityCacheItem item : relatedCacheItems) { - item.tryRelease(); - } - } - @Override public TimelineEntities getEntities(String entityType, Long limit, Long windowStart, Long windowEnd, String fromId, Long fromTs, @@ -1049,7 +1031,6 @@ public class EntityGroupFSTimelineStore extends CompositeService returnEntities.addEntities(entities.getEntities()); } } - tryReleaseCacheItems(relatedCacheItems); return returnEntities; } @@ -1066,12 +1047,10 @@ public class EntityGroupFSTimelineStore extends CompositeService TimelineEntity e = store.getEntity(entityId, entityType, fieldsToRetrieve); if (e != null) { - tryReleaseCacheItems(relatedCacheItems); return e; } } LOG.debug("getEntity: Found nothing"); - tryReleaseCacheItems(relatedCacheItems); return null; } @@ -1099,7 +1078,6 @@ public class EntityGroupFSTimelineStore extends CompositeService } } } - tryReleaseCacheItems(relatedCacheItems); return returnEvents; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/TestEntityGroupFSTimelineStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/TestEntityGroupFSTimelineStore.java index 5a0822def2d..8540d45e543 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/TestEntityGroupFSTimelineStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/TestEntityGroupFSTimelineStore.java @@ -57,10 +57,6 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.FutureTask; import static org.apache.hadoop.yarn.server.timeline.EntityGroupFSTimelineStore.AppState; import static org.junit.Assert.assertEquals; @@ -68,7 +64,6 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; public class TestEntityGroupFSTimelineStore extends TimelineStoreTestUtils { @@ -98,7 +93,7 @@ public class TestEntityGroupFSTimelineStore extends TimelineStoreTestUtils { private static Path testDoneDirPath; private static String mainEntityLogFileName; - private EntityGroupFSTimelineStoreForTest store; + private EntityGroupFSTimelineStore store; private TimelineEntity entityNew; @Rule @@ -150,7 +145,7 @@ public class TestEntityGroupFSTimelineStore extends TimelineStoreTestUtils { createTestFiles(appId, attemotDirPath); } - store = new EntityGroupFSTimelineStoreForTest(); + store = new EntityGroupFSTimelineStore(); if (currTestName.getMethodName().contains("Plugin")) { rootDir = GenericTestUtils.getTestDir(getClass() .getSimpleName()); @@ -375,8 +370,6 @@ public class TestEntityGroupFSTimelineStore extends TimelineStoreTestUtils { UserGroupInformation.getLoginUser()); assertNotNull(entity3); assertEquals(entityNew.getStartTime(), entity3.getStartTime()); - assertEquals(1, cacheItem.getRefCount()); - assertEquals(1, EntityCacheItem.getActiveStores()); // Verify multiple entities read NameValuePair primaryFilter = new NameValuePair( EntityGroupPlugInForTest.APP_ID_FILTER_NAME, mainTestAppId.toString()); @@ -392,74 +385,6 @@ public class TestEntityGroupFSTimelineStore extends TimelineStoreTestUtils { assertEquals(cacheRefreshBefore + 1L, cacheRefresh.lastStat().numSamples()); } - @Test(timeout = 90000L) - public void testMultiplePluginRead() throws Exception { - Thread mainThread = Thread.currentThread(); - mainThread.setName("testMain"); - // Verify precondition - assertEquals(EntityGroupPlugInForTest.class.getName(), - store.getConfig().get( - YarnConfiguration.TIMELINE_SERVICE_ENTITY_GROUP_PLUGIN_CLASSES)); - // Prepare timeline store by making cache items - EntityGroupFSTimelineStore.AppLogs appLogs = - store.new AppLogs(mainTestAppId, mainTestAppDirPath, - AppState.COMPLETED); - final EntityCacheItem cacheItem = new EntityCacheItem( - EntityGroupPlugInForTest.getStandardTimelineGroupId(mainTestAppId), - config); - - cacheItem.setAppLogs(appLogs); - store.setCachedLogs( - EntityGroupPlugInForTest.getStandardTimelineGroupId(mainTestAppId), - cacheItem); - - // Launch the blocking read call in a future - ExecutorService threadExecutor = Executors.newSingleThreadExecutor(); - FutureTask blockingReader = - new FutureTask<>(new Callable() { - public TimelineEntity call() throws Exception { - Thread currThread = Thread.currentThread(); - currThread.setName("blockingReader"); - return store.getEntityBlocking(mainTestAppId.toString(), "type_3", - EnumSet.allOf(TimelineReader.Field.class)); - }}); - threadExecutor.execute(blockingReader); - try { - while (!store.testCacheReferenced) { - Thread.sleep(300); - } - } catch (InterruptedException e) { - fail("Interrupted on exception " + e); - } - // Try refill the cache after the first cache item is referenced - for (ApplicationId appId : sampleAppIds) { - // Skip the first appId since it's already in cache - if (appId.equals(mainTestAppId)) { - continue; - } - EntityGroupFSTimelineStore.AppLogs currAppLog = - store.new AppLogs(appId, getTestRootPath(appId.toString()), - AppState.COMPLETED); - EntityCacheItem item = new EntityCacheItem( - EntityGroupPlugInForTest.getStandardTimelineGroupId(appId), - config); - item.setAppLogs(currAppLog); - store.setCachedLogs( - EntityGroupPlugInForTest.getStandardTimelineGroupId(appId), - item); - } - // At this time, the cache item of the blocking reader should be evicted. - assertEquals(1, cacheItem.getRefCount()); - store.testCanProceed = true; - TimelineEntity entity3 = blockingReader.get(); - - assertNotNull(entity3); - assertEquals(entityNew.getStartTime(), entity3.getStartTime()); - assertEquals(0, cacheItem.getRefCount()); - - threadExecutor.shutdownNow(); - } - @Test public void testSummaryRead() throws Exception { // Load data @@ -518,38 +443,4 @@ public class TestEntityGroupFSTimelineStore extends TimelineStoreTestUtils { private static String getAttemptDirName(ApplicationId appId) { return ApplicationAttemptId.appAttemptIdStrPrefix + appId.toString() + "_1"; } - - private static class EntityGroupFSTimelineStoreForTest - extends EntityGroupFSTimelineStore { - // Flags used for the concurrent testing environment - private volatile boolean testCanProceed = false; - private volatile boolean testCacheReferenced = false; - - TimelineEntity getEntityBlocking(String entityId, String entityType, - EnumSet fieldsToRetrieve) throws IOException { - List relatedCacheItems = new ArrayList<>(); - List stores = getTimelineStoresForRead(entityId, - entityType, relatedCacheItems); - - testCacheReferenced = true; - try { - while (!testCanProceed) { - Thread.sleep(1000); - } - } catch (InterruptedException e) { - fail("Interrupted " + e); - } - - for (TimelineStore store : stores) { - TimelineEntity e = - store.getEntity(entityId, entityType, fieldsToRetrieve); - if (e != null) { - tryReleaseCacheItems(relatedCacheItems); - return e; - } - } - tryReleaseCacheItems(relatedCacheItems); - return null; - } - } }