diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index b24f55305dd..47cf7183e60 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -9,6 +9,9 @@ Release 2.7.0 - UNRELEASED
YARN-2179. [YARN-1492] Initial cache manager structure and context.
(Chris Trezzo via kasha)
+ YARN-2180. [YARN-1492] In-memory backing store for cache manager.
+ (Chris Trezzo via kasha)
+
IMPROVEMENTS
YARN-1979. TestDirectoryCollection fails when the umask is unusual.
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 5835b7f1cdf..59cabe7a943 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -1332,6 +1332,49 @@ public class YarnConfiguration extends Configuration {
SHARED_CACHE_PREFIX + "nested-level";
public static final int DEFAULT_SHARED_CACHE_NESTED_LEVEL = 3;
+ // Shared Cache Manager Configs
+
+ public static final String SCM_STORE_PREFIX = SHARED_CACHE_PREFIX + "store.";
+
+ public static final String SCM_STORE_CLASS = SCM_STORE_PREFIX + "class";
+ public static final String DEFAULT_SCM_STORE_CLASS =
+ "org.apache.hadoop.yarn.server.sharedcachemanager.store.InMemorySCMStore";
+
+ public static final String SCM_APP_CHECKER_CLASS = SHARED_CACHE_PREFIX
+ + "app-checker.class";
+ public static final String DEFAULT_SCM_APP_CHECKER_CLASS =
+ "org.apache.hadoop.yarn.server.sharedcachemanager.RemoteAppChecker";
+
+ // In-memory SCM store configuration
+
+ public static final String IN_MEMORY_STORE_PREFIX =
+ SHARED_CACHE_PREFIX + "in-memory.";
+
+ /**
+ * A resource in the InMemorySCMStore is considered stale if the time since
+ * the last reference exceeds the staleness period. This value is specified in
+ * minutes.
+ */
+ public static final String IN_MEMORY_STALENESS_PERIOD =
+ IN_MEMORY_STORE_PREFIX + "staleness-period";
+ public static final int DEFAULT_IN_MEMORY_STALENESS_PERIOD = 7 * 24 * 60;
+
+ /**
+ * Initial delay before the in-memory store runs its first check to remove
+ * dead initial applications. Specified in minutes.
+ */
+ public static final String IN_MEMORY_INITIAL_DELAY =
+ IN_MEMORY_STORE_PREFIX + "initial-delay";
+ public static final int DEFAULT_IN_MEMORY_INITIAL_DELAY = 10;
+
+ /**
+ * The frequency at which the in-memory store checks to remove dead initial
+ * applications. Specified in minutes.
+ */
+ public static final String IN_MEMORY_CHECK_PERIOD =
+ IN_MEMORY_STORE_PREFIX + "check-period";
+ public static final int DEFAULT_IN_MEMORY_CHECK_PERIOD = 12 * 60;
+
////////////////////////////////
// Other Configs
////////////////////////////////
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index a4f3106f902..1a66a6fc14a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -1341,6 +1341,40 @@
3
+
+ The implementation to be used for the SCM store
+ yarn.sharedcache.store.class
+ org.apache.hadoop.yarn.server.sharedcachemanager.store.InMemorySCMStore
+
+
+
+ The implementation to be used for the SCM app-checker
+ yarn.sharedcache.app-checker.class
+ org.apache.hadoop.yarn.server.sharedcachemanager.RemoteAppChecker
+
+
+
+ A resource in the in-memory store is considered stale
+ if the time since the last reference exceeds the staleness period.
+ This value is specified in minutes.
+ yarn.sharedcache.store.in-memory.staleness-period
+ 10080
+
+
+
+ Initial delay before the in-memory store runs its first check
+ to remove dead initial applications. Specified in minutes.
+ yarn.sharedcache.store.in-memory.initial-delay
+ 10
+
+
+
+ The frequency at which the in-memory store checks to remove
+ dead initial applications. Specified in minutes.
+ yarn.sharedcache.store.in-memory.check-period
+ 720
+
+
The interval that the yarn client library uses to poll the
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/sharedcache/SharedCacheStructureUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/sharedcache/SharedCacheUtil.java
similarity index 95%
rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/sharedcache/SharedCacheStructureUtil.java
rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/sharedcache/SharedCacheUtil.java
index 1bac75b74e2..4b933ac317e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/sharedcache/SharedCacheStructureUtil.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/sharedcache/SharedCacheUtil.java
@@ -32,9 +32,9 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
*/
@Private
@Unstable
-public class SharedCacheStructureUtil {
+public class SharedCacheUtil {
- private static final Log LOG = LogFactory.getLog(SharedCacheStructureUtil.class);
+ private static final Log LOG = LogFactory.getLog(SharedCacheUtil.class);
@Private
public static int getCacheDepth(Configuration conf) {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/pom.xml
index e473c2b2cf0..80c345737cf 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/pom.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/pom.xml
@@ -42,6 +42,10 @@
org.apache.hadoop
hadoop-yarn-common
+
+ org.apache.hadoop
+ hadoop-yarn-server-common
+
org.apache.hadoop
hadoop-yarn-client
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/SharedCacheManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/SharedCacheManager.java
index 866c094e695..2f3ddb1db32 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/SharedCacheManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/SharedCacheManager.java
@@ -26,10 +26,15 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.source.JvmMetrics;
import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.ShutdownHookManager;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.server.sharedcachemanager.store.SCMStore;
+
+import com.google.common.annotations.VisibleForTesting;
/**
* This service maintains the shared cache meta data. It handles claiming and
@@ -47,12 +52,18 @@ public class SharedCacheManager extends CompositeService {
private static final Log LOG = LogFactory.getLog(SharedCacheManager.class);
+ private SCMStore store;
+
public SharedCacheManager() {
super("SharedCacheManager");
}
@Override
protected void serviceInit(Configuration conf) throws Exception {
+
+ this.store = createSCMStoreService(conf);
+ addService(store);
+
// init metrics
DefaultMetricsSystem.initialize("SharedCacheManager");
JvmMetrics.initSingleton("SharedCacheManager", null);
@@ -60,6 +71,25 @@ public class SharedCacheManager extends CompositeService {
super.serviceInit(conf);
}
+ @SuppressWarnings("unchecked")
+ private static SCMStore createSCMStoreService(Configuration conf) {
+ Class extends SCMStore> defaultStoreClass;
+ try {
+ defaultStoreClass =
+ (Class extends SCMStore>) Class
+ .forName(YarnConfiguration.DEFAULT_SCM_STORE_CLASS);
+ } catch (Exception e) {
+ throw new YarnRuntimeException("Invalid default scm store class"
+ + YarnConfiguration.DEFAULT_SCM_STORE_CLASS, e);
+ }
+
+ SCMStore store =
+ ReflectionUtils.newInstance(conf.getClass(
+ YarnConfiguration.SCM_STORE_CLASS,
+ defaultStoreClass, SCMStore.class), conf);
+ return store;
+ }
+
@Override
protected void serviceStop() throws Exception {
@@ -67,6 +97,14 @@ public class SharedCacheManager extends CompositeService {
super.serviceStop();
}
+ /**
+ * For testing purposes only.
+ */
+ @VisibleForTesting
+ SCMStore getSCMStore() {
+ return this.store;
+ }
+
public static void main(String[] args) {
Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
StringUtils.startupShutdownMessage(SharedCacheManager.class, args, LOG);
@@ -83,4 +121,24 @@ public class SharedCacheManager extends CompositeService {
System.exit(-1);
}
}
+
+ @Private
+ @SuppressWarnings("unchecked")
+ public static AppChecker createAppCheckerService(Configuration conf) {
+ Class extends AppChecker> defaultCheckerClass;
+ try {
+ defaultCheckerClass =
+ (Class extends AppChecker>) Class
+ .forName(YarnConfiguration.DEFAULT_SCM_APP_CHECKER_CLASS);
+ } catch (Exception e) {
+ throw new YarnRuntimeException("Invalid default scm app checker class"
+ + YarnConfiguration.DEFAULT_SCM_APP_CHECKER_CLASS, e);
+ }
+
+ AppChecker checker =
+ ReflectionUtils.newInstance(conf.getClass(
+ YarnConfiguration.SCM_APP_CHECKER_CLASS, defaultCheckerClass,
+ AppChecker.class), conf);
+ return checker;
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/InMemorySCMStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/InMemorySCMStore.java
new file mode 100644
index 00000000000..79369d8e281
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/InMemorySCMStore.java
@@ -0,0 +1,514 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.sharedcachemanager.store;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.HadoopIllegalArgumentException;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.StringInterner;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.sharedcache.SharedCacheUtil;
+import org.apache.hadoop.yarn.server.sharedcachemanager.AppChecker;
+import org.apache.hadoop.yarn.server.sharedcachemanager.SharedCacheManager;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * A thread safe version of an in-memory SCM store. The thread safety is
+ * implemented with two key pieces: (1) at the mapping level a ConcurrentHashMap
+ * is used to allow concurrency to resources and their associated references,
+ * and (2) a key level lock is used to ensure mutual exclusion between any
+ * operation that accesses a resource with the same key.
+ *
+ * To ensure safe key-level locking, we use the original string key and intern
+ * it weakly using hadoop's StringInterner
. It avoids the pitfalls
+ * of using built-in String interning. The interned strings are also weakly
+ * referenced, so it can be garbage collected once it is done. And there is
+ * little risk of keys being available for other parts of the code so they can
+ * be used as locks accidentally.
+ *
+ * Resources in the in-memory store are evicted based on a time staleness
+ * criteria. If a resource is not referenced (i.e. used) for a given period, it
+ * is designated as a stale resource and is considered evictable.
+ */
+@Private
+@Evolving
+public class InMemorySCMStore extends SCMStore {
+ private static final Log LOG = LogFactory.getLog(InMemorySCMStore.class);
+
+ private final Map cachedResources =
+ new ConcurrentHashMap();
+ private Collection initialApps =
+ new ArrayList();
+ private final Object initialAppsLock = new Object();
+ private long startTime;
+ private int stalenessMinutes;
+ private AppChecker appChecker;
+ private ScheduledExecutorService scheduler;
+ private int initialDelayMin;
+ private int checkPeriodMin;
+
+ public InMemorySCMStore() {
+ super(InMemorySCMStore.class.getName());
+ }
+
+ private String intern(String key) {
+ return StringInterner.weakIntern(key);
+ }
+
+ /**
+ * The in-memory store bootstraps itself from the shared cache entries that
+ * exist in HDFS.
+ */
+ @Override
+ protected void serviceInit(Configuration conf) throws Exception {
+
+ this.startTime = System.currentTimeMillis();
+ this.initialDelayMin = getInitialDelay(conf);
+ this.checkPeriodMin = getCheckPeriod(conf);
+ this.stalenessMinutes = getStalenessPeriod(conf);
+
+ appChecker = createAppCheckerService(conf);
+ addService(appChecker);
+
+ bootstrap(conf);
+
+ ThreadFactory tf =
+ new ThreadFactoryBuilder().setNameFormat("InMemorySCMStore")
+ .build();
+ scheduler = Executors.newSingleThreadScheduledExecutor(tf);
+
+ super.serviceInit(conf);
+ }
+
+ @Override
+ protected void serviceStart() throws Exception {
+ // start composed services first
+ super.serviceStart();
+
+ // Get initial list of running applications
+ LOG.info("Getting the active app list to initialize the in-memory scm store");
+ synchronized (initialAppsLock) {
+ initialApps = appChecker.getActiveApplications();
+ }
+ LOG.info(initialApps.size() + " apps recorded as active at this time");
+
+ Runnable task = new AppCheckTask(appChecker);
+ scheduler.scheduleAtFixedRate(task, initialDelayMin, checkPeriodMin,
+ TimeUnit.MINUTES);
+ LOG.info("Scheduled the in-memory scm store app check task to run every "
+ + checkPeriodMin + " minutes.");
+ }
+
+ @Override
+ protected void serviceStop() throws Exception {
+ LOG.info("Shutting down the background thread.");
+ scheduler.shutdownNow();
+ try {
+ if (!scheduler.awaitTermination(10, TimeUnit.SECONDS)) {
+ LOG.warn("Gave up waiting for the app check task to shutdown.");
+ }
+ } catch (InterruptedException e) {
+ LOG.warn("The InMemorySCMStore was interrupted while shutting down the "
+ + "app check task.", e);
+ }
+ LOG.info("The background thread stopped.");
+
+ super.serviceStop();
+ }
+
+ @VisibleForTesting
+ AppChecker createAppCheckerService(Configuration conf) {
+ return SharedCacheManager.createAppCheckerService(conf);
+ }
+
+ private void bootstrap(Configuration conf) throws IOException {
+ Map initialCachedResources =
+ getInitialCachedResources(FileSystem.get(conf), conf);
+ LOG.info("Bootstrapping from " + initialCachedResources.size()
+ + " cache resources located in the file system");
+ Iterator> it =
+ initialCachedResources.entrySet().iterator();
+ while (it.hasNext()) {
+ Map.Entry e = it.next();
+ String key = intern(e.getKey());
+ String fileName = e.getValue();
+ SharedCacheResource resource = new SharedCacheResource(fileName);
+ // we don't hold the lock for this as it is done as part of serviceInit
+ cachedResources.put(key, resource);
+ // clear out the initial resource to reduce the footprint
+ it.remove();
+ }
+ LOG.info("Bootstrapping complete");
+ }
+
+ @VisibleForTesting
+ Map getInitialCachedResources(FileSystem fs,
+ Configuration conf) throws IOException {
+ // get the root directory for the shared cache
+ String location =
+ conf.get(YarnConfiguration.SHARED_CACHE_ROOT,
+ YarnConfiguration.DEFAULT_SHARED_CACHE_ROOT);
+ Path root = new Path(location);
+ if (!fs.exists(root)) {
+ String message =
+ "The shared cache root directory " + location + " was not found";
+ LOG.error(message);
+ throw new IOException(message);
+ }
+
+ int nestedLevel = SharedCacheUtil.getCacheDepth(conf);
+ // now traverse individual directories and process them
+ // the directory structure is specified by the nested level parameter
+ // (e.g. 9/c/d//file)
+ StringBuilder pattern = new StringBuilder();
+ for (int i = 0; i < nestedLevel + 1; i++) {
+ pattern.append("*/");
+ }
+ pattern.append("*");
+
+ LOG.info("Querying for all individual cached resource files");
+ FileStatus[] entries = fs.globStatus(new Path(root, pattern.toString()));
+ int numEntries = entries == null ? 0 : entries.length;
+ LOG.info("Found " + numEntries + " files: processing for one resource per "
+ + "key");
+
+ Map initialCachedEntries = new HashMap();
+ if (entries != null) {
+ for (FileStatus entry : entries) {
+ Path file = entry.getPath();
+ String fileName = file.getName();
+ if (entry.isFile()) {
+ // get the parent to get the checksum
+ Path parent = file.getParent();
+ if (parent != null) {
+ // the name of the immediate parent directory is the checksum
+ String key = parent.getName();
+ // make sure we insert only one file per checksum whichever comes
+ // first
+ if (initialCachedEntries.containsKey(key)) {
+ LOG.warn("Key " + key + " is already mapped to file "
+ + initialCachedEntries.get(key) + "; file " + fileName
+ + " will not be added");
+ } else {
+ initialCachedEntries.put(key, fileName);
+ }
+ }
+ }
+ }
+ }
+ LOG.info("A total of " + initialCachedEntries.size()
+ + " files are now mapped");
+ return initialCachedEntries;
+ }
+
+ /**
+ * Adds the given resource to the store under the key and the filename. If the
+ * entry is already found, it returns the existing filename. It represents the
+ * state of the store at the time of this query. The entry may change or even
+ * be removed once this method returns. The caller should be prepared to
+ * handle that situation.
+ *
+ * @return the filename of the newly inserted resource or that of the existing
+ * resource
+ */
+ @Override
+ public String addResource(String key, String fileName) {
+ String interned = intern(key);
+ synchronized (interned) {
+ SharedCacheResource resource = cachedResources.get(interned);
+ if (resource == null) {
+ resource = new SharedCacheResource(fileName);
+ cachedResources.put(interned, resource);
+ }
+ return resource.getFileName();
+ }
+ }
+
+ /**
+ * Adds the provided resource reference to the cache resource under the key,
+ * and updates the access time. If it returns a non-null value, the caller may
+ * safely assume that the resource will not be removed at least until the app
+ * in this resource reference has terminated.
+ *
+ * @return the filename of the resource, or null if the resource is not found
+ */
+ @Override
+ public String addResourceReference(String key,
+ SharedCacheResourceReference ref) {
+ String interned = intern(key);
+ synchronized (interned) {
+ SharedCacheResource resource = cachedResources.get(interned);
+ if (resource == null) { // it's not mapped
+ return null;
+ }
+ resource.addReference(ref);
+ resource.updateAccessTime();
+ return resource.getFileName();
+ }
+ }
+
+ /**
+ * Returns the list of resource references currently registered under the
+ * cache entry. If the list is empty, it returns an empty collection. The
+ * returned collection is unmodifiable and a snapshot of the information at
+ * the time of the query. The state may change after this query returns. The
+ * caller should handle the situation that some or all of these resource
+ * references are no longer relevant.
+ *
+ * @return the collection that contains the resource references associated
+ * with the resource; or an empty collection if no resource references
+ * are registered under this resource
+ */
+ @Override
+ public Collection getResourceReferences(String key) {
+ String interned = intern(key);
+ synchronized (interned) {
+ SharedCacheResource resource = cachedResources.get(interned);
+ if (resource == null) {
+ return Collections.emptySet();
+ }
+ Set refs =
+ new HashSet(
+ resource.getResourceReferences());
+ return Collections.unmodifiableSet(refs);
+ }
+ }
+
+ /**
+ * Removes the provided resource reference from the resource. If the resource
+ * does not exist, nothing will be done.
+ */
+ @Override
+ public boolean removeResourceReference(String key, SharedCacheResourceReference ref,
+ boolean updateAccessTime) {
+ String interned = intern(key);
+ synchronized (interned) {
+ boolean removed = false;
+ SharedCacheResource resource = cachedResources.get(interned);
+ if (resource != null) {
+ Set resourceRefs =
+ resource.getResourceReferences();
+ removed = resourceRefs.remove(ref);
+ if (updateAccessTime) {
+ resource.updateAccessTime();
+ }
+ }
+ return removed;
+ }
+ }
+
+ /**
+ * Removes the provided collection of resource references from the resource.
+ * If the resource does not exist, nothing will be done.
+ */
+ @Override
+ public void removeResourceReferences(String key,
+ Collection refs, boolean updateAccessTime) {
+ String interned = intern(key);
+ synchronized (interned) {
+ SharedCacheResource resource = cachedResources.get(interned);
+ if (resource != null) {
+ Set resourceRefs =
+ resource.getResourceReferences();
+ resourceRefs.removeAll(refs);
+ if (updateAccessTime) {
+ resource.updateAccessTime();
+ }
+ }
+ }
+ }
+
+ /**
+ * Removes the given resource from the store. Returns true if the resource is
+ * found and removed or if the resource is not found. Returns false if it was
+ * unable to remove the resource because the resource reference list was not
+ * empty.
+ */
+ @Override
+ public boolean removeResource(String key) {
+ String interned = intern(key);
+ synchronized (interned) {
+ SharedCacheResource resource = cachedResources.get(interned);
+ if (resource == null) {
+ return true;
+ }
+
+ if (!resource.getResourceReferences().isEmpty()) {
+ return false;
+ }
+ // no users
+ cachedResources.remove(interned);
+ return true;
+ }
+ }
+
+ /**
+ * Obtains the access time for a resource. It represents the view of the
+ * resource at the time of the query. The value may have been updated at a
+ * later point.
+ *
+ * @return the access time of the resource if found; -1 if the resource is not
+ * found
+ */
+ @VisibleForTesting
+ long getAccessTime(String key) {
+ String interned = intern(key);
+ synchronized (interned) {
+ SharedCacheResource resource = cachedResources.get(interned);
+ return resource == null ? -1 : resource.getAccessTime();
+ }
+ }
+
+ @Override
+ public boolean isResourceEvictable(String key, FileStatus file) {
+ synchronized (initialAppsLock) {
+ if (initialApps.size() > 0) {
+ return false;
+ }
+ }
+
+ long staleTime =
+ System.currentTimeMillis()
+ - TimeUnit.MINUTES.toMillis(this.stalenessMinutes);
+ long accessTime = getAccessTime(key);
+ if (accessTime == -1) {
+ // check modification time
+ long modTime = file.getModificationTime();
+ // if modification time is older then the store startup time, we need to
+ // just use the store startup time as the last point of certainty
+ long lastUse = modTime < this.startTime ? this.startTime : modTime;
+ return lastUse < staleTime;
+ } else {
+ // check access time
+ return accessTime < staleTime;
+ }
+ }
+
+ private static int getStalenessPeriod(Configuration conf) {
+ int stalenessMinutes =
+ conf.getInt(YarnConfiguration.IN_MEMORY_STALENESS_PERIOD,
+ YarnConfiguration.DEFAULT_IN_MEMORY_STALENESS_PERIOD);
+ // non-positive value is invalid; use the default
+ if (stalenessMinutes <= 0) {
+ throw new HadoopIllegalArgumentException("Non-positive staleness value: "
+ + stalenessMinutes
+ + ". The staleness value must be greater than zero.");
+ }
+ return stalenessMinutes;
+ }
+
+ private static int getInitialDelay(Configuration conf) {
+ int initialMinutes =
+ conf.getInt(YarnConfiguration.IN_MEMORY_INITIAL_DELAY,
+ YarnConfiguration.DEFAULT_IN_MEMORY_INITIAL_DELAY);
+ // non-positive value is invalid; use the default
+ if (initialMinutes <= 0) {
+ throw new HadoopIllegalArgumentException(
+ "Non-positive initial delay value: " + initialMinutes
+ + ". The initial delay value must be greater than zero.");
+ }
+ return initialMinutes;
+ }
+
+ private static int getCheckPeriod(Configuration conf) {
+ int checkMinutes =
+ conf.getInt(YarnConfiguration.IN_MEMORY_CHECK_PERIOD,
+ YarnConfiguration.DEFAULT_IN_MEMORY_CHECK_PERIOD);
+ // non-positive value is invalid; use the default
+ if (checkMinutes <= 0) {
+ throw new HadoopIllegalArgumentException(
+ "Non-positive check period value: " + checkMinutes
+ + ". The check period value must be greater than zero.");
+ }
+ return checkMinutes;
+ }
+
+ @Private
+ @Evolving
+ class AppCheckTask implements Runnable {
+
+ private final AppChecker taskAppChecker;
+
+ public AppCheckTask(AppChecker appChecker) {
+ this.taskAppChecker = appChecker;
+ }
+
+ @Override
+ public void run() {
+ try {
+ LOG.info("Checking the initial app list for finished applications.");
+ synchronized (initialAppsLock) {
+ if (initialApps.isEmpty()) {
+ // we're fine, no-op; there are no active apps that were running at
+ // the time of the service start
+ } else {
+ LOG.info("Looking into " + initialApps.size()
+ + " apps to see if they are still active");
+ Iterator it = initialApps.iterator();
+ while (it.hasNext()) {
+ ApplicationId id = it.next();
+ try {
+ if (!taskAppChecker.isApplicationActive(id)) {
+ // remove it from the list
+ it.remove();
+ }
+ } catch (YarnException e) {
+ LOG.warn("Exception while checking the app status;"
+ + " will leave the entry in the list", e);
+ // continue
+ }
+ }
+ }
+ LOG.info("There are now " + initialApps.size()
+ + " entries in the list");
+ }
+ } catch (Throwable e) {
+ LOG.error(
+ "Unexpected exception thrown during in-memory store app check task."
+ + " Rescheduling task.", e);
+ }
+
+ }
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/SCMStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/SCMStore.java
new file mode 100644
index 00000000000..397d9047943
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/SCMStore.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.sharedcachemanager.store;
+
+import java.util.Collection;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.service.CompositeService;
+
+
+/**
+ * An abstract class for the data store used by the shared cache manager
+ * service. All implementations of methods in this interface need to be thread
+ * safe and atomic.
+ */
+@Private
+@Evolving
+public abstract class SCMStore extends CompositeService {
+
+ protected SCMStore(String name) {
+ super(name);
+ }
+
+ /**
+ * Add a resource to the shared cache and it's associated filename. The
+ * resource is identified by a unique key. If the key already exists no action
+ * is taken and the filename of the existing resource is returned. If the key
+ * does not exist, the resource is added, it's access time is set, and the
+ * filename of the resource is returned.
+ *
+ * @param key a unique identifier for a resource
+ * @param fileName the filename of the resource
+ * @return the filename of the resource as represented by the cache
+ */
+ @Private
+ public abstract String addResource(String key, String fileName);
+
+ /**
+ * Remove a resource from the shared cache.
+ *
+ * @param key a unique identifier for a resource
+ * @return true if the resource was removed or did not exist, false if the
+ * resource existed, contained at least one
+ * SharedCacheResourceReference
and was not removed.
+ */
+ @Private
+ public abstract boolean removeResource(String key);
+
+ /**
+ * Add a SharedCacheResourceReference
to a resource and update
+ * the resource access time.
+ *
+ * @param key a unique identifier for a resource
+ * @param ref the SharedCacheResourceReference
to add
+ * @return String the filename of the resource if the
+ * SharedCacheResourceReference
was added or already
+ * existed. null if the resource did not exist
+ */
+ @Private
+ public abstract String addResourceReference(String key,
+ SharedCacheResourceReference ref);
+
+ /**
+ * Get the SharedCacheResourceReference
(s) associated with the
+ * resource.
+ *
+ * @param key a unique identifier for a resource
+ * @return an unmodifiable collection of
+ * SharedCacheResourceReferences
. If the resource does
+ * not exist, an empty set is returned.
+ */
+ @Private
+ public abstract Collection getResourceReferences(
+ String key);
+
+ /**
+ * Remove a SharedCacheResourceReference
from a resource.
+ *
+ * @param key a unique identifier for a resource
+ * @param ref the SharedCacheResourceReference
to remove
+ * @param updateAccessTime true if the call should update the access time for
+ * the resource
+ * @return true if the reference was removed, false otherwise
+ */
+ @Private
+ public abstract boolean removeResourceReference(String key,
+ SharedCacheResourceReference ref, boolean updateAccessTime);
+
+ /**
+ * Remove a collection of SharedCacheResourceReferences
from a
+ * resource.
+ *
+ * @param key a unique identifier for a resource
+ * @param refs the collection of SharedCacheResourceReference
s to
+ * remove
+ * @param updateAccessTime true if the call should update the access time for
+ * the resource
+ */
+ @Private
+ public abstract void removeResourceReferences(String key,
+ Collection refs, boolean updateAccessTime);
+
+ /**
+ * Check if a specific resource is evictable according to the store's enabled
+ * cache eviction policies.
+ *
+ * @param key a unique identifier for a resource
+ * @param file the FileStatus
object for the resource file in the
+ * file system.
+ * @return true if the resource is evicatble, false otherwise
+ */
+ @Private
+ public abstract boolean isResourceEvictable(String key, FileStatus file);
+
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/SharedCacheResource.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/SharedCacheResource.java
new file mode 100644
index 00000000000..cb0df541b1d
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/SharedCacheResource.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.sharedcachemanager.store;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+
+/**
+ * Class that encapsulates the cache resource. The instances are not thread
+ * safe. Any operation that uses the resource must use thread-safe mechanisms to
+ * ensure safe access with the only exception of the filename.
+ */
+@Private
+@Evolving
+class SharedCacheResource {
+ private long accessTime;
+ private final Set refs;
+ private final String fileName;
+
+ SharedCacheResource(String fileName) {
+ this.accessTime = System.currentTimeMillis();
+ this.refs = new HashSet();
+ this.fileName = fileName;
+ }
+
+ long getAccessTime() {
+ return accessTime;
+ }
+
+ void updateAccessTime() {
+ accessTime = System.currentTimeMillis();
+ }
+
+ String getFileName() {
+ return this.fileName;
+ }
+
+ Set getResourceReferences() {
+ return this.refs;
+ }
+
+ boolean addReference(SharedCacheResourceReference ref) {
+ return this.refs.add(ref);
+ }
+}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/SharedCacheResourceReference.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/SharedCacheResourceReference.java
new file mode 100644
index 00000000000..d595d9708b8
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/SharedCacheResourceReference.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.sharedcachemanager.store;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+
+/**
+ * This is an object that represents a reference to a shared cache resource.
+ */
+@Private
+@Evolving
+public class SharedCacheResourceReference {
+ private final ApplicationId appId;
+ private final String shortUserName;
+
+ /**
+ * Create a resource reference.
+ *
+ * @param appId ApplicationId
that is referencing a resource.
+ * @param shortUserName ShortUserName
of the user that created
+ * the reference.
+ */
+ public SharedCacheResourceReference(ApplicationId appId, String shortUserName) {
+ this.appId = appId;
+ this.shortUserName = shortUserName;
+ }
+
+ public ApplicationId getAppId() {
+ return this.appId;
+ }
+
+ public String getShortUserName() {
+ return this.shortUserName;
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + ((appId == null) ? 0 : appId.hashCode());
+ result =
+ prime * result
+ + ((shortUserName == null) ? 0 : shortUserName.hashCode());
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+ SharedCacheResourceReference other = (SharedCacheResourceReference) obj;
+ if (appId == null) {
+ if (other.appId != null)
+ return false;
+ } else if (!appId.equals(other.appId))
+ return false;
+ if (shortUserName == null) {
+ if (other.shortUserName != null)
+ return false;
+ } else if (!shortUserName.equals(other.shortUserName))
+ return false;
+ return true;
+ }
+}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/TestInMemorySCMStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/TestInMemorySCMStore.java
new file mode 100644
index 00000000000..891703e992a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/TestInMemorySCMStore.java
@@ -0,0 +1,334 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.sharedcachemanager.store;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.isA;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.sharedcachemanager.AppChecker;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestInMemorySCMStore {
+
+ private InMemorySCMStore store;
+ private AppChecker checker;
+
+ @Before
+ public void setup() {
+ this.store = spy(new InMemorySCMStore());
+ this.checker = spy(new DummyAppChecker());
+ doReturn(checker).when(store).createAppCheckerService(
+ isA(Configuration.class));
+ }
+
+ @After
+ public void cleanup() {
+ if (this.store != null) {
+ this.store.stop();
+ }
+ }
+
+ private void startEmptyStore() throws Exception {
+ doReturn(new ArrayList()).when(checker)
+ .getActiveApplications();
+ doReturn(new HashMap()).when(store)
+ .getInitialCachedResources(isA(FileSystem.class),
+ isA(Configuration.class));
+ this.store.init(new Configuration());
+ this.store.start();
+ }
+
+ private Map startStoreWithResources() throws Exception {
+ Map initialCachedResources = new HashMap();
+ int count = 10;
+ for (int i = 0; i < count; i++) {
+ String key = String.valueOf(i);
+ String fileName = key + ".jar";
+ initialCachedResources.put(key, fileName);
+ }
+ doReturn(new ArrayList()).when(checker)
+ .getActiveApplications();
+ doReturn(initialCachedResources).when(store).getInitialCachedResources(
+ isA(FileSystem.class), isA(Configuration.class));
+ this.store.init(new Configuration());
+ this.store.start();
+ return initialCachedResources;
+ }
+
+ private void startStoreWithApps() throws Exception {
+ ArrayList list = new ArrayList();
+ int count = 5;
+ for (int i = 0; i < count; i++) {
+ list.add(createAppId(i, i));
+ }
+ doReturn(list).when(checker).getActiveApplications();
+ doReturn(new HashMap()).when(store)
+ .getInitialCachedResources(isA(FileSystem.class),
+ isA(Configuration.class));
+ this.store.init(new Configuration());
+ this.store.start();
+ }
+
+ @Test
+ public void testAddResourceConcurrency() throws Exception {
+ startEmptyStore();
+ final String key = "key1";
+ int count = 5;
+ ExecutorService exec = Executors.newFixedThreadPool(count);
+ List> futures = new ArrayList>(count);
+ final CountDownLatch start = new CountDownLatch(1);
+ for (int i = 0; i < count; i++) {
+ final String fileName = "foo-" + i + ".jar";
+ Callable task = new Callable() {
+ public String call() throws Exception {
+ start.await();
+ String result = store.addResource(key, fileName);
+ System.out.println("fileName: " + fileName + ", result: " + result);
+ return result;
+ }
+ };
+ futures.add(exec.submit(task));
+ }
+ // start them all at the same time
+ start.countDown();
+ // check the result; they should all agree with the value
+ Set results = new HashSet();
+ for (Future future: futures) {
+ results.add(future.get());
+ }
+ assertSame(1, results.size());
+ exec.shutdown();
+ }
+
+ @Test
+ public void testAddResourceRefNonExistentResource() throws Exception {
+ startEmptyStore();
+ String key = "key1";
+ ApplicationId id = createAppId(1, 1L);
+ // try adding an app id without adding the key first
+ assertNull(store.addResourceReference(key,
+ new SharedCacheResourceReference(id, "user")));
+ }
+
+ @Test
+ public void testRemoveResourceEmptyRefs() throws Exception {
+ startEmptyStore();
+ String key = "key1";
+ String fileName = "foo.jar";
+ // first add resource
+ store.addResource(key, fileName);
+ // try removing the resource; it should return true
+ assertTrue(store.removeResource(key));
+ }
+
+ @Test
+ public void testAddResourceRefRemoveResource() throws Exception {
+ startEmptyStore();
+ String key = "key1";
+ ApplicationId id = createAppId(1, 1L);
+ String user = "user";
+ // add the resource, and then add a resource ref
+ store.addResource(key, "foo.jar");
+ store.addResourceReference(key, new SharedCacheResourceReference(id, user));
+ // removeResource should return false
+ assertTrue(!store.removeResource(key));
+ // the resource and the ref should be intact
+ Collection refs = store.getResourceReferences(key);
+ assertTrue(refs != null);
+ assertEquals(Collections.singleton(new SharedCacheResourceReference(id, user)), refs);
+ }
+
+ @Test
+ public void testAddResourceRefConcurrency() throws Exception {
+ startEmptyStore();
+ final String key = "key1";
+ final String user = "user";
+ String fileName = "foo.jar";
+
+ // first add the resource
+ store.addResource(key, fileName);
+
+ // make concurrent addResourceRef calls (clients)
+ int count = 5;
+ ExecutorService exec = Executors.newFixedThreadPool(count);
+ List> futures = new ArrayList>(count);
+ final CountDownLatch start = new CountDownLatch(1);
+ for (int i = 0; i < count; i++) {
+ final ApplicationId id = createAppId(i, i);
+ Callable task = new Callable() {
+ public String call() throws Exception {
+ start.await();
+ return store.addResourceReference(key,
+ new SharedCacheResourceReference(id, user));
+ }
+ };
+ futures.add(exec.submit(task));
+ }
+ // start them all at the same time
+ start.countDown();
+ // check the result
+ Set results = new HashSet();
+ for (Future future: futures) {
+ results.add(future.get());
+ }
+ // they should all have the same file name
+ assertSame(1, results.size());
+ assertEquals(Collections.singleton(fileName), results);
+ // there should be 5 refs as a result
+ Collection refs = store.getResourceReferences(key);
+ assertSame(count, refs.size());
+ exec.shutdown();
+ }
+
+ @Test
+ public void testAddResourceRefAddResourceConcurrency() throws Exception {
+ startEmptyStore();
+ final String key = "key1";
+ final String fileName = "foo.jar";
+ final String user = "user";
+ final ApplicationId id = createAppId(1, 1L);
+ // add the resource and add the resource ref at the same time
+ ExecutorService exec = Executors.newFixedThreadPool(2);
+ final CountDownLatch start = new CountDownLatch(1);
+ Callable addKeyTask = new Callable() {
+ public String call() throws Exception {
+ start.await();
+ return store.addResource(key, fileName);
+ }
+ };
+ Callable addAppIdTask = new Callable() {
+ public String call() throws Exception {
+ start.await();
+ return store.addResourceReference(key,
+ new SharedCacheResourceReference(id, user));
+ }
+ };
+ Future addAppIdFuture = exec.submit(addAppIdTask);
+ Future addKeyFuture = exec.submit(addKeyTask);
+ // start them at the same time
+ start.countDown();
+ // get the results
+ String addKeyResult = addKeyFuture.get();
+ String addAppIdResult = addAppIdFuture.get();
+ assertEquals(fileName, addKeyResult);
+ System.out.println("addAppId() result: " + addAppIdResult);
+ // it may be null or the fileName depending on the timing
+ assertTrue(addAppIdResult == null || addAppIdResult.equals(fileName));
+ exec.shutdown();
+ }
+
+ @Test
+ public void testRemoveRef() throws Exception {
+ startEmptyStore();
+ String key = "key1";
+ String fileName = "foo.jar";
+ String user = "user";
+ // first add the resource
+ store.addResource(key, fileName);
+ // add a ref
+ ApplicationId id = createAppId(1, 1L);
+ SharedCacheResourceReference myRef = new SharedCacheResourceReference(id, user);
+ String result = store.addResourceReference(key, myRef);
+ assertEquals(fileName, result);
+ Collection refs = store.getResourceReferences(key);
+ assertSame(1, refs.size());
+ assertEquals(Collections.singleton(myRef), refs);
+ // remove the same ref
+ store.removeResourceReferences(key, Collections.singleton(myRef), true);
+ Collection newRefs = store.getResourceReferences(key);
+ assertTrue(newRefs == null || newRefs.isEmpty());
+ }
+
+ @Test
+ public void testBootstrapping() throws Exception {
+ Map initialCachedResources = startStoreWithResources();
+ int count = initialCachedResources.size();
+ ApplicationId id = createAppId(1, 1L);
+ // the entries from the cached entries should now exist
+ for (int i = 0; i < count; i++) {
+ String key = String.valueOf(i);
+ String fileName = key + ".jar";
+ String result =
+ store.addResourceReference(key, new SharedCacheResourceReference(id,
+ "user"));
+ // the value should not be null (i.e. it has the key) and the filename should match
+ assertEquals(fileName, result);
+ // the initial input should be emptied
+ assertTrue(initialCachedResources.isEmpty());
+ }
+ }
+
+ @Test
+ public void testEvictableWithInitialApps() throws Exception {
+ startStoreWithApps();
+ assertFalse(store.isResourceEvictable("key", mock(FileStatus.class)));
+ }
+
+ private ApplicationId createAppId(int id, long timestamp) {
+ return ApplicationId.newInstance(timestamp, id);
+ }
+
+ class DummyAppChecker extends AppChecker {
+
+ @Override
+ @Private
+ public boolean isApplicationActive(ApplicationId id) throws YarnException {
+ // stub
+ return false;
+ }
+
+ @Override
+ @Private
+ public Collection getActiveApplications()
+ throws YarnException {
+ // stub
+ return null;
+ }
+
+ }
+}