YARN-5432. Lock already held by another process while LevelDB cache store creation for dag. Contributed by Li Lu.

This commit is contained in:
Junping Du 2016-07-28 06:35:24 -07:00
parent 414fbfab41
commit 7f3c306e2e
4 changed files with 15 additions and 181 deletions

View File

@ -2167,6 +2167,17 @@
<value>10485760</value> <value>10485760</value>
</property> </property>
<property>
<name>yarn.timeline-service.entity-group-fs-store.app-cache-size</name>
<description>
Size of the reader cache for ATS v1.5 reader. This value controls how many
entity groups the ATS v1.5 server should cache. If the number of active
read entity groups is greater than the number of caches items, some reads
may return empty data. This value must be greater than 0.
</description>
<value>10</value>
</property>
<property> <property>
<name>yarn.timeline-service.client.fd-flush-interval-secs</name> <name>yarn.timeline-service.client.fd-flush-interval-secs</name>
<description> <description>

View File

@ -16,8 +16,6 @@
*/ */
package org.apache.hadoop.yarn.server.timeline; package org.apache.hadoop.yarn.server.timeline;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.Time; import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId; import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId;
@ -26,7 +24,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.io.IOException; import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;
/** /**
* Cache item for timeline server v1.5 reader cache. Each cache item has a * Cache item for timeline server v1.5 reader cache. Each cache item has a
@ -41,8 +38,6 @@ public class EntityCacheItem {
private EntityGroupFSTimelineStore.AppLogs appLogs; private EntityGroupFSTimelineStore.AppLogs appLogs;
private long lastRefresh; private long lastRefresh;
private Configuration config; private Configuration config;
private int refCount = 0;
private static AtomicInteger activeStores = new AtomicInteger(0);
public EntityCacheItem(TimelineEntityGroupId gId, Configuration config) { public EntityCacheItem(TimelineEntityGroupId gId, Configuration config) {
this.groupId = gId; this.groupId = gId;
@ -75,13 +70,6 @@ public class EntityCacheItem {
return store; return store;
} }
/**
* @return The number of currently active stores in all CacheItems.
*/
public static int getActiveStores() {
return activeStores.get();
}
/** /**
* Refresh this cache item if it needs refresh. This will enforce an appLogs * Refresh this cache item if it needs refresh. This will enforce an appLogs
* rescan and then load new data. The refresh process is synchronized with * rescan and then load new data. The refresh process is synchronized with
@ -107,7 +95,6 @@ public class EntityCacheItem {
} }
if (!appLogs.getDetailLogs().isEmpty()) { if (!appLogs.getDetailLogs().isEmpty()) {
if (store == null) { if (store == null) {
activeStores.getAndIncrement();
store = new LevelDBCacheTimelineStore(groupId.toString(), store = new LevelDBCacheTimelineStore(groupId.toString(),
"LeveldbCache." + groupId); "LeveldbCache." + groupId);
store.init(config); store.init(config);
@ -133,31 +120,6 @@ public class EntityCacheItem {
return store; return store;
} }
/**
* Increase the number of references to this cache item by 1.
*/
public synchronized void incrRefs() {
refCount++;
}
/**
* Unregister a reader. Try to release the cache if the reader to current
* cache reaches 0.
*
* @return true if the cache has been released, otherwise false
*/
public synchronized boolean tryRelease() {
refCount--;
// Only reclaim the storage if there is no reader.
if (refCount > 0) {
LOG.debug("{} references left for cached group {}, skipping the release",
refCount, groupId);
return false;
}
forceRelease();
return true;
}
/** /**
* Force releasing the cache item for the given group id, even though there * Force releasing the cache item for the given group id, even though there
* may be active references. * may be active references.
@ -171,8 +133,6 @@ public class EntityCacheItem {
LOG.warn("Error closing timeline store", e); LOG.warn("Error closing timeline store", e);
} }
store = null; store = null;
activeStores.getAndDecrement();
refCount = 0;
// reset offsets so next time logs are re-parsed // reset offsets so next time logs are re-parsed
for (LogInfo log : appLogs.getDetailLogs()) { for (LogInfo log : appLogs.getDetailLogs()) {
if (log.getFilename().contains(groupId.toString())) { if (log.getFilename().contains(groupId.toString())) {
@ -182,12 +142,6 @@ public class EntityCacheItem {
LOG.debug("Cache for group {} released. ", groupId); LOG.debug("Cache for group {} released. ", groupId);
} }
@InterfaceAudience.Private
@VisibleForTesting
synchronized int getRefCount() {
return refCount;
}
private boolean needRefresh() { private boolean needRefresh() {
return (Time.monotonicNow() - lastRefresh > 10000); return (Time.monotonicNow() - lastRefresh > 10000);
} }

View File

@ -181,15 +181,8 @@ public class EntityGroupFSTimelineStore extends CompositeService
TimelineEntityGroupId groupId = eldest.getKey(); TimelineEntityGroupId groupId = eldest.getKey();
LOG.debug("Evicting {} due to space limitations", groupId); LOG.debug("Evicting {} due to space limitations", groupId);
EntityCacheItem cacheItem = eldest.getValue(); EntityCacheItem cacheItem = eldest.getValue();
int activeStores = EntityCacheItem.getActiveStores(); LOG.debug("Force release cache {}.", groupId);
if (activeStores > appCacheMaxSize * CACHE_ITEM_OVERFLOW_FACTOR) {
LOG.debug("Force release cache {} since {} stores are active",
groupId, activeStores);
cacheItem.forceRelease(); cacheItem.forceRelease();
} else {
LOG.debug("Try release cache {}", groupId);
cacheItem.tryRelease();
}
if (cacheItem.getAppLogs().isDone()) { if (cacheItem.getAppLogs().isDone()) {
appIdLogMap.remove(groupId.getApplicationId()); appIdLogMap.remove(groupId.getApplicationId());
} }
@ -920,7 +913,6 @@ public class EntityGroupFSTimelineStore extends CompositeService
@InterfaceAudience.Private @InterfaceAudience.Private
@VisibleForTesting @VisibleForTesting
void setCachedLogs(TimelineEntityGroupId groupId, EntityCacheItem cacheItem) { void setCachedLogs(TimelineEntityGroupId groupId, EntityCacheItem cacheItem) {
cacheItem.incrRefs();
cachedLogs.put(groupId, cacheItem); cachedLogs.put(groupId, cacheItem);
} }
@ -1003,8 +995,6 @@ public class EntityGroupFSTimelineStore extends CompositeService
LOG.debug("Set applogs {} for group id {}", appLogs, groupId); LOG.debug("Set applogs {} for group id {}", appLogs, groupId);
cacheItem.setAppLogs(appLogs); cacheItem.setAppLogs(appLogs);
this.cachedLogs.put(groupId, cacheItem); this.cachedLogs.put(groupId, cacheItem);
// Add the reference by the cache
cacheItem.incrRefs();
} else { } else {
LOG.warn("AppLogs for groupId {} is set to null!", groupId); LOG.warn("AppLogs for groupId {} is set to null!", groupId);
} }
@ -1014,8 +1004,6 @@ public class EntityGroupFSTimelineStore extends CompositeService
if (cacheItem.getAppLogs() != null) { if (cacheItem.getAppLogs() != null) {
AppLogs appLogs = cacheItem.getAppLogs(); AppLogs appLogs = cacheItem.getAppLogs();
LOG.debug("try refresh cache {} {}", groupId, appLogs.getAppId()); LOG.debug("try refresh cache {} {}", groupId, appLogs.getAppId());
// Add the reference by the store
cacheItem.incrRefs();
cacheItems.add(cacheItem); cacheItems.add(cacheItem);
store = cacheItem.refreshCache(aclManager, metrics); store = cacheItem.refreshCache(aclManager, metrics);
} else { } else {
@ -1024,12 +1012,6 @@ public class EntityGroupFSTimelineStore extends CompositeService
return store; return store;
} }
protected void tryReleaseCacheItems(List<EntityCacheItem> relatedCacheItems) {
for (EntityCacheItem item : relatedCacheItems) {
item.tryRelease();
}
}
@Override @Override
public TimelineEntities getEntities(String entityType, Long limit, public TimelineEntities getEntities(String entityType, Long limit,
Long windowStart, Long windowEnd, String fromId, Long fromTs, Long windowStart, Long windowEnd, String fromId, Long fromTs,
@ -1049,7 +1031,6 @@ public class EntityGroupFSTimelineStore extends CompositeService
returnEntities.addEntities(entities.getEntities()); returnEntities.addEntities(entities.getEntities());
} }
} }
tryReleaseCacheItems(relatedCacheItems);
return returnEntities; return returnEntities;
} }
@ -1066,12 +1047,10 @@ public class EntityGroupFSTimelineStore extends CompositeService
TimelineEntity e = TimelineEntity e =
store.getEntity(entityId, entityType, fieldsToRetrieve); store.getEntity(entityId, entityType, fieldsToRetrieve);
if (e != null) { if (e != null) {
tryReleaseCacheItems(relatedCacheItems);
return e; return e;
} }
} }
LOG.debug("getEntity: Found nothing"); LOG.debug("getEntity: Found nothing");
tryReleaseCacheItems(relatedCacheItems);
return null; return null;
} }
@ -1099,7 +1078,6 @@ public class EntityGroupFSTimelineStore extends CompositeService
} }
} }
} }
tryReleaseCacheItems(relatedCacheItems);
return returnEvents; return returnEvents;
} }

View File

@ -57,10 +57,6 @@ import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import static org.apache.hadoop.yarn.server.timeline.EntityGroupFSTimelineStore.AppState; import static org.apache.hadoop.yarn.server.timeline.EntityGroupFSTimelineStore.AppState;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
@ -68,7 +64,6 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
public class TestEntityGroupFSTimelineStore extends TimelineStoreTestUtils { public class TestEntityGroupFSTimelineStore extends TimelineStoreTestUtils {
@ -98,7 +93,7 @@ public class TestEntityGroupFSTimelineStore extends TimelineStoreTestUtils {
private static Path testDoneDirPath; private static Path testDoneDirPath;
private static String mainEntityLogFileName; private static String mainEntityLogFileName;
private EntityGroupFSTimelineStoreForTest store; private EntityGroupFSTimelineStore store;
private TimelineEntity entityNew; private TimelineEntity entityNew;
@Rule @Rule
@ -150,7 +145,7 @@ public class TestEntityGroupFSTimelineStore extends TimelineStoreTestUtils {
createTestFiles(appId, attemotDirPath); createTestFiles(appId, attemotDirPath);
} }
store = new EntityGroupFSTimelineStoreForTest(); store = new EntityGroupFSTimelineStore();
if (currTestName.getMethodName().contains("Plugin")) { if (currTestName.getMethodName().contains("Plugin")) {
rootDir = GenericTestUtils.getTestDir(getClass() rootDir = GenericTestUtils.getTestDir(getClass()
.getSimpleName()); .getSimpleName());
@ -375,8 +370,6 @@ public class TestEntityGroupFSTimelineStore extends TimelineStoreTestUtils {
UserGroupInformation.getLoginUser()); UserGroupInformation.getLoginUser());
assertNotNull(entity3); assertNotNull(entity3);
assertEquals(entityNew.getStartTime(), entity3.getStartTime()); assertEquals(entityNew.getStartTime(), entity3.getStartTime());
assertEquals(1, cacheItem.getRefCount());
assertEquals(1, EntityCacheItem.getActiveStores());
// Verify multiple entities read // Verify multiple entities read
NameValuePair primaryFilter = new NameValuePair( NameValuePair primaryFilter = new NameValuePair(
EntityGroupPlugInForTest.APP_ID_FILTER_NAME, mainTestAppId.toString()); EntityGroupPlugInForTest.APP_ID_FILTER_NAME, mainTestAppId.toString());
@ -392,74 +385,6 @@ public class TestEntityGroupFSTimelineStore extends TimelineStoreTestUtils {
assertEquals(cacheRefreshBefore + 1L, cacheRefresh.lastStat().numSamples()); assertEquals(cacheRefreshBefore + 1L, cacheRefresh.lastStat().numSamples());
} }
@Test(timeout = 90000L)
public void testMultiplePluginRead() throws Exception {
Thread mainThread = Thread.currentThread();
mainThread.setName("testMain");
// Verify precondition
assertEquals(EntityGroupPlugInForTest.class.getName(),
store.getConfig().get(
YarnConfiguration.TIMELINE_SERVICE_ENTITY_GROUP_PLUGIN_CLASSES));
// Prepare timeline store by making cache items
EntityGroupFSTimelineStore.AppLogs appLogs =
store.new AppLogs(mainTestAppId, mainTestAppDirPath,
AppState.COMPLETED);
final EntityCacheItem cacheItem = new EntityCacheItem(
EntityGroupPlugInForTest.getStandardTimelineGroupId(mainTestAppId),
config);
cacheItem.setAppLogs(appLogs);
store.setCachedLogs(
EntityGroupPlugInForTest.getStandardTimelineGroupId(mainTestAppId),
cacheItem);
// Launch the blocking read call in a future
ExecutorService threadExecutor = Executors.newSingleThreadExecutor();
FutureTask<TimelineEntity> blockingReader =
new FutureTask<>(new Callable<TimelineEntity>() {
public TimelineEntity call() throws Exception {
Thread currThread = Thread.currentThread();
currThread.setName("blockingReader");
return store.getEntityBlocking(mainTestAppId.toString(), "type_3",
EnumSet.allOf(TimelineReader.Field.class));
}});
threadExecutor.execute(blockingReader);
try {
while (!store.testCacheReferenced) {
Thread.sleep(300);
}
} catch (InterruptedException e) {
fail("Interrupted on exception " + e);
}
// Try refill the cache after the first cache item is referenced
for (ApplicationId appId : sampleAppIds) {
// Skip the first appId since it's already in cache
if (appId.equals(mainTestAppId)) {
continue;
}
EntityGroupFSTimelineStore.AppLogs currAppLog =
store.new AppLogs(appId, getTestRootPath(appId.toString()),
AppState.COMPLETED);
EntityCacheItem item = new EntityCacheItem(
EntityGroupPlugInForTest.getStandardTimelineGroupId(appId),
config);
item.setAppLogs(currAppLog);
store.setCachedLogs(
EntityGroupPlugInForTest.getStandardTimelineGroupId(appId),
item);
}
// At this time, the cache item of the blocking reader should be evicted.
assertEquals(1, cacheItem.getRefCount());
store.testCanProceed = true;
TimelineEntity entity3 = blockingReader.get();
assertNotNull(entity3);
assertEquals(entityNew.getStartTime(), entity3.getStartTime());
assertEquals(0, cacheItem.getRefCount());
threadExecutor.shutdownNow();
}
@Test @Test
public void testSummaryRead() throws Exception { public void testSummaryRead() throws Exception {
// Load data // Load data
@ -518,38 +443,4 @@ public class TestEntityGroupFSTimelineStore extends TimelineStoreTestUtils {
private static String getAttemptDirName(ApplicationId appId) { private static String getAttemptDirName(ApplicationId appId) {
return ApplicationAttemptId.appAttemptIdStrPrefix + appId.toString() + "_1"; return ApplicationAttemptId.appAttemptIdStrPrefix + appId.toString() + "_1";
} }
private static class EntityGroupFSTimelineStoreForTest
extends EntityGroupFSTimelineStore {
// Flags used for the concurrent testing environment
private volatile boolean testCanProceed = false;
private volatile boolean testCacheReferenced = false;
TimelineEntity getEntityBlocking(String entityId, String entityType,
EnumSet<Field> fieldsToRetrieve) throws IOException {
List<EntityCacheItem> relatedCacheItems = new ArrayList<>();
List<TimelineStore> stores = getTimelineStoresForRead(entityId,
entityType, relatedCacheItems);
testCacheReferenced = true;
try {
while (!testCanProceed) {
Thread.sleep(1000);
}
} catch (InterruptedException e) {
fail("Interrupted " + e);
}
for (TimelineStore store : stores) {
TimelineEntity e =
store.getEntity(entityId, entityType, fieldsToRetrieve);
if (e != null) {
tryReleaseCacheItems(relatedCacheItems);
return e;
}
}
tryReleaseCacheItems(relatedCacheItems);
return null;
}
}
} }