YARN-2180. [YARN-1492] In-memory backing store for cache manager. (Chris Trezzo via kasha)
(cherry picked from commit 4f426fe223
)
This commit is contained in:
parent
8addbe2b8e
commit
7e20187f91
|
@ -9,6 +9,9 @@ Release 2.7.0 - UNRELEASED
|
||||||
YARN-2179. [YARN-1492] Initial cache manager structure and context.
|
YARN-2179. [YARN-1492] Initial cache manager structure and context.
|
||||||
(Chris Trezzo via kasha)
|
(Chris Trezzo via kasha)
|
||||||
|
|
||||||
|
YARN-2180. [YARN-1492] In-memory backing store for cache manager.
|
||||||
|
(Chris Trezzo via kasha)
|
||||||
|
|
||||||
IMPROVEMENTS
|
IMPROVEMENTS
|
||||||
|
|
||||||
YARN-1979. TestDirectoryCollection fails when the umask is unusual.
|
YARN-1979. TestDirectoryCollection fails when the umask is unusual.
|
||||||
|
|
|
@ -1332,6 +1332,49 @@ public class YarnConfiguration extends Configuration {
|
||||||
SHARED_CACHE_PREFIX + "nested-level";
|
SHARED_CACHE_PREFIX + "nested-level";
|
||||||
public static final int DEFAULT_SHARED_CACHE_NESTED_LEVEL = 3;
|
public static final int DEFAULT_SHARED_CACHE_NESTED_LEVEL = 3;
|
||||||
|
|
||||||
|
// Shared Cache Manager Configs
|
||||||
|
|
||||||
|
public static final String SCM_STORE_PREFIX = SHARED_CACHE_PREFIX + "store.";
|
||||||
|
|
||||||
|
public static final String SCM_STORE_CLASS = SCM_STORE_PREFIX + "class";
|
||||||
|
public static final String DEFAULT_SCM_STORE_CLASS =
|
||||||
|
"org.apache.hadoop.yarn.server.sharedcachemanager.store.InMemorySCMStore";
|
||||||
|
|
||||||
|
public static final String SCM_APP_CHECKER_CLASS = SHARED_CACHE_PREFIX
|
||||||
|
+ "app-checker.class";
|
||||||
|
public static final String DEFAULT_SCM_APP_CHECKER_CLASS =
|
||||||
|
"org.apache.hadoop.yarn.server.sharedcachemanager.RemoteAppChecker";
|
||||||
|
|
||||||
|
// In-memory SCM store configuration
|
||||||
|
|
||||||
|
public static final String IN_MEMORY_STORE_PREFIX =
|
||||||
|
SHARED_CACHE_PREFIX + "in-memory.";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A resource in the InMemorySCMStore is considered stale if the time since
|
||||||
|
* the last reference exceeds the staleness period. This value is specified in
|
||||||
|
* minutes.
|
||||||
|
*/
|
||||||
|
public static final String IN_MEMORY_STALENESS_PERIOD =
|
||||||
|
IN_MEMORY_STORE_PREFIX + "staleness-period";
|
||||||
|
public static final int DEFAULT_IN_MEMORY_STALENESS_PERIOD = 7 * 24 * 60;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Initial delay before the in-memory store runs its first check to remove
|
||||||
|
* dead initial applications. Specified in minutes.
|
||||||
|
*/
|
||||||
|
public static final String IN_MEMORY_INITIAL_DELAY =
|
||||||
|
IN_MEMORY_STORE_PREFIX + "initial-delay";
|
||||||
|
public static final int DEFAULT_IN_MEMORY_INITIAL_DELAY = 10;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The frequency at which the in-memory store checks to remove dead initial
|
||||||
|
* applications. Specified in minutes.
|
||||||
|
*/
|
||||||
|
public static final String IN_MEMORY_CHECK_PERIOD =
|
||||||
|
IN_MEMORY_STORE_PREFIX + "check-period";
|
||||||
|
public static final int DEFAULT_IN_MEMORY_CHECK_PERIOD = 12 * 60;
|
||||||
|
|
||||||
////////////////////////////////
|
////////////////////////////////
|
||||||
// Other Configs
|
// Other Configs
|
||||||
////////////////////////////////
|
////////////////////////////////
|
||||||
|
|
|
@ -1341,6 +1341,40 @@
|
||||||
<value>3</value>
|
<value>3</value>
|
||||||
</property>
|
</property>
|
||||||
|
|
||||||
|
<property>
|
||||||
|
<description>The implementation to be used for the SCM store</description>
|
||||||
|
<name>yarn.sharedcache.store.class</name>
|
||||||
|
<value>org.apache.hadoop.yarn.server.sharedcachemanager.store.InMemorySCMStore</value>
|
||||||
|
</property>
|
||||||
|
|
||||||
|
<property>
|
||||||
|
<description>The implementation to be used for the SCM app-checker</description>
|
||||||
|
<name>yarn.sharedcache.app-checker.class</name>
|
||||||
|
<value>org.apache.hadoop.yarn.server.sharedcachemanager.RemoteAppChecker</value>
|
||||||
|
</property>
|
||||||
|
|
||||||
|
<property>
|
||||||
|
<description>A resource in the in-memory store is considered stale
|
||||||
|
if the time since the last reference exceeds the staleness period.
|
||||||
|
This value is specified in minutes.</description>
|
||||||
|
<name>yarn.sharedcache.store.in-memory.staleness-period</name>
|
||||||
|
<value>10080</value>
|
||||||
|
</property>
|
||||||
|
|
||||||
|
<property>
|
||||||
|
<description>Initial delay before the in-memory store runs its first check
|
||||||
|
to remove dead initial applications. Specified in minutes.</description>
|
||||||
|
<name>yarn.sharedcache.store.in-memory.initial-delay</name>
|
||||||
|
<value>10</value>
|
||||||
|
</property>
|
||||||
|
|
||||||
|
<property>
|
||||||
|
<description>The frequency at which the in-memory store checks to remove
|
||||||
|
dead initial applications. Specified in minutes.</description>
|
||||||
|
<name>yarn.sharedcache.store.in-memory.check-period</name>
|
||||||
|
<value>720</value>
|
||||||
|
</property>
|
||||||
|
|
||||||
<!-- Other configuration -->
|
<!-- Other configuration -->
|
||||||
<property>
|
<property>
|
||||||
<description>The interval that the yarn client library uses to poll the
|
<description>The interval that the yarn client library uses to poll the
|
||||||
|
|
|
@ -32,9 +32,9 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
*/
|
*/
|
||||||
@Private
|
@Private
|
||||||
@Unstable
|
@Unstable
|
||||||
public class SharedCacheStructureUtil {
|
public class SharedCacheUtil {
|
||||||
|
|
||||||
private static final Log LOG = LogFactory.getLog(SharedCacheStructureUtil.class);
|
private static final Log LOG = LogFactory.getLog(SharedCacheUtil.class);
|
||||||
|
|
||||||
@Private
|
@Private
|
||||||
public static int getCacheDepth(Configuration conf) {
|
public static int getCacheDepth(Configuration conf) {
|
|
@ -42,6 +42,10 @@
|
||||||
<groupId>org.apache.hadoop</groupId>
|
<groupId>org.apache.hadoop</groupId>
|
||||||
<artifactId>hadoop-yarn-common</artifactId>
|
<artifactId>hadoop-yarn-common</artifactId>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.apache.hadoop</groupId>
|
||||||
|
<artifactId>hadoop-yarn-server-common</artifactId>
|
||||||
|
</dependency>
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>org.apache.hadoop</groupId>
|
<groupId>org.apache.hadoop</groupId>
|
||||||
<artifactId>hadoop-yarn-client</artifactId>
|
<artifactId>hadoop-yarn-client</artifactId>
|
||||||
|
|
|
@ -26,10 +26,15 @@ import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
||||||
import org.apache.hadoop.metrics2.source.JvmMetrics;
|
import org.apache.hadoop.metrics2.source.JvmMetrics;
|
||||||
import org.apache.hadoop.service.CompositeService;
|
import org.apache.hadoop.service.CompositeService;
|
||||||
|
import org.apache.hadoop.util.ReflectionUtils;
|
||||||
import org.apache.hadoop.util.ShutdownHookManager;
|
import org.apache.hadoop.util.ShutdownHookManager;
|
||||||
import org.apache.hadoop.util.StringUtils;
|
import org.apache.hadoop.util.StringUtils;
|
||||||
import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
|
import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||||
|
import org.apache.hadoop.yarn.server.sharedcachemanager.store.SCMStore;
|
||||||
|
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This service maintains the shared cache meta data. It handles claiming and
|
* This service maintains the shared cache meta data. It handles claiming and
|
||||||
|
@ -47,12 +52,18 @@ public class SharedCacheManager extends CompositeService {
|
||||||
|
|
||||||
private static final Log LOG = LogFactory.getLog(SharedCacheManager.class);
|
private static final Log LOG = LogFactory.getLog(SharedCacheManager.class);
|
||||||
|
|
||||||
|
private SCMStore store;
|
||||||
|
|
||||||
public SharedCacheManager() {
|
public SharedCacheManager() {
|
||||||
super("SharedCacheManager");
|
super("SharedCacheManager");
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void serviceInit(Configuration conf) throws Exception {
|
protected void serviceInit(Configuration conf) throws Exception {
|
||||||
|
|
||||||
|
this.store = createSCMStoreService(conf);
|
||||||
|
addService(store);
|
||||||
|
|
||||||
// init metrics
|
// init metrics
|
||||||
DefaultMetricsSystem.initialize("SharedCacheManager");
|
DefaultMetricsSystem.initialize("SharedCacheManager");
|
||||||
JvmMetrics.initSingleton("SharedCacheManager", null);
|
JvmMetrics.initSingleton("SharedCacheManager", null);
|
||||||
|
@ -60,6 +71,25 @@ public class SharedCacheManager extends CompositeService {
|
||||||
super.serviceInit(conf);
|
super.serviceInit(conf);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
private static SCMStore createSCMStoreService(Configuration conf) {
|
||||||
|
Class<? extends SCMStore> defaultStoreClass;
|
||||||
|
try {
|
||||||
|
defaultStoreClass =
|
||||||
|
(Class<? extends SCMStore>) Class
|
||||||
|
.forName(YarnConfiguration.DEFAULT_SCM_STORE_CLASS);
|
||||||
|
} catch (Exception e) {
|
||||||
|
throw new YarnRuntimeException("Invalid default scm store class"
|
||||||
|
+ YarnConfiguration.DEFAULT_SCM_STORE_CLASS, e);
|
||||||
|
}
|
||||||
|
|
||||||
|
SCMStore store =
|
||||||
|
ReflectionUtils.newInstance(conf.getClass(
|
||||||
|
YarnConfiguration.SCM_STORE_CLASS,
|
||||||
|
defaultStoreClass, SCMStore.class), conf);
|
||||||
|
return store;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void serviceStop() throws Exception {
|
protected void serviceStop() throws Exception {
|
||||||
|
|
||||||
|
@ -67,6 +97,14 @@ public class SharedCacheManager extends CompositeService {
|
||||||
super.serviceStop();
|
super.serviceStop();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* For testing purposes only.
|
||||||
|
*/
|
||||||
|
@VisibleForTesting
|
||||||
|
SCMStore getSCMStore() {
|
||||||
|
return this.store;
|
||||||
|
}
|
||||||
|
|
||||||
public static void main(String[] args) {
|
public static void main(String[] args) {
|
||||||
Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
|
Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
|
||||||
StringUtils.startupShutdownMessage(SharedCacheManager.class, args, LOG);
|
StringUtils.startupShutdownMessage(SharedCacheManager.class, args, LOG);
|
||||||
|
@ -83,4 +121,24 @@ public class SharedCacheManager extends CompositeService {
|
||||||
System.exit(-1);
|
System.exit(-1);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Private
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
public static AppChecker createAppCheckerService(Configuration conf) {
|
||||||
|
Class<? extends AppChecker> defaultCheckerClass;
|
||||||
|
try {
|
||||||
|
defaultCheckerClass =
|
||||||
|
(Class<? extends AppChecker>) Class
|
||||||
|
.forName(YarnConfiguration.DEFAULT_SCM_APP_CHECKER_CLASS);
|
||||||
|
} catch (Exception e) {
|
||||||
|
throw new YarnRuntimeException("Invalid default scm app checker class"
|
||||||
|
+ YarnConfiguration.DEFAULT_SCM_APP_CHECKER_CLASS, e);
|
||||||
|
}
|
||||||
|
|
||||||
|
AppChecker checker =
|
||||||
|
ReflectionUtils.newInstance(conf.getClass(
|
||||||
|
YarnConfiguration.SCM_APP_CHECKER_CLASS, defaultCheckerClass,
|
||||||
|
AppChecker.class), conf);
|
||||||
|
return checker;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,514 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.sharedcachemanager.store;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collection;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.Iterator;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
import java.util.concurrent.Executors;
|
||||||
|
import java.util.concurrent.ScheduledExecutorService;
|
||||||
|
import java.util.concurrent.ThreadFactory;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.HadoopIllegalArgumentException;
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||||
|
import org.apache.hadoop.classification.InterfaceStability.Evolving;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.util.StringInterner;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
|
import org.apache.hadoop.yarn.server.sharedcache.SharedCacheUtil;
|
||||||
|
import org.apache.hadoop.yarn.server.sharedcachemanager.AppChecker;
|
||||||
|
import org.apache.hadoop.yarn.server.sharedcachemanager.SharedCacheManager;
|
||||||
|
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A thread safe version of an in-memory SCM store. The thread safety is
|
||||||
|
* implemented with two key pieces: (1) at the mapping level a ConcurrentHashMap
|
||||||
|
* is used to allow concurrency to resources and their associated references,
|
||||||
|
* and (2) a key level lock is used to ensure mutual exclusion between any
|
||||||
|
* operation that accesses a resource with the same key. <br>
|
||||||
|
* <br>
|
||||||
|
* To ensure safe key-level locking, we use the original string key and intern
|
||||||
|
* it weakly using hadoop's <code>StringInterner</code>. It avoids the pitfalls
|
||||||
|
* of using built-in String interning. The interned strings are also weakly
|
||||||
|
* referenced, so it can be garbage collected once it is done. And there is
|
||||||
|
* little risk of keys being available for other parts of the code so they can
|
||||||
|
* be used as locks accidentally. <br>
|
||||||
|
* <br>
|
||||||
|
* Resources in the in-memory store are evicted based on a time staleness
|
||||||
|
* criteria. If a resource is not referenced (i.e. used) for a given period, it
|
||||||
|
* is designated as a stale resource and is considered evictable.
|
||||||
|
*/
|
||||||
|
@Private
|
||||||
|
@Evolving
|
||||||
|
public class InMemorySCMStore extends SCMStore {
|
||||||
|
private static final Log LOG = LogFactory.getLog(InMemorySCMStore.class);
|
||||||
|
|
||||||
|
private final Map<String, SharedCacheResource> cachedResources =
|
||||||
|
new ConcurrentHashMap<String, SharedCacheResource>();
|
||||||
|
private Collection<ApplicationId> initialApps =
|
||||||
|
new ArrayList<ApplicationId>();
|
||||||
|
private final Object initialAppsLock = new Object();
|
||||||
|
private long startTime;
|
||||||
|
private int stalenessMinutes;
|
||||||
|
private AppChecker appChecker;
|
||||||
|
private ScheduledExecutorService scheduler;
|
||||||
|
private int initialDelayMin;
|
||||||
|
private int checkPeriodMin;
|
||||||
|
|
||||||
|
public InMemorySCMStore() {
|
||||||
|
super(InMemorySCMStore.class.getName());
|
||||||
|
}
|
||||||
|
|
||||||
|
private String intern(String key) {
|
||||||
|
return StringInterner.weakIntern(key);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The in-memory store bootstraps itself from the shared cache entries that
|
||||||
|
* exist in HDFS.
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
protected void serviceInit(Configuration conf) throws Exception {
|
||||||
|
|
||||||
|
this.startTime = System.currentTimeMillis();
|
||||||
|
this.initialDelayMin = getInitialDelay(conf);
|
||||||
|
this.checkPeriodMin = getCheckPeriod(conf);
|
||||||
|
this.stalenessMinutes = getStalenessPeriod(conf);
|
||||||
|
|
||||||
|
appChecker = createAppCheckerService(conf);
|
||||||
|
addService(appChecker);
|
||||||
|
|
||||||
|
bootstrap(conf);
|
||||||
|
|
||||||
|
ThreadFactory tf =
|
||||||
|
new ThreadFactoryBuilder().setNameFormat("InMemorySCMStore")
|
||||||
|
.build();
|
||||||
|
scheduler = Executors.newSingleThreadScheduledExecutor(tf);
|
||||||
|
|
||||||
|
super.serviceInit(conf);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void serviceStart() throws Exception {
|
||||||
|
// start composed services first
|
||||||
|
super.serviceStart();
|
||||||
|
|
||||||
|
// Get initial list of running applications
|
||||||
|
LOG.info("Getting the active app list to initialize the in-memory scm store");
|
||||||
|
synchronized (initialAppsLock) {
|
||||||
|
initialApps = appChecker.getActiveApplications();
|
||||||
|
}
|
||||||
|
LOG.info(initialApps.size() + " apps recorded as active at this time");
|
||||||
|
|
||||||
|
Runnable task = new AppCheckTask(appChecker);
|
||||||
|
scheduler.scheduleAtFixedRate(task, initialDelayMin, checkPeriodMin,
|
||||||
|
TimeUnit.MINUTES);
|
||||||
|
LOG.info("Scheduled the in-memory scm store app check task to run every "
|
||||||
|
+ checkPeriodMin + " minutes.");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void serviceStop() throws Exception {
|
||||||
|
LOG.info("Shutting down the background thread.");
|
||||||
|
scheduler.shutdownNow();
|
||||||
|
try {
|
||||||
|
if (!scheduler.awaitTermination(10, TimeUnit.SECONDS)) {
|
||||||
|
LOG.warn("Gave up waiting for the app check task to shutdown.");
|
||||||
|
}
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
LOG.warn("The InMemorySCMStore was interrupted while shutting down the "
|
||||||
|
+ "app check task.", e);
|
||||||
|
}
|
||||||
|
LOG.info("The background thread stopped.");
|
||||||
|
|
||||||
|
super.serviceStop();
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
AppChecker createAppCheckerService(Configuration conf) {
|
||||||
|
return SharedCacheManager.createAppCheckerService(conf);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void bootstrap(Configuration conf) throws IOException {
|
||||||
|
Map<String, String> initialCachedResources =
|
||||||
|
getInitialCachedResources(FileSystem.get(conf), conf);
|
||||||
|
LOG.info("Bootstrapping from " + initialCachedResources.size()
|
||||||
|
+ " cache resources located in the file system");
|
||||||
|
Iterator<Map.Entry<String, String>> it =
|
||||||
|
initialCachedResources.entrySet().iterator();
|
||||||
|
while (it.hasNext()) {
|
||||||
|
Map.Entry<String, String> e = it.next();
|
||||||
|
String key = intern(e.getKey());
|
||||||
|
String fileName = e.getValue();
|
||||||
|
SharedCacheResource resource = new SharedCacheResource(fileName);
|
||||||
|
// we don't hold the lock for this as it is done as part of serviceInit
|
||||||
|
cachedResources.put(key, resource);
|
||||||
|
// clear out the initial resource to reduce the footprint
|
||||||
|
it.remove();
|
||||||
|
}
|
||||||
|
LOG.info("Bootstrapping complete");
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
Map<String, String> getInitialCachedResources(FileSystem fs,
|
||||||
|
Configuration conf) throws IOException {
|
||||||
|
// get the root directory for the shared cache
|
||||||
|
String location =
|
||||||
|
conf.get(YarnConfiguration.SHARED_CACHE_ROOT,
|
||||||
|
YarnConfiguration.DEFAULT_SHARED_CACHE_ROOT);
|
||||||
|
Path root = new Path(location);
|
||||||
|
if (!fs.exists(root)) {
|
||||||
|
String message =
|
||||||
|
"The shared cache root directory " + location + " was not found";
|
||||||
|
LOG.error(message);
|
||||||
|
throw new IOException(message);
|
||||||
|
}
|
||||||
|
|
||||||
|
int nestedLevel = SharedCacheUtil.getCacheDepth(conf);
|
||||||
|
// now traverse individual directories and process them
|
||||||
|
// the directory structure is specified by the nested level parameter
|
||||||
|
// (e.g. 9/c/d/<checksum>/file)
|
||||||
|
StringBuilder pattern = new StringBuilder();
|
||||||
|
for (int i = 0; i < nestedLevel + 1; i++) {
|
||||||
|
pattern.append("*/");
|
||||||
|
}
|
||||||
|
pattern.append("*");
|
||||||
|
|
||||||
|
LOG.info("Querying for all individual cached resource files");
|
||||||
|
FileStatus[] entries = fs.globStatus(new Path(root, pattern.toString()));
|
||||||
|
int numEntries = entries == null ? 0 : entries.length;
|
||||||
|
LOG.info("Found " + numEntries + " files: processing for one resource per "
|
||||||
|
+ "key");
|
||||||
|
|
||||||
|
Map<String, String> initialCachedEntries = new HashMap<String, String>();
|
||||||
|
if (entries != null) {
|
||||||
|
for (FileStatus entry : entries) {
|
||||||
|
Path file = entry.getPath();
|
||||||
|
String fileName = file.getName();
|
||||||
|
if (entry.isFile()) {
|
||||||
|
// get the parent to get the checksum
|
||||||
|
Path parent = file.getParent();
|
||||||
|
if (parent != null) {
|
||||||
|
// the name of the immediate parent directory is the checksum
|
||||||
|
String key = parent.getName();
|
||||||
|
// make sure we insert only one file per checksum whichever comes
|
||||||
|
// first
|
||||||
|
if (initialCachedEntries.containsKey(key)) {
|
||||||
|
LOG.warn("Key " + key + " is already mapped to file "
|
||||||
|
+ initialCachedEntries.get(key) + "; file " + fileName
|
||||||
|
+ " will not be added");
|
||||||
|
} else {
|
||||||
|
initialCachedEntries.put(key, fileName);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
LOG.info("A total of " + initialCachedEntries.size()
|
||||||
|
+ " files are now mapped");
|
||||||
|
return initialCachedEntries;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Adds the given resource to the store under the key and the filename. If the
|
||||||
|
* entry is already found, it returns the existing filename. It represents the
|
||||||
|
* state of the store at the time of this query. The entry may change or even
|
||||||
|
* be removed once this method returns. The caller should be prepared to
|
||||||
|
* handle that situation.
|
||||||
|
*
|
||||||
|
* @return the filename of the newly inserted resource or that of the existing
|
||||||
|
* resource
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public String addResource(String key, String fileName) {
|
||||||
|
String interned = intern(key);
|
||||||
|
synchronized (interned) {
|
||||||
|
SharedCacheResource resource = cachedResources.get(interned);
|
||||||
|
if (resource == null) {
|
||||||
|
resource = new SharedCacheResource(fileName);
|
||||||
|
cachedResources.put(interned, resource);
|
||||||
|
}
|
||||||
|
return resource.getFileName();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Adds the provided resource reference to the cache resource under the key,
|
||||||
|
* and updates the access time. If it returns a non-null value, the caller may
|
||||||
|
* safely assume that the resource will not be removed at least until the app
|
||||||
|
* in this resource reference has terminated.
|
||||||
|
*
|
||||||
|
* @return the filename of the resource, or null if the resource is not found
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public String addResourceReference(String key,
|
||||||
|
SharedCacheResourceReference ref) {
|
||||||
|
String interned = intern(key);
|
||||||
|
synchronized (interned) {
|
||||||
|
SharedCacheResource resource = cachedResources.get(interned);
|
||||||
|
if (resource == null) { // it's not mapped
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
resource.addReference(ref);
|
||||||
|
resource.updateAccessTime();
|
||||||
|
return resource.getFileName();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the list of resource references currently registered under the
|
||||||
|
* cache entry. If the list is empty, it returns an empty collection. The
|
||||||
|
* returned collection is unmodifiable and a snapshot of the information at
|
||||||
|
* the time of the query. The state may change after this query returns. The
|
||||||
|
* caller should handle the situation that some or all of these resource
|
||||||
|
* references are no longer relevant.
|
||||||
|
*
|
||||||
|
* @return the collection that contains the resource references associated
|
||||||
|
* with the resource; or an empty collection if no resource references
|
||||||
|
* are registered under this resource
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public Collection<SharedCacheResourceReference> getResourceReferences(String key) {
|
||||||
|
String interned = intern(key);
|
||||||
|
synchronized (interned) {
|
||||||
|
SharedCacheResource resource = cachedResources.get(interned);
|
||||||
|
if (resource == null) {
|
||||||
|
return Collections.emptySet();
|
||||||
|
}
|
||||||
|
Set<SharedCacheResourceReference> refs =
|
||||||
|
new HashSet<SharedCacheResourceReference>(
|
||||||
|
resource.getResourceReferences());
|
||||||
|
return Collections.unmodifiableSet(refs);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Removes the provided resource reference from the resource. If the resource
|
||||||
|
* does not exist, nothing will be done.
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public boolean removeResourceReference(String key, SharedCacheResourceReference ref,
|
||||||
|
boolean updateAccessTime) {
|
||||||
|
String interned = intern(key);
|
||||||
|
synchronized (interned) {
|
||||||
|
boolean removed = false;
|
||||||
|
SharedCacheResource resource = cachedResources.get(interned);
|
||||||
|
if (resource != null) {
|
||||||
|
Set<SharedCacheResourceReference> resourceRefs =
|
||||||
|
resource.getResourceReferences();
|
||||||
|
removed = resourceRefs.remove(ref);
|
||||||
|
if (updateAccessTime) {
|
||||||
|
resource.updateAccessTime();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return removed;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Removes the provided collection of resource references from the resource.
|
||||||
|
* If the resource does not exist, nothing will be done.
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public void removeResourceReferences(String key,
|
||||||
|
Collection<SharedCacheResourceReference> refs, boolean updateAccessTime) {
|
||||||
|
String interned = intern(key);
|
||||||
|
synchronized (interned) {
|
||||||
|
SharedCacheResource resource = cachedResources.get(interned);
|
||||||
|
if (resource != null) {
|
||||||
|
Set<SharedCacheResourceReference> resourceRefs =
|
||||||
|
resource.getResourceReferences();
|
||||||
|
resourceRefs.removeAll(refs);
|
||||||
|
if (updateAccessTime) {
|
||||||
|
resource.updateAccessTime();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Removes the given resource from the store. Returns true if the resource is
|
||||||
|
* found and removed or if the resource is not found. Returns false if it was
|
||||||
|
* unable to remove the resource because the resource reference list was not
|
||||||
|
* empty.
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public boolean removeResource(String key) {
|
||||||
|
String interned = intern(key);
|
||||||
|
synchronized (interned) {
|
||||||
|
SharedCacheResource resource = cachedResources.get(interned);
|
||||||
|
if (resource == null) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!resource.getResourceReferences().isEmpty()) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
// no users
|
||||||
|
cachedResources.remove(interned);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Obtains the access time for a resource. It represents the view of the
|
||||||
|
* resource at the time of the query. The value may have been updated at a
|
||||||
|
* later point.
|
||||||
|
*
|
||||||
|
* @return the access time of the resource if found; -1 if the resource is not
|
||||||
|
* found
|
||||||
|
*/
|
||||||
|
@VisibleForTesting
|
||||||
|
long getAccessTime(String key) {
|
||||||
|
String interned = intern(key);
|
||||||
|
synchronized (interned) {
|
||||||
|
SharedCacheResource resource = cachedResources.get(interned);
|
||||||
|
return resource == null ? -1 : resource.getAccessTime();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isResourceEvictable(String key, FileStatus file) {
|
||||||
|
synchronized (initialAppsLock) {
|
||||||
|
if (initialApps.size() > 0) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
long staleTime =
|
||||||
|
System.currentTimeMillis()
|
||||||
|
- TimeUnit.MINUTES.toMillis(this.stalenessMinutes);
|
||||||
|
long accessTime = getAccessTime(key);
|
||||||
|
if (accessTime == -1) {
|
||||||
|
// check modification time
|
||||||
|
long modTime = file.getModificationTime();
|
||||||
|
// if modification time is older then the store startup time, we need to
|
||||||
|
// just use the store startup time as the last point of certainty
|
||||||
|
long lastUse = modTime < this.startTime ? this.startTime : modTime;
|
||||||
|
return lastUse < staleTime;
|
||||||
|
} else {
|
||||||
|
// check access time
|
||||||
|
return accessTime < staleTime;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static int getStalenessPeriod(Configuration conf) {
|
||||||
|
int stalenessMinutes =
|
||||||
|
conf.getInt(YarnConfiguration.IN_MEMORY_STALENESS_PERIOD,
|
||||||
|
YarnConfiguration.DEFAULT_IN_MEMORY_STALENESS_PERIOD);
|
||||||
|
// non-positive value is invalid; use the default
|
||||||
|
if (stalenessMinutes <= 0) {
|
||||||
|
throw new HadoopIllegalArgumentException("Non-positive staleness value: "
|
||||||
|
+ stalenessMinutes
|
||||||
|
+ ". The staleness value must be greater than zero.");
|
||||||
|
}
|
||||||
|
return stalenessMinutes;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static int getInitialDelay(Configuration conf) {
|
||||||
|
int initialMinutes =
|
||||||
|
conf.getInt(YarnConfiguration.IN_MEMORY_INITIAL_DELAY,
|
||||||
|
YarnConfiguration.DEFAULT_IN_MEMORY_INITIAL_DELAY);
|
||||||
|
// non-positive value is invalid; use the default
|
||||||
|
if (initialMinutes <= 0) {
|
||||||
|
throw new HadoopIllegalArgumentException(
|
||||||
|
"Non-positive initial delay value: " + initialMinutes
|
||||||
|
+ ". The initial delay value must be greater than zero.");
|
||||||
|
}
|
||||||
|
return initialMinutes;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static int getCheckPeriod(Configuration conf) {
|
||||||
|
int checkMinutes =
|
||||||
|
conf.getInt(YarnConfiguration.IN_MEMORY_CHECK_PERIOD,
|
||||||
|
YarnConfiguration.DEFAULT_IN_MEMORY_CHECK_PERIOD);
|
||||||
|
// non-positive value is invalid; use the default
|
||||||
|
if (checkMinutes <= 0) {
|
||||||
|
throw new HadoopIllegalArgumentException(
|
||||||
|
"Non-positive check period value: " + checkMinutes
|
||||||
|
+ ". The check period value must be greater than zero.");
|
||||||
|
}
|
||||||
|
return checkMinutes;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Private
|
||||||
|
@Evolving
|
||||||
|
class AppCheckTask implements Runnable {
|
||||||
|
|
||||||
|
private final AppChecker taskAppChecker;
|
||||||
|
|
||||||
|
public AppCheckTask(AppChecker appChecker) {
|
||||||
|
this.taskAppChecker = appChecker;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
try {
|
||||||
|
LOG.info("Checking the initial app list for finished applications.");
|
||||||
|
synchronized (initialAppsLock) {
|
||||||
|
if (initialApps.isEmpty()) {
|
||||||
|
// we're fine, no-op; there are no active apps that were running at
|
||||||
|
// the time of the service start
|
||||||
|
} else {
|
||||||
|
LOG.info("Looking into " + initialApps.size()
|
||||||
|
+ " apps to see if they are still active");
|
||||||
|
Iterator<ApplicationId> it = initialApps.iterator();
|
||||||
|
while (it.hasNext()) {
|
||||||
|
ApplicationId id = it.next();
|
||||||
|
try {
|
||||||
|
if (!taskAppChecker.isApplicationActive(id)) {
|
||||||
|
// remove it from the list
|
||||||
|
it.remove();
|
||||||
|
}
|
||||||
|
} catch (YarnException e) {
|
||||||
|
LOG.warn("Exception while checking the app status;"
|
||||||
|
+ " will leave the entry in the list", e);
|
||||||
|
// continue
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
LOG.info("There are now " + initialApps.size()
|
||||||
|
+ " entries in the list");
|
||||||
|
}
|
||||||
|
} catch (Throwable e) {
|
||||||
|
LOG.error(
|
||||||
|
"Unexpected exception thrown during in-memory store app check task."
|
||||||
|
+ " Rescheduling task.", e);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,133 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.sharedcachemanager.store;
|
||||||
|
|
||||||
|
import java.util.Collection;
|
||||||
|
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||||
|
import org.apache.hadoop.classification.InterfaceStability.Evolving;
|
||||||
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
|
import org.apache.hadoop.service.CompositeService;
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* An abstract class for the data store used by the shared cache manager
|
||||||
|
* service. All implementations of methods in this interface need to be thread
|
||||||
|
* safe and atomic.
|
||||||
|
*/
|
||||||
|
@Private
|
||||||
|
@Evolving
|
||||||
|
public abstract class SCMStore extends CompositeService {
|
||||||
|
|
||||||
|
protected SCMStore(String name) {
|
||||||
|
super(name);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Add a resource to the shared cache and it's associated filename. The
|
||||||
|
* resource is identified by a unique key. If the key already exists no action
|
||||||
|
* is taken and the filename of the existing resource is returned. If the key
|
||||||
|
* does not exist, the resource is added, it's access time is set, and the
|
||||||
|
* filename of the resource is returned.
|
||||||
|
*
|
||||||
|
* @param key a unique identifier for a resource
|
||||||
|
* @param fileName the filename of the resource
|
||||||
|
* @return the filename of the resource as represented by the cache
|
||||||
|
*/
|
||||||
|
@Private
|
||||||
|
public abstract String addResource(String key, String fileName);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Remove a resource from the shared cache.
|
||||||
|
*
|
||||||
|
* @param key a unique identifier for a resource
|
||||||
|
* @return true if the resource was removed or did not exist, false if the
|
||||||
|
* resource existed, contained at least one
|
||||||
|
* <code>SharedCacheResourceReference</code> and was not removed.
|
||||||
|
*/
|
||||||
|
@Private
|
||||||
|
public abstract boolean removeResource(String key);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Add a <code>SharedCacheResourceReference</code> to a resource and update
|
||||||
|
* the resource access time.
|
||||||
|
*
|
||||||
|
* @param key a unique identifier for a resource
|
||||||
|
* @param ref the <code>SharedCacheResourceReference</code> to add
|
||||||
|
* @return String the filename of the resource if the
|
||||||
|
* <code>SharedCacheResourceReference</code> was added or already
|
||||||
|
* existed. null if the resource did not exist
|
||||||
|
*/
|
||||||
|
@Private
|
||||||
|
public abstract String addResourceReference(String key,
|
||||||
|
SharedCacheResourceReference ref);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the <code>SharedCacheResourceReference</code>(s) associated with the
|
||||||
|
* resource.
|
||||||
|
*
|
||||||
|
* @param key a unique identifier for a resource
|
||||||
|
* @return an unmodifiable collection of
|
||||||
|
* <code>SharedCacheResourceReferences</code>. If the resource does
|
||||||
|
* not exist, an empty set is returned.
|
||||||
|
*/
|
||||||
|
@Private
|
||||||
|
public abstract Collection<SharedCacheResourceReference> getResourceReferences(
|
||||||
|
String key);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Remove a <code>SharedCacheResourceReference</code> from a resource.
|
||||||
|
*
|
||||||
|
* @param key a unique identifier for a resource
|
||||||
|
* @param ref the <code>SharedCacheResourceReference</code> to remove
|
||||||
|
* @param updateAccessTime true if the call should update the access time for
|
||||||
|
* the resource
|
||||||
|
* @return true if the reference was removed, false otherwise
|
||||||
|
*/
|
||||||
|
@Private
|
||||||
|
public abstract boolean removeResourceReference(String key,
|
||||||
|
SharedCacheResourceReference ref, boolean updateAccessTime);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Remove a collection of <code>SharedCacheResourceReferences</code> from a
|
||||||
|
* resource.
|
||||||
|
*
|
||||||
|
* @param key a unique identifier for a resource
|
||||||
|
* @param refs the collection of <code>SharedCacheResourceReference</code>s to
|
||||||
|
* remove
|
||||||
|
* @param updateAccessTime true if the call should update the access time for
|
||||||
|
* the resource
|
||||||
|
*/
|
||||||
|
@Private
|
||||||
|
public abstract void removeResourceReferences(String key,
|
||||||
|
Collection<SharedCacheResourceReference> refs, boolean updateAccessTime);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Check if a specific resource is evictable according to the store's enabled
|
||||||
|
* cache eviction policies.
|
||||||
|
*
|
||||||
|
* @param key a unique identifier for a resource
|
||||||
|
* @param file the <code>FileStatus</code> object for the resource file in the
|
||||||
|
* file system.
|
||||||
|
* @return true if the resource is evicatble, false otherwise
|
||||||
|
*/
|
||||||
|
@Private
|
||||||
|
public abstract boolean isResourceEvictable(String key, FileStatus file);
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,64 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.sharedcachemanager.store;
|
||||||
|
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||||
|
import org.apache.hadoop.classification.InterfaceStability.Evolving;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Class that encapsulates the cache resource. The instances are not thread
|
||||||
|
* safe. Any operation that uses the resource must use thread-safe mechanisms to
|
||||||
|
* ensure safe access with the only exception of the filename.
|
||||||
|
*/
|
||||||
|
@Private
|
||||||
|
@Evolving
|
||||||
|
class SharedCacheResource {
|
||||||
|
private long accessTime;
|
||||||
|
private final Set<SharedCacheResourceReference> refs;
|
||||||
|
private final String fileName;
|
||||||
|
|
||||||
|
SharedCacheResource(String fileName) {
|
||||||
|
this.accessTime = System.currentTimeMillis();
|
||||||
|
this.refs = new HashSet<SharedCacheResourceReference>();
|
||||||
|
this.fileName = fileName;
|
||||||
|
}
|
||||||
|
|
||||||
|
long getAccessTime() {
|
||||||
|
return accessTime;
|
||||||
|
}
|
||||||
|
|
||||||
|
void updateAccessTime() {
|
||||||
|
accessTime = System.currentTimeMillis();
|
||||||
|
}
|
||||||
|
|
||||||
|
String getFileName() {
|
||||||
|
return this.fileName;
|
||||||
|
}
|
||||||
|
|
||||||
|
Set<SharedCacheResourceReference> getResourceReferences() {
|
||||||
|
return this.refs;
|
||||||
|
}
|
||||||
|
|
||||||
|
boolean addReference(SharedCacheResourceReference ref) {
|
||||||
|
return this.refs.add(ref);
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,86 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.sharedcachemanager.store;
|
||||||
|
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||||
|
import org.apache.hadoop.classification.InterfaceStability.Evolving;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This is an object that represents a reference to a shared cache resource.
|
||||||
|
*/
|
||||||
|
@Private
|
||||||
|
@Evolving
|
||||||
|
public class SharedCacheResourceReference {
|
||||||
|
private final ApplicationId appId;
|
||||||
|
private final String shortUserName;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a resource reference.
|
||||||
|
*
|
||||||
|
* @param appId <code>ApplicationId</code> that is referencing a resource.
|
||||||
|
* @param shortUserName <code>ShortUserName</code> of the user that created
|
||||||
|
* the reference.
|
||||||
|
*/
|
||||||
|
public SharedCacheResourceReference(ApplicationId appId, String shortUserName) {
|
||||||
|
this.appId = appId;
|
||||||
|
this.shortUserName = shortUserName;
|
||||||
|
}
|
||||||
|
|
||||||
|
public ApplicationId getAppId() {
|
||||||
|
return this.appId;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getShortUserName() {
|
||||||
|
return this.shortUserName;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int hashCode() {
|
||||||
|
final int prime = 31;
|
||||||
|
int result = 1;
|
||||||
|
result = prime * result + ((appId == null) ? 0 : appId.hashCode());
|
||||||
|
result =
|
||||||
|
prime * result
|
||||||
|
+ ((shortUserName == null) ? 0 : shortUserName.hashCode());
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean equals(Object obj) {
|
||||||
|
if (this == obj)
|
||||||
|
return true;
|
||||||
|
if (obj == null)
|
||||||
|
return false;
|
||||||
|
if (getClass() != obj.getClass())
|
||||||
|
return false;
|
||||||
|
SharedCacheResourceReference other = (SharedCacheResourceReference) obj;
|
||||||
|
if (appId == null) {
|
||||||
|
if (other.appId != null)
|
||||||
|
return false;
|
||||||
|
} else if (!appId.equals(other.appId))
|
||||||
|
return false;
|
||||||
|
if (shortUserName == null) {
|
||||||
|
if (other.shortUserName != null)
|
||||||
|
return false;
|
||||||
|
} else if (!shortUserName.equals(other.shortUserName))
|
||||||
|
return false;
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,334 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.sharedcachemanager.store;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertFalse;
|
||||||
|
import static org.junit.Assert.assertNull;
|
||||||
|
import static org.junit.Assert.assertSame;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
import static org.mockito.Matchers.isA;
|
||||||
|
import static org.mockito.Mockito.doReturn;
|
||||||
|
import static org.mockito.Mockito.mock;
|
||||||
|
import static org.mockito.Mockito.spy;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collection;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.concurrent.Callable;
|
||||||
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
|
import java.util.concurrent.Executors;
|
||||||
|
import java.util.concurrent.Future;
|
||||||
|
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
|
import org.apache.hadoop.yarn.server.sharedcachemanager.AppChecker;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
public class TestInMemorySCMStore {
|
||||||
|
|
||||||
|
private InMemorySCMStore store;
|
||||||
|
private AppChecker checker;
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setup() {
|
||||||
|
this.store = spy(new InMemorySCMStore());
|
||||||
|
this.checker = spy(new DummyAppChecker());
|
||||||
|
doReturn(checker).when(store).createAppCheckerService(
|
||||||
|
isA(Configuration.class));
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void cleanup() {
|
||||||
|
if (this.store != null) {
|
||||||
|
this.store.stop();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void startEmptyStore() throws Exception {
|
||||||
|
doReturn(new ArrayList<ApplicationId>()).when(checker)
|
||||||
|
.getActiveApplications();
|
||||||
|
doReturn(new HashMap<String, String>()).when(store)
|
||||||
|
.getInitialCachedResources(isA(FileSystem.class),
|
||||||
|
isA(Configuration.class));
|
||||||
|
this.store.init(new Configuration());
|
||||||
|
this.store.start();
|
||||||
|
}
|
||||||
|
|
||||||
|
private Map<String, String> startStoreWithResources() throws Exception {
|
||||||
|
Map<String, String> initialCachedResources = new HashMap<String, String>();
|
||||||
|
int count = 10;
|
||||||
|
for (int i = 0; i < count; i++) {
|
||||||
|
String key = String.valueOf(i);
|
||||||
|
String fileName = key + ".jar";
|
||||||
|
initialCachedResources.put(key, fileName);
|
||||||
|
}
|
||||||
|
doReturn(new ArrayList<ApplicationId>()).when(checker)
|
||||||
|
.getActiveApplications();
|
||||||
|
doReturn(initialCachedResources).when(store).getInitialCachedResources(
|
||||||
|
isA(FileSystem.class), isA(Configuration.class));
|
||||||
|
this.store.init(new Configuration());
|
||||||
|
this.store.start();
|
||||||
|
return initialCachedResources;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void startStoreWithApps() throws Exception {
|
||||||
|
ArrayList<ApplicationId> list = new ArrayList<ApplicationId>();
|
||||||
|
int count = 5;
|
||||||
|
for (int i = 0; i < count; i++) {
|
||||||
|
list.add(createAppId(i, i));
|
||||||
|
}
|
||||||
|
doReturn(list).when(checker).getActiveApplications();
|
||||||
|
doReturn(new HashMap<String, String>()).when(store)
|
||||||
|
.getInitialCachedResources(isA(FileSystem.class),
|
||||||
|
isA(Configuration.class));
|
||||||
|
this.store.init(new Configuration());
|
||||||
|
this.store.start();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testAddResourceConcurrency() throws Exception {
|
||||||
|
startEmptyStore();
|
||||||
|
final String key = "key1";
|
||||||
|
int count = 5;
|
||||||
|
ExecutorService exec = Executors.newFixedThreadPool(count);
|
||||||
|
List<Future<String>> futures = new ArrayList<Future<String>>(count);
|
||||||
|
final CountDownLatch start = new CountDownLatch(1);
|
||||||
|
for (int i = 0; i < count; i++) {
|
||||||
|
final String fileName = "foo-" + i + ".jar";
|
||||||
|
Callable<String> task = new Callable<String>() {
|
||||||
|
public String call() throws Exception {
|
||||||
|
start.await();
|
||||||
|
String result = store.addResource(key, fileName);
|
||||||
|
System.out.println("fileName: " + fileName + ", result: " + result);
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
futures.add(exec.submit(task));
|
||||||
|
}
|
||||||
|
// start them all at the same time
|
||||||
|
start.countDown();
|
||||||
|
// check the result; they should all agree with the value
|
||||||
|
Set<String> results = new HashSet<String>();
|
||||||
|
for (Future<String> future: futures) {
|
||||||
|
results.add(future.get());
|
||||||
|
}
|
||||||
|
assertSame(1, results.size());
|
||||||
|
exec.shutdown();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testAddResourceRefNonExistentResource() throws Exception {
|
||||||
|
startEmptyStore();
|
||||||
|
String key = "key1";
|
||||||
|
ApplicationId id = createAppId(1, 1L);
|
||||||
|
// try adding an app id without adding the key first
|
||||||
|
assertNull(store.addResourceReference(key,
|
||||||
|
new SharedCacheResourceReference(id, "user")));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRemoveResourceEmptyRefs() throws Exception {
|
||||||
|
startEmptyStore();
|
||||||
|
String key = "key1";
|
||||||
|
String fileName = "foo.jar";
|
||||||
|
// first add resource
|
||||||
|
store.addResource(key, fileName);
|
||||||
|
// try removing the resource; it should return true
|
||||||
|
assertTrue(store.removeResource(key));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testAddResourceRefRemoveResource() throws Exception {
|
||||||
|
startEmptyStore();
|
||||||
|
String key = "key1";
|
||||||
|
ApplicationId id = createAppId(1, 1L);
|
||||||
|
String user = "user";
|
||||||
|
// add the resource, and then add a resource ref
|
||||||
|
store.addResource(key, "foo.jar");
|
||||||
|
store.addResourceReference(key, new SharedCacheResourceReference(id, user));
|
||||||
|
// removeResource should return false
|
||||||
|
assertTrue(!store.removeResource(key));
|
||||||
|
// the resource and the ref should be intact
|
||||||
|
Collection<SharedCacheResourceReference> refs = store.getResourceReferences(key);
|
||||||
|
assertTrue(refs != null);
|
||||||
|
assertEquals(Collections.singleton(new SharedCacheResourceReference(id, user)), refs);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testAddResourceRefConcurrency() throws Exception {
|
||||||
|
startEmptyStore();
|
||||||
|
final String key = "key1";
|
||||||
|
final String user = "user";
|
||||||
|
String fileName = "foo.jar";
|
||||||
|
|
||||||
|
// first add the resource
|
||||||
|
store.addResource(key, fileName);
|
||||||
|
|
||||||
|
// make concurrent addResourceRef calls (clients)
|
||||||
|
int count = 5;
|
||||||
|
ExecutorService exec = Executors.newFixedThreadPool(count);
|
||||||
|
List<Future<String>> futures = new ArrayList<Future<String>>(count);
|
||||||
|
final CountDownLatch start = new CountDownLatch(1);
|
||||||
|
for (int i = 0; i < count; i++) {
|
||||||
|
final ApplicationId id = createAppId(i, i);
|
||||||
|
Callable<String> task = new Callable<String>() {
|
||||||
|
public String call() throws Exception {
|
||||||
|
start.await();
|
||||||
|
return store.addResourceReference(key,
|
||||||
|
new SharedCacheResourceReference(id, user));
|
||||||
|
}
|
||||||
|
};
|
||||||
|
futures.add(exec.submit(task));
|
||||||
|
}
|
||||||
|
// start them all at the same time
|
||||||
|
start.countDown();
|
||||||
|
// check the result
|
||||||
|
Set<String> results = new HashSet<String>();
|
||||||
|
for (Future<String> future: futures) {
|
||||||
|
results.add(future.get());
|
||||||
|
}
|
||||||
|
// they should all have the same file name
|
||||||
|
assertSame(1, results.size());
|
||||||
|
assertEquals(Collections.singleton(fileName), results);
|
||||||
|
// there should be 5 refs as a result
|
||||||
|
Collection<SharedCacheResourceReference> refs = store.getResourceReferences(key);
|
||||||
|
assertSame(count, refs.size());
|
||||||
|
exec.shutdown();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testAddResourceRefAddResourceConcurrency() throws Exception {
|
||||||
|
startEmptyStore();
|
||||||
|
final String key = "key1";
|
||||||
|
final String fileName = "foo.jar";
|
||||||
|
final String user = "user";
|
||||||
|
final ApplicationId id = createAppId(1, 1L);
|
||||||
|
// add the resource and add the resource ref at the same time
|
||||||
|
ExecutorService exec = Executors.newFixedThreadPool(2);
|
||||||
|
final CountDownLatch start = new CountDownLatch(1);
|
||||||
|
Callable<String> addKeyTask = new Callable<String>() {
|
||||||
|
public String call() throws Exception {
|
||||||
|
start.await();
|
||||||
|
return store.addResource(key, fileName);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
Callable<String> addAppIdTask = new Callable<String>() {
|
||||||
|
public String call() throws Exception {
|
||||||
|
start.await();
|
||||||
|
return store.addResourceReference(key,
|
||||||
|
new SharedCacheResourceReference(id, user));
|
||||||
|
}
|
||||||
|
};
|
||||||
|
Future<String> addAppIdFuture = exec.submit(addAppIdTask);
|
||||||
|
Future<String> addKeyFuture = exec.submit(addKeyTask);
|
||||||
|
// start them at the same time
|
||||||
|
start.countDown();
|
||||||
|
// get the results
|
||||||
|
String addKeyResult = addKeyFuture.get();
|
||||||
|
String addAppIdResult = addAppIdFuture.get();
|
||||||
|
assertEquals(fileName, addKeyResult);
|
||||||
|
System.out.println("addAppId() result: " + addAppIdResult);
|
||||||
|
// it may be null or the fileName depending on the timing
|
||||||
|
assertTrue(addAppIdResult == null || addAppIdResult.equals(fileName));
|
||||||
|
exec.shutdown();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRemoveRef() throws Exception {
|
||||||
|
startEmptyStore();
|
||||||
|
String key = "key1";
|
||||||
|
String fileName = "foo.jar";
|
||||||
|
String user = "user";
|
||||||
|
// first add the resource
|
||||||
|
store.addResource(key, fileName);
|
||||||
|
// add a ref
|
||||||
|
ApplicationId id = createAppId(1, 1L);
|
||||||
|
SharedCacheResourceReference myRef = new SharedCacheResourceReference(id, user);
|
||||||
|
String result = store.addResourceReference(key, myRef);
|
||||||
|
assertEquals(fileName, result);
|
||||||
|
Collection<SharedCacheResourceReference> refs = store.getResourceReferences(key);
|
||||||
|
assertSame(1, refs.size());
|
||||||
|
assertEquals(Collections.singleton(myRef), refs);
|
||||||
|
// remove the same ref
|
||||||
|
store.removeResourceReferences(key, Collections.singleton(myRef), true);
|
||||||
|
Collection<SharedCacheResourceReference> newRefs = store.getResourceReferences(key);
|
||||||
|
assertTrue(newRefs == null || newRefs.isEmpty());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testBootstrapping() throws Exception {
|
||||||
|
Map<String, String> initialCachedResources = startStoreWithResources();
|
||||||
|
int count = initialCachedResources.size();
|
||||||
|
ApplicationId id = createAppId(1, 1L);
|
||||||
|
// the entries from the cached entries should now exist
|
||||||
|
for (int i = 0; i < count; i++) {
|
||||||
|
String key = String.valueOf(i);
|
||||||
|
String fileName = key + ".jar";
|
||||||
|
String result =
|
||||||
|
store.addResourceReference(key, new SharedCacheResourceReference(id,
|
||||||
|
"user"));
|
||||||
|
// the value should not be null (i.e. it has the key) and the filename should match
|
||||||
|
assertEquals(fileName, result);
|
||||||
|
// the initial input should be emptied
|
||||||
|
assertTrue(initialCachedResources.isEmpty());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testEvictableWithInitialApps() throws Exception {
|
||||||
|
startStoreWithApps();
|
||||||
|
assertFalse(store.isResourceEvictable("key", mock(FileStatus.class)));
|
||||||
|
}
|
||||||
|
|
||||||
|
private ApplicationId createAppId(int id, long timestamp) {
|
||||||
|
return ApplicationId.newInstance(timestamp, id);
|
||||||
|
}
|
||||||
|
|
||||||
|
class DummyAppChecker extends AppChecker {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
@Private
|
||||||
|
public boolean isApplicationActive(ApplicationId id) throws YarnException {
|
||||||
|
// stub
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
@Private
|
||||||
|
public Collection<ApplicationId> getActiveApplications()
|
||||||
|
throws YarnException {
|
||||||
|
// stub
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue