YARN-5432. Lock already held by another process while LevelDB cache store creation for dag. Contributed by Li Lu.
(cherry picked from commit7f3c306e2e
) (cherry picked from commit932bf79244
)
This commit is contained in:
parent
960caf624d
commit
08358c8d4e
|
@ -2073,6 +2073,17 @@
|
|||
<value>10485760</value>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>yarn.timeline-service.entity-group-fs-store.app-cache-size</name>
|
||||
<description>
|
||||
Size of the reader cache for ATS v1.5 reader. This value controls how many
|
||||
entity groups the ATS v1.5 server should cache. If the number of active
|
||||
read entity groups is greater than the number of caches items, some reads
|
||||
may return empty data. This value must be greater than 0.
|
||||
</description>
|
||||
<value>10</value>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>yarn.timeline-service.client.fd-flush-interval-secs</name>
|
||||
<description>
|
||||
|
|
|
@ -16,8 +16,6 @@
|
|||
*/
|
||||
package org.apache.hadoop.yarn.server.timeline;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.util.Time;
|
||||
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId;
|
||||
|
@ -26,7 +24,6 @@ import org.slf4j.Logger;
|
|||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
/**
|
||||
* Cache item for timeline server v1.5 reader cache. Each cache item has a
|
||||
|
@ -41,8 +38,6 @@ public class EntityCacheItem {
|
|||
private EntityGroupFSTimelineStore.AppLogs appLogs;
|
||||
private long lastRefresh;
|
||||
private Configuration config;
|
||||
private int refCount = 0;
|
||||
private static AtomicInteger activeStores = new AtomicInteger(0);
|
||||
|
||||
public EntityCacheItem(TimelineEntityGroupId gId, Configuration config) {
|
||||
this.groupId = gId;
|
||||
|
@ -75,13 +70,6 @@ public class EntityCacheItem {
|
|||
return store;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return The number of currently active stores in all CacheItems.
|
||||
*/
|
||||
public static int getActiveStores() {
|
||||
return activeStores.get();
|
||||
}
|
||||
|
||||
/**
|
||||
* Refresh this cache item if it needs refresh. This will enforce an appLogs
|
||||
* rescan and then load new data. The refresh process is synchronized with
|
||||
|
@ -107,7 +95,6 @@ public class EntityCacheItem {
|
|||
}
|
||||
if (!appLogs.getDetailLogs().isEmpty()) {
|
||||
if (store == null) {
|
||||
activeStores.getAndIncrement();
|
||||
store = new LevelDBCacheTimelineStore(groupId.toString(),
|
||||
"LeveldbCache." + groupId);
|
||||
store.init(config);
|
||||
|
@ -133,31 +120,6 @@ public class EntityCacheItem {
|
|||
return store;
|
||||
}
|
||||
|
||||
/**
|
||||
* Increase the number of references to this cache item by 1.
|
||||
*/
|
||||
public synchronized void incrRefs() {
|
||||
refCount++;
|
||||
}
|
||||
|
||||
/**
|
||||
* Unregister a reader. Try to release the cache if the reader to current
|
||||
* cache reaches 0.
|
||||
*
|
||||
* @return true if the cache has been released, otherwise false
|
||||
*/
|
||||
public synchronized boolean tryRelease() {
|
||||
refCount--;
|
||||
// Only reclaim the storage if there is no reader.
|
||||
if (refCount > 0) {
|
||||
LOG.debug("{} references left for cached group {}, skipping the release",
|
||||
refCount, groupId);
|
||||
return false;
|
||||
}
|
||||
forceRelease();
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Force releasing the cache item for the given group id, even though there
|
||||
* may be active references.
|
||||
|
@ -171,8 +133,6 @@ public class EntityCacheItem {
|
|||
LOG.warn("Error closing timeline store", e);
|
||||
}
|
||||
store = null;
|
||||
activeStores.getAndDecrement();
|
||||
refCount = 0;
|
||||
// reset offsets so next time logs are re-parsed
|
||||
for (LogInfo log : appLogs.getDetailLogs()) {
|
||||
if (log.getFilename().contains(groupId.toString())) {
|
||||
|
@ -182,12 +142,6 @@ public class EntityCacheItem {
|
|||
LOG.debug("Cache for group {} released. ", groupId);
|
||||
}
|
||||
|
||||
@InterfaceAudience.Private
|
||||
@VisibleForTesting
|
||||
synchronized int getRefCount() {
|
||||
return refCount;
|
||||
}
|
||||
|
||||
private boolean needRefresh() {
|
||||
return (Time.monotonicNow() - lastRefresh > 10000);
|
||||
}
|
||||
|
|
|
@ -176,15 +176,8 @@ public class EntityGroupFSTimelineStore extends CompositeService
|
|||
TimelineEntityGroupId groupId = eldest.getKey();
|
||||
LOG.debug("Evicting {} due to space limitations", groupId);
|
||||
EntityCacheItem cacheItem = eldest.getValue();
|
||||
int activeStores = EntityCacheItem.getActiveStores();
|
||||
if (activeStores > appCacheMaxSize * CACHE_ITEM_OVERFLOW_FACTOR) {
|
||||
LOG.debug("Force release cache {} since {} stores are active",
|
||||
groupId, activeStores);
|
||||
cacheItem.forceRelease();
|
||||
} else {
|
||||
LOG.debug("Try release cache {}", groupId);
|
||||
cacheItem.tryRelease();
|
||||
}
|
||||
LOG.debug("Force release cache {}.", groupId);
|
||||
cacheItem.forceRelease();
|
||||
if (cacheItem.getAppLogs().isDone()) {
|
||||
appIdLogMap.remove(groupId.getApplicationId());
|
||||
}
|
||||
|
@ -862,7 +855,6 @@ public class EntityGroupFSTimelineStore extends CompositeService
|
|||
@InterfaceAudience.Private
|
||||
@VisibleForTesting
|
||||
void setCachedLogs(TimelineEntityGroupId groupId, EntityCacheItem cacheItem) {
|
||||
cacheItem.incrRefs();
|
||||
cachedLogs.put(groupId, cacheItem);
|
||||
}
|
||||
|
||||
|
@ -945,8 +937,6 @@ public class EntityGroupFSTimelineStore extends CompositeService
|
|||
LOG.debug("Set applogs {} for group id {}", appLogs, groupId);
|
||||
cacheItem.setAppLogs(appLogs);
|
||||
this.cachedLogs.put(groupId, cacheItem);
|
||||
// Add the reference by the cache
|
||||
cacheItem.incrRefs();
|
||||
} else {
|
||||
LOG.warn("AppLogs for groupId {} is set to null!", groupId);
|
||||
}
|
||||
|
@ -956,8 +946,6 @@ public class EntityGroupFSTimelineStore extends CompositeService
|
|||
if (cacheItem.getAppLogs() != null) {
|
||||
AppLogs appLogs = cacheItem.getAppLogs();
|
||||
LOG.debug("try refresh cache {} {}", groupId, appLogs.getAppId());
|
||||
// Add the reference by the store
|
||||
cacheItem.incrRefs();
|
||||
cacheItems.add(cacheItem);
|
||||
store = cacheItem.refreshCache(aclManager, metrics);
|
||||
} else {
|
||||
|
@ -966,12 +954,6 @@ public class EntityGroupFSTimelineStore extends CompositeService
|
|||
return store;
|
||||
}
|
||||
|
||||
protected void tryReleaseCacheItems(List<EntityCacheItem> relatedCacheItems) {
|
||||
for (EntityCacheItem item : relatedCacheItems) {
|
||||
item.tryRelease();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public TimelineEntities getEntities(String entityType, Long limit,
|
||||
Long windowStart, Long windowEnd, String fromId, Long fromTs,
|
||||
|
@ -991,7 +973,6 @@ public class EntityGroupFSTimelineStore extends CompositeService
|
|||
returnEntities.addEntities(entities.getEntities());
|
||||
}
|
||||
}
|
||||
tryReleaseCacheItems(relatedCacheItems);
|
||||
return returnEntities;
|
||||
}
|
||||
|
||||
|
@ -1008,12 +989,10 @@ public class EntityGroupFSTimelineStore extends CompositeService
|
|||
TimelineEntity e =
|
||||
store.getEntity(entityId, entityType, fieldsToRetrieve);
|
||||
if (e != null) {
|
||||
tryReleaseCacheItems(relatedCacheItems);
|
||||
return e;
|
||||
}
|
||||
}
|
||||
LOG.debug("getEntity: Found nothing");
|
||||
tryReleaseCacheItems(relatedCacheItems);
|
||||
return null;
|
||||
}
|
||||
|
||||
|
@ -1041,7 +1020,6 @@ public class EntityGroupFSTimelineStore extends CompositeService
|
|||
}
|
||||
}
|
||||
}
|
||||
tryReleaseCacheItems(relatedCacheItems);
|
||||
return returnEvents;
|
||||
}
|
||||
|
||||
|
|
|
@ -51,10 +51,6 @@ import java.util.HashSet;
|
|||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.FutureTask;
|
||||
|
||||
import static org.apache.hadoop.yarn.server.timeline.EntityGroupFSTimelineStore.AppState;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
@ -62,7 +58,6 @@ import static org.junit.Assert.assertFalse;
|
|||
import static org.junit.Assert.assertNotEquals;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
public class TestEntityGroupFSTimelineStore extends TimelineStoreTestUtils {
|
||||
|
||||
|
@ -92,7 +87,7 @@ public class TestEntityGroupFSTimelineStore extends TimelineStoreTestUtils {
|
|||
private static Path testDoneDirPath;
|
||||
private static String mainEntityLogFileName;
|
||||
|
||||
private EntityGroupFSTimelineStoreForTest store;
|
||||
private EntityGroupFSTimelineStore store;
|
||||
private TimelineEntity entityNew;
|
||||
|
||||
@Rule
|
||||
|
@ -142,7 +137,7 @@ public class TestEntityGroupFSTimelineStore extends TimelineStoreTestUtils {
|
|||
createTestFiles(appId, attemotDirPath);
|
||||
}
|
||||
|
||||
store = new EntityGroupFSTimelineStoreForTest();
|
||||
store = new EntityGroupFSTimelineStore();
|
||||
if (currTestName.getMethodName().contains("Plugin")) {
|
||||
config.set(YarnConfiguration.TIMELINE_SERVICE_ENTITY_GROUP_PLUGIN_CLASSES,
|
||||
EntityGroupPlugInForTest.class.getName());
|
||||
|
@ -329,8 +324,6 @@ public class TestEntityGroupFSTimelineStore extends TimelineStoreTestUtils {
|
|||
UserGroupInformation.getLoginUser());
|
||||
assertNotNull(entity3);
|
||||
assertEquals(entityNew.getStartTime(), entity3.getStartTime());
|
||||
assertEquals(1, cacheItem.getRefCount());
|
||||
assertEquals(1, EntityCacheItem.getActiveStores());
|
||||
// Verify multiple entities read
|
||||
NameValuePair primaryFilter = new NameValuePair(
|
||||
EntityGroupPlugInForTest.APP_ID_FILTER_NAME, mainTestAppId.toString());
|
||||
|
@ -346,74 +339,6 @@ public class TestEntityGroupFSTimelineStore extends TimelineStoreTestUtils {
|
|||
assertEquals(cacheRefreshBefore + 1L, cacheRefresh.lastStat().numSamples());
|
||||
}
|
||||
|
||||
@Test(timeout = 90000L)
|
||||
public void testMultiplePluginRead() throws Exception {
|
||||
Thread mainThread = Thread.currentThread();
|
||||
mainThread.setName("testMain");
|
||||
// Verify precondition
|
||||
assertEquals(EntityGroupPlugInForTest.class.getName(),
|
||||
store.getConfig().get(
|
||||
YarnConfiguration.TIMELINE_SERVICE_ENTITY_GROUP_PLUGIN_CLASSES));
|
||||
// Prepare timeline store by making cache items
|
||||
EntityGroupFSTimelineStore.AppLogs appLogs =
|
||||
store.new AppLogs(mainTestAppId, mainTestAppDirPath,
|
||||
AppState.COMPLETED);
|
||||
final EntityCacheItem cacheItem = new EntityCacheItem(
|
||||
EntityGroupPlugInForTest.getStandardTimelineGroupId(mainTestAppId),
|
||||
config);
|
||||
|
||||
cacheItem.setAppLogs(appLogs);
|
||||
store.setCachedLogs(
|
||||
EntityGroupPlugInForTest.getStandardTimelineGroupId(mainTestAppId),
|
||||
cacheItem);
|
||||
|
||||
// Launch the blocking read call in a future
|
||||
ExecutorService threadExecutor = Executors.newSingleThreadExecutor();
|
||||
FutureTask<TimelineEntity> blockingReader =
|
||||
new FutureTask<>(new Callable<TimelineEntity>() {
|
||||
public TimelineEntity call() throws Exception {
|
||||
Thread currThread = Thread.currentThread();
|
||||
currThread.setName("blockingReader");
|
||||
return store.getEntityBlocking(mainTestAppId.toString(), "type_3",
|
||||
EnumSet.allOf(TimelineReader.Field.class));
|
||||
}});
|
||||
threadExecutor.execute(blockingReader);
|
||||
try {
|
||||
while (!store.testCacheReferenced) {
|
||||
Thread.sleep(300);
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
fail("Interrupted on exception " + e);
|
||||
}
|
||||
// Try refill the cache after the first cache item is referenced
|
||||
for (ApplicationId appId : sampleAppIds) {
|
||||
// Skip the first appId since it's already in cache
|
||||
if (appId.equals(mainTestAppId)) {
|
||||
continue;
|
||||
}
|
||||
EntityGroupFSTimelineStore.AppLogs currAppLog =
|
||||
store.new AppLogs(appId, getTestRootPath(appId.toString()),
|
||||
AppState.COMPLETED);
|
||||
EntityCacheItem item = new EntityCacheItem(
|
||||
EntityGroupPlugInForTest.getStandardTimelineGroupId(appId),
|
||||
config);
|
||||
item.setAppLogs(currAppLog);
|
||||
store.setCachedLogs(
|
||||
EntityGroupPlugInForTest.getStandardTimelineGroupId(appId),
|
||||
item);
|
||||
}
|
||||
// At this time, the cache item of the blocking reader should be evicted.
|
||||
assertEquals(1, cacheItem.getRefCount());
|
||||
store.testCanProceed = true;
|
||||
TimelineEntity entity3 = blockingReader.get();
|
||||
|
||||
assertNotNull(entity3);
|
||||
assertEquals(entityNew.getStartTime(), entity3.getStartTime());
|
||||
assertEquals(0, cacheItem.getRefCount());
|
||||
|
||||
threadExecutor.shutdownNow();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSummaryRead() throws Exception {
|
||||
// Load data
|
||||
|
@ -472,38 +397,4 @@ public class TestEntityGroupFSTimelineStore extends TimelineStoreTestUtils {
|
|||
private static String getAttemptDirName(ApplicationId appId) {
|
||||
return ApplicationAttemptId.appAttemptIdStrPrefix + appId.toString() + "_1";
|
||||
}
|
||||
|
||||
private static class EntityGroupFSTimelineStoreForTest
|
||||
extends EntityGroupFSTimelineStore {
|
||||
// Flags used for the concurrent testing environment
|
||||
private volatile boolean testCanProceed = false;
|
||||
private volatile boolean testCacheReferenced = false;
|
||||
|
||||
TimelineEntity getEntityBlocking(String entityId, String entityType,
|
||||
EnumSet<Field> fieldsToRetrieve) throws IOException {
|
||||
List<EntityCacheItem> relatedCacheItems = new ArrayList<>();
|
||||
List<TimelineStore> stores = getTimelineStoresForRead(entityId,
|
||||
entityType, relatedCacheItems);
|
||||
|
||||
testCacheReferenced = true;
|
||||
try {
|
||||
while (!testCanProceed) {
|
||||
Thread.sleep(1000);
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
fail("Interrupted " + e);
|
||||
}
|
||||
|
||||
for (TimelineStore store : stores) {
|
||||
TimelineEntity e =
|
||||
store.getEntity(entityId, entityType, fieldsToRetrieve);
|
||||
if (e != null) {
|
||||
tryReleaseCacheItems(relatedCacheItems);
|
||||
return e;
|
||||
}
|
||||
}
|
||||
tryReleaseCacheItems(relatedCacheItems);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue