diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 85a88c8235a..1633cf7e721 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -36,6 +36,9 @@ Release 2.7.0 - UNRELEASED YARN-2179. [YARN-1492] Initial cache manager structure and context. (Chris Trezzo via kasha) + YARN-2180. [YARN-1492] In-memory backing store for cache manager. + (Chris Trezzo via kasha) + IMPROVEMENTS YARN-1979. TestDirectoryCollection fails when the umask is unusual. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 1a2aa1dc556..f183a905e45 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -1332,6 +1332,49 @@ public class YarnConfiguration extends Configuration { SHARED_CACHE_PREFIX + "nested-level"; public static final int DEFAULT_SHARED_CACHE_NESTED_LEVEL = 3; + // Shared Cache Manager Configs + + public static final String SCM_STORE_PREFIX = SHARED_CACHE_PREFIX + "store."; + + public static final String SCM_STORE_CLASS = SCM_STORE_PREFIX + "class"; + public static final String DEFAULT_SCM_STORE_CLASS = + "org.apache.hadoop.yarn.server.sharedcachemanager.store.InMemorySCMStore"; + + public static final String SCM_APP_CHECKER_CLASS = SHARED_CACHE_PREFIX + + "app-checker.class"; + public static final String DEFAULT_SCM_APP_CHECKER_CLASS = + "org.apache.hadoop.yarn.server.sharedcachemanager.RemoteAppChecker"; + + // In-memory SCM store configuration + + public static final String IN_MEMORY_STORE_PREFIX = + SHARED_CACHE_PREFIX + "in-memory."; + + /** + * A resource in the InMemorySCMStore is considered stale if the time since + * the last 
reference exceeds the staleness period. This value is specified in + * minutes. + */ + public static final String IN_MEMORY_STALENESS_PERIOD = + IN_MEMORY_STORE_PREFIX + "staleness-period"; + public static final int DEFAULT_IN_MEMORY_STALENESS_PERIOD = 7 * 24 * 60; + + /** + * Initial delay before the in-memory store runs its first check to remove + * dead initial applications. Specified in minutes. + */ + public static final String IN_MEMORY_INITIAL_DELAY = + IN_MEMORY_STORE_PREFIX + "initial-delay"; + public static final int DEFAULT_IN_MEMORY_INITIAL_DELAY = 10; + + /** + * The frequency at which the in-memory store checks to remove dead initial + * applications. Specified in minutes. + */ + public static final String IN_MEMORY_CHECK_PERIOD = + IN_MEMORY_STORE_PREFIX + "check-period"; + public static final int DEFAULT_IN_MEMORY_CHECK_PERIOD = 12 * 60; + //////////////////////////////// // Other Configs //////////////////////////////// diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index 1db7939ba24..5d531917996 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -1342,6 +1342,40 @@ 3 + + The implementation to be used for the SCM store + yarn.sharedcache.store.class + org.apache.hadoop.yarn.server.sharedcachemanager.store.InMemorySCMStore + + + + The implementation to be used for the SCM app-checker + yarn.sharedcache.app-checker.class + org.apache.hadoop.yarn.server.sharedcachemanager.RemoteAppChecker + + + + A resource in the in-memory store is considered stale + if the time since the last reference exceeds the staleness period. + This value is specified in minutes. 
+ yarn.sharedcache.store.in-memory.staleness-period + 10080 + + + + Initial delay before the in-memory store runs its first check + to remove dead initial applications. Specified in minutes. + yarn.sharedcache.store.in-memory.initial-delay + 10 + + + + The frequency at which the in-memory store checks to remove + dead initial applications. Specified in minutes. + yarn.sharedcache.store.in-memory.check-period + 720 + + The interval that the yarn client library uses to poll the diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/sharedcache/SharedCacheStructureUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/sharedcache/SharedCacheUtil.java similarity index 95% rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/sharedcache/SharedCacheStructureUtil.java rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/sharedcache/SharedCacheUtil.java index 1bac75b74e2..4b933ac317e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/sharedcache/SharedCacheStructureUtil.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/sharedcache/SharedCacheUtil.java @@ -32,9 +32,9 @@ */ @Private @Unstable -public class SharedCacheStructureUtil { +public class SharedCacheUtil { - private static final Log LOG = LogFactory.getLog(SharedCacheStructureUtil.class); + private static final Log LOG = LogFactory.getLog(SharedCacheUtil.class); @Private public static int getCacheDepth(Configuration conf) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/pom.xml 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/pom.xml index 869298bfb4a..213d4365728 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/pom.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/pom.xml @@ -42,6 +42,10 @@ org.apache.hadoop hadoop-yarn-common + + org.apache.hadoop + hadoop-yarn-server-common + org.apache.hadoop hadoop-yarn-client diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/SharedCacheManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/SharedCacheManager.java index 866c094e695..2f3ddb1db32 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/SharedCacheManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/SharedCacheManager.java @@ -26,10 +26,15 @@ import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.metrics2.source.JvmMetrics; import org.apache.hadoop.service.CompositeService; +import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.ShutdownHookManager; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.server.sharedcachemanager.store.SCMStore; + +import com.google.common.annotations.VisibleForTesting; /** * This service maintains the shared cache meta data. 
It handles claiming and @@ -47,12 +52,18 @@ public class SharedCacheManager extends CompositeService { private static final Log LOG = LogFactory.getLog(SharedCacheManager.class); + private SCMStore store; + public SharedCacheManager() { super("SharedCacheManager"); } @Override protected void serviceInit(Configuration conf) throws Exception { + + this.store = createSCMStoreService(conf); + addService(store); + // init metrics DefaultMetricsSystem.initialize("SharedCacheManager"); JvmMetrics.initSingleton("SharedCacheManager", null); @@ -60,6 +71,25 @@ protected void serviceInit(Configuration conf) throws Exception { super.serviceInit(conf); } + @SuppressWarnings("unchecked") + private static SCMStore createSCMStoreService(Configuration conf) { + Class defaultStoreClass; + try { + defaultStoreClass = + (Class) Class + .forName(YarnConfiguration.DEFAULT_SCM_STORE_CLASS); + } catch (Exception e) { + throw new YarnRuntimeException("Invalid default scm store class" + + YarnConfiguration.DEFAULT_SCM_STORE_CLASS, e); + } + + SCMStore store = + ReflectionUtils.newInstance(conf.getClass( + YarnConfiguration.SCM_STORE_CLASS, + defaultStoreClass, SCMStore.class), conf); + return store; + } + @Override protected void serviceStop() throws Exception { @@ -67,6 +97,14 @@ protected void serviceStop() throws Exception { super.serviceStop(); } + /** + * For testing purposes only. 
+ */ + @VisibleForTesting + SCMStore getSCMStore() { + return this.store; + } + public static void main(String[] args) { Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler()); StringUtils.startupShutdownMessage(SharedCacheManager.class, args, LOG); @@ -83,4 +121,24 @@ public static void main(String[] args) { System.exit(-1); } } + + @Private + @SuppressWarnings("unchecked") + public static AppChecker createAppCheckerService(Configuration conf) { + Class defaultCheckerClass; + try { + defaultCheckerClass = + (Class) Class + .forName(YarnConfiguration.DEFAULT_SCM_APP_CHECKER_CLASS); + } catch (Exception e) { + throw new YarnRuntimeException("Invalid default scm app checker class" + + YarnConfiguration.DEFAULT_SCM_APP_CHECKER_CLASS, e); + } + + AppChecker checker = + ReflectionUtils.newInstance(conf.getClass( + YarnConfiguration.SCM_APP_CHECKER_CLASS, defaultCheckerClass, + AppChecker.class), conf); + return checker; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/InMemorySCMStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/InMemorySCMStore.java new file mode 100644 index 00000000000..79369d8e281 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/InMemorySCMStore.java @@ -0,0 +1,514 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.sharedcachemanager.store; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.HadoopIllegalArgumentException; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Evolving; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.util.StringInterner; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.sharedcache.SharedCacheUtil; +import org.apache.hadoop.yarn.server.sharedcachemanager.AppChecker; +import org.apache.hadoop.yarn.server.sharedcachemanager.SharedCacheManager; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.util.concurrent.ThreadFactoryBuilder; + +/** + * A thread safe 
version of an in-memory SCM store. The thread safety is + * implemented with two key pieces: (1) at the mapping level a ConcurrentHashMap + * is used to allow concurrency to resources and their associated references, + * and (2) a key level lock is used to ensure mutual exclusion between any + * operation that accesses a resource with the same key.
+ *
+ * To ensure safe key-level locking, we use the original string key and intern + * it weakly using hadoop's StringInterner. This avoids the pitfalls + * of using built-in String interning. The interned strings are only weakly + * referenced, so they can be garbage collected once no longer in use. There is + * also little risk of the keys being visible to other parts of the code, where + * they could accidentally be used as locks.
+ *
+ * Resources in the in-memory store are evicted based on a time staleness + * criteria. If a resource is not referenced (i.e. used) for a given period, it + * is designated as a stale resource and is considered evictable. + */ +@Private +@Evolving +public class InMemorySCMStore extends SCMStore { + private static final Log LOG = LogFactory.getLog(InMemorySCMStore.class); + + private final Map cachedResources = + new ConcurrentHashMap(); + private Collection initialApps = + new ArrayList(); + private final Object initialAppsLock = new Object(); + private long startTime; + private int stalenessMinutes; + private AppChecker appChecker; + private ScheduledExecutorService scheduler; + private int initialDelayMin; + private int checkPeriodMin; + + public InMemorySCMStore() { + super(InMemorySCMStore.class.getName()); + } + + private String intern(String key) { + return StringInterner.weakIntern(key); + } + + /** + * The in-memory store bootstraps itself from the shared cache entries that + * exist in HDFS. 
+ */ + @Override + protected void serviceInit(Configuration conf) throws Exception { + + this.startTime = System.currentTimeMillis(); + this.initialDelayMin = getInitialDelay(conf); + this.checkPeriodMin = getCheckPeriod(conf); + this.stalenessMinutes = getStalenessPeriod(conf); + + appChecker = createAppCheckerService(conf); + addService(appChecker); + + bootstrap(conf); + + ThreadFactory tf = + new ThreadFactoryBuilder().setNameFormat("InMemorySCMStore") + .build(); + scheduler = Executors.newSingleThreadScheduledExecutor(tf); + + super.serviceInit(conf); + } + + @Override + protected void serviceStart() throws Exception { + // start composed services first + super.serviceStart(); + + // Get initial list of running applications + LOG.info("Getting the active app list to initialize the in-memory scm store"); + synchronized (initialAppsLock) { + initialApps = appChecker.getActiveApplications(); + } + LOG.info(initialApps.size() + " apps recorded as active at this time"); + + Runnable task = new AppCheckTask(appChecker); + scheduler.scheduleAtFixedRate(task, initialDelayMin, checkPeriodMin, + TimeUnit.MINUTES); + LOG.info("Scheduled the in-memory scm store app check task to run every " + + checkPeriodMin + " minutes."); + } + + @Override + protected void serviceStop() throws Exception { + LOG.info("Shutting down the background thread."); + scheduler.shutdownNow(); + try { + if (!scheduler.awaitTermination(10, TimeUnit.SECONDS)) { + LOG.warn("Gave up waiting for the app check task to shutdown."); + } + } catch (InterruptedException e) { + LOG.warn("The InMemorySCMStore was interrupted while shutting down the " + + "app check task.", e); + } + LOG.info("The background thread stopped."); + + super.serviceStop(); + } + + @VisibleForTesting + AppChecker createAppCheckerService(Configuration conf) { + return SharedCacheManager.createAppCheckerService(conf); + } + + private void bootstrap(Configuration conf) throws IOException { + Map initialCachedResources = + 
getInitialCachedResources(FileSystem.get(conf), conf); + LOG.info("Bootstrapping from " + initialCachedResources.size() + + " cache resources located in the file system"); + Iterator> it = + initialCachedResources.entrySet().iterator(); + while (it.hasNext()) { + Map.Entry e = it.next(); + String key = intern(e.getKey()); + String fileName = e.getValue(); + SharedCacheResource resource = new SharedCacheResource(fileName); + // we don't hold the lock for this as it is done as part of serviceInit + cachedResources.put(key, resource); + // clear out the initial resource to reduce the footprint + it.remove(); + } + LOG.info("Bootstrapping complete"); + } + + @VisibleForTesting + Map getInitialCachedResources(FileSystem fs, + Configuration conf) throws IOException { + // get the root directory for the shared cache + String location = + conf.get(YarnConfiguration.SHARED_CACHE_ROOT, + YarnConfiguration.DEFAULT_SHARED_CACHE_ROOT); + Path root = new Path(location); + if (!fs.exists(root)) { + String message = + "The shared cache root directory " + location + " was not found"; + LOG.error(message); + throw new IOException(message); + } + + int nestedLevel = SharedCacheUtil.getCacheDepth(conf); + // now traverse individual directories and process them + // the directory structure is specified by the nested level parameter + // (e.g. 9/c/d//file) + StringBuilder pattern = new StringBuilder(); + for (int i = 0; i < nestedLevel + 1; i++) { + pattern.append("*/"); + } + pattern.append("*"); + + LOG.info("Querying for all individual cached resource files"); + FileStatus[] entries = fs.globStatus(new Path(root, pattern.toString())); + int numEntries = entries == null ? 
0 : entries.length; + LOG.info("Found " + numEntries + " files: processing for one resource per " + + "key"); + + Map initialCachedEntries = new HashMap(); + if (entries != null) { + for (FileStatus entry : entries) { + Path file = entry.getPath(); + String fileName = file.getName(); + if (entry.isFile()) { + // get the parent to get the checksum + Path parent = file.getParent(); + if (parent != null) { + // the name of the immediate parent directory is the checksum + String key = parent.getName(); + // make sure we insert only one file per checksum whichever comes + // first + if (initialCachedEntries.containsKey(key)) { + LOG.warn("Key " + key + " is already mapped to file " + + initialCachedEntries.get(key) + "; file " + fileName + + " will not be added"); + } else { + initialCachedEntries.put(key, fileName); + } + } + } + } + } + LOG.info("A total of " + initialCachedEntries.size() + + " files are now mapped"); + return initialCachedEntries; + } + + /** + * Adds the given resource to the store under the key and the filename. If the + * entry is already found, it returns the existing filename. It represents the + * state of the store at the time of this query. The entry may change or even + * be removed once this method returns. The caller should be prepared to + * handle that situation. + * + * @return the filename of the newly inserted resource or that of the existing + * resource + */ + @Override + public String addResource(String key, String fileName) { + String interned = intern(key); + synchronized (interned) { + SharedCacheResource resource = cachedResources.get(interned); + if (resource == null) { + resource = new SharedCacheResource(fileName); + cachedResources.put(interned, resource); + } + return resource.getFileName(); + } + } + + /** + * Adds the provided resource reference to the cache resource under the key, + * and updates the access time. 
If it returns a non-null value, the caller may + * safely assume that the resource will not be removed at least until the app + * in this resource reference has terminated. + * + * @return the filename of the resource, or null if the resource is not found + */ + @Override + public String addResourceReference(String key, + SharedCacheResourceReference ref) { + String interned = intern(key); + synchronized (interned) { + SharedCacheResource resource = cachedResources.get(interned); + if (resource == null) { // it's not mapped + return null; + } + resource.addReference(ref); + resource.updateAccessTime(); + return resource.getFileName(); + } + } + + /** + * Returns the list of resource references currently registered under the + * cache entry. If the list is empty, it returns an empty collection. The + * returned collection is unmodifiable and a snapshot of the information at + * the time of the query. The state may change after this query returns. The + * caller should handle the situation that some or all of these resource + * references are no longer relevant. + * + * @return the collection that contains the resource references associated + * with the resource; or an empty collection if no resource references + * are registered under this resource + */ + @Override + public Collection getResourceReferences(String key) { + String interned = intern(key); + synchronized (interned) { + SharedCacheResource resource = cachedResources.get(interned); + if (resource == null) { + return Collections.emptySet(); + } + Set refs = + new HashSet( + resource.getResourceReferences()); + return Collections.unmodifiableSet(refs); + } + } + + /** + * Removes the provided resource reference from the resource. If the resource + * does not exist, nothing will be done. 
+ */ + @Override + public boolean removeResourceReference(String key, SharedCacheResourceReference ref, + boolean updateAccessTime) { + String interned = intern(key); + synchronized (interned) { + boolean removed = false; + SharedCacheResource resource = cachedResources.get(interned); + if (resource != null) { + Set resourceRefs = + resource.getResourceReferences(); + removed = resourceRefs.remove(ref); + if (updateAccessTime) { + resource.updateAccessTime(); + } + } + return removed; + } + } + + /** + * Removes the provided collection of resource references from the resource. + * If the resource does not exist, nothing will be done. + */ + @Override + public void removeResourceReferences(String key, + Collection refs, boolean updateAccessTime) { + String interned = intern(key); + synchronized (interned) { + SharedCacheResource resource = cachedResources.get(interned); + if (resource != null) { + Set resourceRefs = + resource.getResourceReferences(); + resourceRefs.removeAll(refs); + if (updateAccessTime) { + resource.updateAccessTime(); + } + } + } + } + + /** + * Removes the given resource from the store. Returns true if the resource is + * found and removed or if the resource is not found. Returns false if it was + * unable to remove the resource because the resource reference list was not + * empty. + */ + @Override + public boolean removeResource(String key) { + String interned = intern(key); + synchronized (interned) { + SharedCacheResource resource = cachedResources.get(interned); + if (resource == null) { + return true; + } + + if (!resource.getResourceReferences().isEmpty()) { + return false; + } + // no users + cachedResources.remove(interned); + return true; + } + } + + /** + * Obtains the access time for a resource. It represents the view of the + * resource at the time of the query. The value may have been updated at a + * later point. 
+ * + * @return the access time of the resource if found; -1 if the resource is not + * found + */ + @VisibleForTesting + long getAccessTime(String key) { + String interned = intern(key); + synchronized (interned) { + SharedCacheResource resource = cachedResources.get(interned); + return resource == null ? -1 : resource.getAccessTime(); + } + } + + @Override + public boolean isResourceEvictable(String key, FileStatus file) { + synchronized (initialAppsLock) { + if (initialApps.size() > 0) { + return false; + } + } + + long staleTime = + System.currentTimeMillis() + - TimeUnit.MINUTES.toMillis(this.stalenessMinutes); + long accessTime = getAccessTime(key); + if (accessTime == -1) { + // check modification time + long modTime = file.getModificationTime(); + // if modification time is older then the store startup time, we need to + // just use the store startup time as the last point of certainty + long lastUse = modTime < this.startTime ? this.startTime : modTime; + return lastUse < staleTime; + } else { + // check access time + return accessTime < staleTime; + } + } + + private static int getStalenessPeriod(Configuration conf) { + int stalenessMinutes = + conf.getInt(YarnConfiguration.IN_MEMORY_STALENESS_PERIOD, + YarnConfiguration.DEFAULT_IN_MEMORY_STALENESS_PERIOD); + // non-positive value is invalid; use the default + if (stalenessMinutes <= 0) { + throw new HadoopIllegalArgumentException("Non-positive staleness value: " + + stalenessMinutes + + ". The staleness value must be greater than zero."); + } + return stalenessMinutes; + } + + private static int getInitialDelay(Configuration conf) { + int initialMinutes = + conf.getInt(YarnConfiguration.IN_MEMORY_INITIAL_DELAY, + YarnConfiguration.DEFAULT_IN_MEMORY_INITIAL_DELAY); + // non-positive value is invalid; use the default + if (initialMinutes <= 0) { + throw new HadoopIllegalArgumentException( + "Non-positive initial delay value: " + initialMinutes + + ". 
The initial delay value must be greater than zero."); + } + return initialMinutes; + } + + private static int getCheckPeriod(Configuration conf) { + int checkMinutes = + conf.getInt(YarnConfiguration.IN_MEMORY_CHECK_PERIOD, + YarnConfiguration.DEFAULT_IN_MEMORY_CHECK_PERIOD); + // non-positive value is invalid; use the default + if (checkMinutes <= 0) { + throw new HadoopIllegalArgumentException( + "Non-positive check period value: " + checkMinutes + + ". The check period value must be greater than zero."); + } + return checkMinutes; + } + + @Private + @Evolving + class AppCheckTask implements Runnable { + + private final AppChecker taskAppChecker; + + public AppCheckTask(AppChecker appChecker) { + this.taskAppChecker = appChecker; + } + + @Override + public void run() { + try { + LOG.info("Checking the initial app list for finished applications."); + synchronized (initialAppsLock) { + if (initialApps.isEmpty()) { + // we're fine, no-op; there are no active apps that were running at + // the time of the service start + } else { + LOG.info("Looking into " + initialApps.size() + + " apps to see if they are still active"); + Iterator it = initialApps.iterator(); + while (it.hasNext()) { + ApplicationId id = it.next(); + try { + if (!taskAppChecker.isApplicationActive(id)) { + // remove it from the list + it.remove(); + } + } catch (YarnException e) { + LOG.warn("Exception while checking the app status;" + + " will leave the entry in the list", e); + // continue + } + } + } + LOG.info("There are now " + initialApps.size() + + " entries in the list"); + } + } catch (Throwable e) { + LOG.error( + "Unexpected exception thrown during in-memory store app check task." 
+ + " Rescheduling task.", e); + } + + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/SCMStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/SCMStore.java new file mode 100644 index 00000000000..397d9047943 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/SCMStore.java @@ -0,0 +1,133 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.sharedcachemanager.store; + +import java.util.Collection; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Evolving; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.service.CompositeService; + + +/** + * An abstract class for the data store used by the shared cache manager + * service. 
All implementations of methods in this interface need to be thread + * safe and atomic. + */ +@Private +@Evolving +public abstract class SCMStore extends CompositeService { + + protected SCMStore(String name) { + super(name); + } + + /** + * Add a resource to the shared cache and its associated filename. The + * resource is identified by a unique key. If the key already exists no action + * is taken and the filename of the existing resource is returned. If the key + * does not exist, the resource is added, its access time is set, and the + * filename of the resource is returned. + * + * @param key a unique identifier for a resource + * @param fileName the filename of the resource + * @return the filename of the resource as represented by the cache + */ + @Private + public abstract String addResource(String key, String fileName); + + /** + * Remove a resource from the shared cache. + * + * @param key a unique identifier for a resource + * @return true if the resource was removed or did not exist, false if the + * resource existed, contained at least one + * SharedCacheResourceReference and was not removed. + */ + @Private + public abstract boolean removeResource(String key); + + /** + * Add a SharedCacheResourceReference to a resource and update + * the resource access time. + * + * @param key a unique identifier for a resource + * @param ref the SharedCacheResourceReference to add + * @return String the filename of the resource if the + * SharedCacheResourceReference was added or already + * existed. null if the resource did not exist + */ + @Private + public abstract String addResourceReference(String key, + SharedCacheResourceReference ref); + + /** + * Get the SharedCacheResourceReference(s) associated with the + * resource. + * + * @param key a unique identifier for a resource + * @return an unmodifiable collection of + * SharedCacheResourceReferences. If the resource does + * not exist, an empty set is returned.
+ */ + @Private + public abstract Collection getResourceReferences( + String key); + + /** + * Remove a SharedCacheResourceReference from a resource. + * + * @param key a unique identifier for a resource + * @param ref the SharedCacheResourceReference to remove + * @param updateAccessTime true if the call should update the access time for + * the resource + * @return true if the reference was removed, false otherwise + */ + @Private + public abstract boolean removeResourceReference(String key, + SharedCacheResourceReference ref, boolean updateAccessTime); + + /** + * Remove a collection of SharedCacheResourceReferences from a + * resource. + * + * @param key a unique identifier for a resource + * @param refs the collection of SharedCacheResourceReferences to + * remove + * @param updateAccessTime true if the call should update the access time for + * the resource + */ + @Private + public abstract void removeResourceReferences(String key, + Collection refs, boolean updateAccessTime); + + /** + * Check if a specific resource is evictable according to the store's enabled + * cache eviction policies. + * + * @param key a unique identifier for a resource + * @param file the FileStatus object for the resource file in the + * file system. 
+ * @return true if the resource is evicatble, false otherwise + */ + @Private + public abstract boolean isResourceEvictable(String key, FileStatus file); + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/SharedCacheResource.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/SharedCacheResource.java new file mode 100644 index 00000000000..cb0df541b1d --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/SharedCacheResource.java @@ -0,0 +1,64 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.sharedcachemanager.store; + +import java.util.HashSet; +import java.util.Set; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Evolving; + +/** + * Class that encapsulates the cache resource. The instances are not thread + * safe. 
Any operation that uses the resource must use thread-safe mechanisms to + * ensure safe access with the only exception of the filename. + */ +@Private +@Evolving +class SharedCacheResource { + /** Time of the last access, as reported by System.currentTimeMillis(). */ + private long accessTime; + /** References currently held on this resource. */ + private final Set<SharedCacheResourceReference> refs; + /** Filename of the resource; assigned once in the constructor. */ + private final String fileName; + + /** + * Create a resource that has no references and whose access time is the + * current time. + * + * @param fileName the filename of the resource + */ + SharedCacheResource(String fileName) { + this.accessTime = System.currentTimeMillis(); + this.refs = new HashSet<SharedCacheResourceReference>(); + this.fileName = fileName; + } + + /** + * @return the last recorded access time, in milliseconds + */ + long getAccessTime() { + return accessTime; + } + + /** + * Set the access time to the current time. + */ + void updateAccessTime() { + accessTime = System.currentTimeMillis(); + } + + /** + * @return the filename given at construction + */ + String getFileName() { + return this.fileName; + } + + /** + * @return the set of references backing this resource; this is the internal + * set itself, not a copy + */ + Set<SharedCacheResourceReference> getResourceReferences() { + return this.refs; + } + + /** + * Add a reference to this resource. + * + * @param ref the reference to add + * @return true if the reference was not already present + */ + boolean addReference(SharedCacheResourceReference ref) { + return this.refs.add(ref); + } +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/SharedCacheResourceReference.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/SharedCacheResourceReference.java new file mode 100644 index 00000000000..d595d9708b8 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/SharedCacheResourceReference.java @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.sharedcachemanager.store; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Evolving; +import org.apache.hadoop.yarn.api.records.ApplicationId; + +/** + * A reference to a shared cache resource, made up of the referencing + * application's id and the short name of the user that created the reference. + */ +@Private +@Evolving +public class SharedCacheResourceReference { + private final ApplicationId appId; + private final String shortUserName; + + /** + * Build a reference for the given application and user. + * + * @param appId ApplicationId of the application referencing the resource. + * @param shortUserName short name of the user that created the reference. + */ + public SharedCacheResourceReference(ApplicationId appId, String shortUserName) { + this.shortUserName = shortUserName; + this.appId = appId; + } + + public String getShortUserName() { + return shortUserName; + } + + public ApplicationId getAppId() { + return appId; + } + + /** + * Combine the hash codes of appId and shortUserName (0 for a null field) + * using the multiplier 31. + */ + @Override + public int hashCode() { + int appIdHash = (appId == null) ? 0 : appId.hashCode(); + int userHash = (shortUserName == null) ? 0 : shortUserName.hashCode();
+ return 31 * (31 + appIdHash) + userHash; + } + + /** + * Two references are equal when they are of the same class and both appId + * and shortUserName are equal; null fields only match null fields. + */ + @Override + public boolean equals(Object obj) { + if (obj == this) { + return true; + } + if (obj == null || obj.getClass() != getClass()) { + return false; + } + SharedCacheResourceReference that = (SharedCacheResourceReference) obj; + boolean sameApp = + (appId == null) ? (that.appId == null) : appId.equals(that.appId); + if (!sameApp) { + return false; + } + return (shortUserName == null) + ? (that.shortUserName == null) + : shortUserName.equals(that.shortUserName); + } +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/TestInMemorySCMStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/TestInMemorySCMStore.java new file mode 100644 index 00000000000..891703e992a --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/TestInMemorySCMStore.java @@ -0,0 +1,334 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.sharedcachemanager.store; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.isA; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.sharedcachemanager.AppChecker; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +/** + * Unit tests for InMemorySCMStore, driven through a Mockito spy of the store + * and a spied stub AppChecker. + */ +public class TestInMemorySCMStore { + + /** Number of resources startStoreWithResources() seeds the store with. */ + private static final int INITIAL_RESOURCE_COUNT = 10; + + private InMemorySCMStore store; + private AppChecker checker; + + @Before + public void setup() { + this.store = spy(new InMemorySCMStore()); + this.checker = spy(new DummyAppChecker()); + doReturn(checker).when(store).createAppCheckerService( + isA(Configuration.class)); + } + + @After + public void cleanup() { + if (this.store != null) { + this.store.stop(); + } + } + + private void startEmptyStore() throws Exception { + doReturn(new ArrayList<ApplicationId>()).when(checker) + .getActiveApplications();
+ doReturn(new HashMap<String, String>()).when(store) + .getInitialCachedResources(isA(FileSystem.class), + isA(Configuration.class)); + this.store.init(new Configuration()); + this.store.start(); + } + + private Map<String, String> startStoreWithResources() throws Exception { + Map<String, String> initialCachedResources = new HashMap<String, String>(); + for (int i = 0; i < INITIAL_RESOURCE_COUNT; i++) { + String key = String.valueOf(i); + String fileName = key + ".jar"; + initialCachedResources.put(key, fileName); + } + doReturn(new ArrayList<ApplicationId>()).when(checker) + .getActiveApplications(); + doReturn(initialCachedResources).when(store).getInitialCachedResources( + isA(FileSystem.class), isA(Configuration.class)); + this.store.init(new Configuration()); + this.store.start(); + return initialCachedResources; + } + + private void startStoreWithApps() throws Exception { + ArrayList<ApplicationId> list = new ArrayList<ApplicationId>(); + int count = 5; + for (int i = 0; i < count; i++) { + list.add(createAppId(i, i)); + } + doReturn(list).when(checker).getActiveApplications(); + doReturn(new HashMap<String, String>()).when(store) + .getInitialCachedResources(isA(FileSystem.class), + isA(Configuration.class)); + this.store.init(new Configuration()); + this.store.start(); + } + + @Test + public void testAddResourceConcurrency() throws Exception { + startEmptyStore(); + final String key = "key1"; + int count = 5; + ExecutorService exec = Executors.newFixedThreadPool(count); + List<Future<String>> futures = new ArrayList<Future<String>>(count); + final CountDownLatch start = new CountDownLatch(1); + for (int i = 0; i < count; i++) { + final String fileName = "foo-" + i + ".jar"; + Callable<String> task = new Callable<String>() { + public String call() throws Exception { + start.await(); + String result = store.addResource(key, fileName); + System.out.println("fileName: " + fileName + ", result: " + result); + return result; + } + }; + futures.add(exec.submit(task)); + } + // start them all at the same time + start.countDown(); + // check the result; they should all agree with the value + Set<String> results = new HashSet<String>(); + for
(Future<String> future: futures) { + results.add(future.get()); + } + // compare by value: assertSame on ints would compare boxed Integer identity + assertEquals(1, results.size()); + exec.shutdown(); + } + + @Test + public void testAddResourceRefNonExistentResource() throws Exception { + startEmptyStore(); + String key = "key1"; + ApplicationId id = createAppId(1, 1L); + // try adding an app id without adding the key first + assertNull(store.addResourceReference(key, + new SharedCacheResourceReference(id, "user"))); + } + + @Test + public void testRemoveResourceEmptyRefs() throws Exception { + startEmptyStore(); + String key = "key1"; + String fileName = "foo.jar"; + // first add resource + store.addResource(key, fileName); + // try removing the resource; it should return true + assertTrue(store.removeResource(key)); + } + + @Test + public void testAddResourceRefRemoveResource() throws Exception { + startEmptyStore(); + String key = "key1"; + ApplicationId id = createAppId(1, 1L); + String user = "user"; + // add the resource, and then add a resource ref + store.addResource(key, "foo.jar"); + store.addResourceReference(key, new SharedCacheResourceReference(id, user)); + // removeResource should return false + assertFalse(store.removeResource(key)); + // the resource and the ref should be intact + Collection<SharedCacheResourceReference> refs = store.getResourceReferences(key); + assertTrue(refs != null); + assertEquals(Collections.singleton(new SharedCacheResourceReference(id, user)), refs); + } + + @Test + public void testAddResourceRefConcurrency() throws Exception { + startEmptyStore(); + final String key = "key1"; + final String user = "user"; + String fileName = "foo.jar"; + + // first add the resource + store.addResource(key, fileName); + + // make concurrent addResourceRef calls (clients) + int count = 5; + ExecutorService exec = Executors.newFixedThreadPool(count); + List<Future<String>> futures = new ArrayList<Future<String>>(count); + final CountDownLatch start = new CountDownLatch(1); + for (int i = 0; i < count; i++) { + final ApplicationId id = createAppId(i, i); + Callable<String> task = new Callable<String>() {
+ public String call() throws Exception { + start.await(); + return store.addResourceReference(key, + new SharedCacheResourceReference(id, user)); + } + }; + futures.add(exec.submit(task)); + } + // start them all at the same time + start.countDown(); + // check the result + Set<String> results = new HashSet<String>(); + for (Future<String> future: futures) { + results.add(future.get()); + } + // they should all have the same file name + assertEquals(1, results.size()); + assertEquals(Collections.singleton(fileName), results); + // there should be 5 refs as a result + Collection<SharedCacheResourceReference> refs = store.getResourceReferences(key); + assertEquals(count, refs.size()); + exec.shutdown(); + } + + @Test + public void testAddResourceRefAddResourceConcurrency() throws Exception { + startEmptyStore(); + final String key = "key1"; + final String fileName = "foo.jar"; + final String user = "user"; + final ApplicationId id = createAppId(1, 1L); + // add the resource and add the resource ref at the same time + ExecutorService exec = Executors.newFixedThreadPool(2); + final CountDownLatch start = new CountDownLatch(1); + Callable<String> addKeyTask = new Callable<String>() { + public String call() throws Exception { + start.await(); + return store.addResource(key, fileName); + } + }; + Callable<String> addAppIdTask = new Callable<String>() { + public String call() throws Exception { + start.await(); + return store.addResourceReference(key, + new SharedCacheResourceReference(id, user)); + } + }; + Future<String> addAppIdFuture = exec.submit(addAppIdTask); + Future<String> addKeyFuture = exec.submit(addKeyTask); + // start them at the same time + start.countDown(); + // get the results + String addKeyResult = addKeyFuture.get(); + String addAppIdResult = addAppIdFuture.get(); + assertEquals(fileName, addKeyResult); + System.out.println("addAppId() result: " + addAppIdResult); + // it may be null or the fileName depending on the timing + assertTrue(addAppIdResult == null || addAppIdResult.equals(fileName)); + exec.shutdown(); + } + + @Test + public void
testRemoveRef() throws Exception { + startEmptyStore(); + String key = "key1"; + String fileName = "foo.jar"; + String user = "user"; + // first add the resource + store.addResource(key, fileName); + // add a ref + ApplicationId id = createAppId(1, 1L); + SharedCacheResourceReference myRef = new SharedCacheResourceReference(id, user); + String result = store.addResourceReference(key, myRef); + assertEquals(fileName, result); + Collection<SharedCacheResourceReference> refs = store.getResourceReferences(key); + assertEquals(1, refs.size()); + assertEquals(Collections.singleton(myRef), refs); + // remove the same ref + store.removeResourceReferences(key, Collections.singleton(myRef), true); + Collection<SharedCacheResourceReference> newRefs = store.getResourceReferences(key); + assertTrue(newRefs == null || newRefs.isEmpty()); + } + + @Test + public void testBootstrapping() throws Exception { + Map<String, String> initialCachedResources = startStoreWithResources(); + // the initial input should be emptied once the store has started; because of + // that its size cannot serve as the number of bootstrapped entries, so the + // loop below uses the known seed count instead + assertTrue(initialCachedResources.isEmpty()); + ApplicationId id = createAppId(1, 1L); + // the entries from the cached entries should now exist + for (int i = 0; i < INITIAL_RESOURCE_COUNT; i++) { + String key = String.valueOf(i); + String fileName = key + ".jar"; + String result = + store.addResourceReference(key, new SharedCacheResourceReference(id, + "user"));
+ // the value should not be null (i.e. it has the key) and the filename should match + assertEquals(fileName, result); + } + } + + @Test + public void testEvictableWithInitialApps() throws Exception { + startStoreWithApps(); + assertFalse(store.isResourceEvictable("key", mock(FileStatus.class))); + } + + private ApplicationId createAppId(int id, long timestamp) { + return ApplicationId.newInstance(timestamp, id); + } + + class DummyAppChecker extends AppChecker { + + @Override + @Private + public boolean isApplicationActive(ApplicationId id) throws YarnException { + // stub + return false; + } + + @Override + @Private + public Collection<ApplicationId> getActiveApplications() + throws YarnException { + // stub + return null; + } + + } +}