YARN-2183. [YARN-1492] Cleaner service for cache manager. (Chris Trezzo and Sangjin Lee via kasha)
(cherry picked from commit c51e53d7aa)

parent 9bd149978d
commit acc6f622e0
@@ -12,6 +12,9 @@ Release 2.7.0 - UNRELEASED
    YARN-2180. [YARN-1492] In-memory backing store for cache manager.
    (Chris Trezzo via kasha)

    YARN-2183. [YARN-1492] Cleaner service for cache manager.
    (Chris Trezzo and Sangjin Lee via kasha)

  IMPROVEMENTS

    YARN-1979. TestDirectoryCollection fails when the umask is unusual.
@@ -1393,25 +1393,54 @@ public class YarnConfiguration extends Configuration {
   * the last reference exceeds the staleness period. This value is specified in
   * minutes.
   */
  public static final String IN_MEMORY_STALENESS_PERIOD =
      IN_MEMORY_STORE_PREFIX + "staleness-period";
  public static final int DEFAULT_IN_MEMORY_STALENESS_PERIOD = 7 * 24 * 60;
  public static final String IN_MEMORY_STALENESS_PERIOD_MINS =
      IN_MEMORY_STORE_PREFIX + "staleness-period-mins";
  public static final int DEFAULT_IN_MEMORY_STALENESS_PERIOD_MINS =
      7 * 24 * 60;

  /**
   * Initial delay before the in-memory store runs its first check to remove
   * dead initial applications. Specified in minutes.
   */
  public static final String IN_MEMORY_INITIAL_DELAY =
      IN_MEMORY_STORE_PREFIX + "initial-delay";
  public static final int DEFAULT_IN_MEMORY_INITIAL_DELAY = 10;
  public static final String IN_MEMORY_INITIAL_DELAY_MINS =
      IN_MEMORY_STORE_PREFIX + "initial-delay-mins";
  public static final int DEFAULT_IN_MEMORY_INITIAL_DELAY_MINS = 10;

  /**
   * The frequency at which the in-memory store checks to remove dead initial
   * applications. Specified in minutes.
   */
  public static final String IN_MEMORY_CHECK_PERIOD =
      IN_MEMORY_STORE_PREFIX + "check-period";
  public static final int DEFAULT_IN_MEMORY_CHECK_PERIOD = 12 * 60;
  public static final String IN_MEMORY_CHECK_PERIOD_MINS =
      IN_MEMORY_STORE_PREFIX + "check-period-mins";
  public static final int DEFAULT_IN_MEMORY_CHECK_PERIOD_MINS = 12 * 60;

  // SCM Cleaner service configuration

  private static final String SCM_CLEANER_PREFIX = SHARED_CACHE_PREFIX
      + "cleaner.";

  /**
   * The frequency at which a cleaner task runs. Specified in minutes.
   */
  public static final String SCM_CLEANER_PERIOD_MINS =
      SCM_CLEANER_PREFIX + "period-mins";
  public static final int DEFAULT_SCM_CLEANER_PERIOD_MINS = 24 * 60;

  /**
   * Initial delay before the first cleaner task is scheduled. Specified in
   * minutes.
   */
  public static final String SCM_CLEANER_INITIAL_DELAY_MINS =
      SCM_CLEANER_PREFIX + "initial-delay-mins";
  public static final int DEFAULT_SCM_CLEANER_INITIAL_DELAY_MINS = 10;

  /**
   * The time to sleep between processing each shared cache resource. Specified
   * in milliseconds.
   */
  public static final String SCM_CLEANER_RESOURCE_SLEEP_MS =
      SCM_CLEANER_PREFIX + "resource-sleep-ms";
  public static final long DEFAULT_SCM_CLEANER_RESOURCE_SLEEP_MS = 0L;

  ////////////////////////////////
  // Other Configs
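For reference, a minimal sketch (not part of this change) that reads the cleaner settings introduced above through the new YarnConfiguration constants; the class name CleanerConfigProbe is made up for illustration, everything else comes from the hunk above or the stock Configuration API.

// Illustrative only: probes the new cleaner settings; CleanerConfigProbe is a
// made-up class name, the constants come from the hunk above.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

public class CleanerConfigProbe {
  public static void main(String[] args) {
    Configuration conf = new YarnConfiguration();
    int periodMins = conf.getInt(YarnConfiguration.SCM_CLEANER_PERIOD_MINS,
        YarnConfiguration.DEFAULT_SCM_CLEANER_PERIOD_MINS);
    int initialDelayMins =
        conf.getInt(YarnConfiguration.SCM_CLEANER_INITIAL_DELAY_MINS,
            YarnConfiguration.DEFAULT_SCM_CLEANER_INITIAL_DELAY_MINS);
    long resourceSleepMs =
        conf.getLong(YarnConfiguration.SCM_CLEANER_RESOURCE_SLEEP_MS,
            YarnConfiguration.DEFAULT_SCM_CLEANER_RESOURCE_SLEEP_MS);
    System.out.println("cleaner period (min): " + periodMins
        + ", initial delay (min): " + initialDelayMins
        + ", per-resource sleep (ms): " + resourceSleepMs);
  }
}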
@@ -1379,24 +1379,45 @@
    <description>A resource in the in-memory store is considered stale
    if the time since the last reference exceeds the staleness period.
    This value is specified in minutes.</description>
    <name>yarn.sharedcache.store.in-memory.staleness-period</name>
    <name>yarn.sharedcache.store.in-memory.staleness-period-mins</name>
    <value>10080</value>
  </property>

  <property>
    <description>Initial delay before the in-memory store runs its first check
    to remove dead initial applications. Specified in minutes.</description>
    <name>yarn.sharedcache.store.in-memory.initial-delay</name>
    <name>yarn.sharedcache.store.in-memory.initial-delay-mins</name>
    <value>10</value>
  </property>

  <property>
    <description>The frequency at which the in-memory store checks to remove
    dead initial applications. Specified in minutes.</description>
    <name>yarn.sharedcache.store.in-memory.check-period</name>
    <name>yarn.sharedcache.store.in-memory.check-period-mins</name>
    <value>720</value>
  </property>

  <property>
    <description>The frequency at which a cleaner task runs.
    Specified in minutes.</description>
    <name>yarn.sharedcache.cleaner.period-mins</name>
    <value>1440</value>
  </property>

  <property>
    <description>Initial delay before the first cleaner task is scheduled.
    Specified in minutes.</description>
    <name>yarn.sharedcache.cleaner.initial-delay-mins</name>
    <value>10</value>
  </property>

  <property>
    <description>The time to sleep between processing each shared cache
    resource. Specified in milliseconds.</description>
    <name>yarn.sharedcache.cleaner.resource-sleep-ms</name>
    <value>0</value>
  </property>

  <!-- Other configuration -->
  <property>
    <description>The interval that the yarn client library uses to poll the
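The same defaults can also be overridden programmatically, for example in a test, instead of through yarn-site.xml; a hedged sketch (class name and chosen values are made up):

// Illustrative only: programmatic equivalents of the yarn-site.xml properties
// listed above; the class name and the chosen values are made up.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

public class CleanerConfigOverrideExample {
  static Configuration overriddenConf() {
    Configuration conf = new YarnConfiguration();
    conf.setInt(YarnConfiguration.SCM_CLEANER_PERIOD_MINS, 60);          // default 1440 (daily)
    conf.setInt(YarnConfiguration.SCM_CLEANER_INITIAL_DELAY_MINS, 1);    // default 10
    conf.setLong(YarnConfiguration.SCM_CLEANER_RESOURCE_SLEEP_MS, 100L); // default 0 (no throttle)
    return conf;
  }
}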
@@ -78,4 +78,14 @@ public class SharedCacheUtil {

    return sb.toString();
  }

  @Private
  public static String getCacheEntryGlobPattern(int depth) {
    StringBuilder pattern = new StringBuilder();
    for (int i = 0; i < depth; i++) {
      pattern.append("*/");
    }
    pattern.append("*");
    return pattern.toString();
  }
}
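A small usage sketch (not part of this change) of the new getCacheEntryGlobPattern helper, mirroring how CleanerTask.process() below enumerates cache entry directories; GlobPatternExample is a made-up class name.

// Illustrative only: enumerating cache entry directories with the new helper,
// the way CleanerTask.process() below does; GlobPatternExample is made up.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.sharedcache.SharedCacheUtil;

public class GlobPatternExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    // with a cache depth of 3 the pattern is "*/*/*/*", matching e.g. 9/c/d/<checksum>
    String pattern =
        SharedCacheUtil.getCacheEntryGlobPattern(SharedCacheUtil.getCacheDepth(conf));
    FileStatus[] entries = fs.globStatus(
        new Path(YarnConfiguration.DEFAULT_SHARED_CACHE_ROOT, pattern));
    int n = entries == null ? 0 : entries.length;
    System.out.println("Found " + n + " cache entry directories");
  }
}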
@@ -0,0 +1,218 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.sharedcachemanager;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.lang.management.ManagementFactory;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.locks.Lock;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.HadoopIllegalArgumentException;
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Evolving;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.service.CompositeService;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.server.sharedcachemanager.metrics.CleanerMetrics;
|
||||
import org.apache.hadoop.yarn.server.sharedcachemanager.store.SCMStore;
|
||||
|
||||
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||
|
||||
/**
|
||||
* The cleaner service that maintains the shared cache area, and cleans up stale
|
||||
* entries on a regular basis.
|
||||
*/
|
||||
@Private
|
||||
@Evolving
|
||||
public class CleanerService extends CompositeService {
|
||||
/**
|
||||
* The name of the global cleaner lock that the cleaner creates to indicate
|
||||
* that a cleaning process is in progress.
|
||||
*/
|
||||
public static final String GLOBAL_CLEANER_PID = ".cleaner_pid";
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(CleanerService.class);
|
||||
|
||||
private Configuration conf;
|
||||
private CleanerMetrics metrics;
|
||||
private ScheduledExecutorService scheduledExecutor;
|
||||
private final SCMStore store;
|
||||
private final Lock cleanerTaskLock;
|
||||
|
||||
public CleanerService(SCMStore store) {
|
||||
super("CleanerService");
|
||||
this.store = store;
|
||||
this.cleanerTaskLock = new ReentrantLock();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void serviceInit(Configuration conf) throws Exception {
|
||||
this.conf = conf;
|
||||
|
||||
// create scheduler executor service that services the cleaner tasks
|
||||
// use 2 threads to accommodate the on-demand tasks and reduce the chance of
|
||||
// back-to-back runs
|
||||
ThreadFactory tf =
|
||||
new ThreadFactoryBuilder().setNameFormat("Shared cache cleaner").build();
|
||||
scheduledExecutor = Executors.newScheduledThreadPool(2, tf);
|
||||
super.serviceInit(conf);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void serviceStart() throws Exception {
|
||||
if (!writeGlobalCleanerPidFile()) {
|
||||
throw new YarnException("The global cleaner pid file already exists! " +
|
||||
"It appears there is another CleanerService running in the cluster");
|
||||
}
|
||||
|
||||
this.metrics = CleanerMetrics.initSingleton(conf);
|
||||
|
||||
// Start dependent services (i.e. AppChecker)
|
||||
super.serviceStart();
|
||||
|
||||
Runnable task =
|
||||
CleanerTask.create(conf, store, metrics, cleanerTaskLock);
|
||||
long periodInMinutes = getPeriod(conf);
|
||||
scheduledExecutor.scheduleAtFixedRate(task, getInitialDelay(conf),
|
||||
periodInMinutes, TimeUnit.MINUTES);
|
||||
LOG.info("Scheduled the shared cache cleaner task to run every "
|
||||
+ periodInMinutes + " minutes.");
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void serviceStop() throws Exception {
|
||||
LOG.info("Shutting down the background thread.");
|
||||
scheduledExecutor.shutdownNow();
|
||||
try {
|
||||
if (scheduledExecutor.awaitTermination(10, TimeUnit.SECONDS)) {
|
||||
LOG.info("The background thread stopped.");
|
||||
} else {
|
||||
LOG.warn("Gave up waiting for the cleaner task to shutdown.");
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
LOG.warn("The cleaner service was interrupted while shutting down the task.",
|
||||
e);
|
||||
}
|
||||
|
||||
removeGlobalCleanerPidFile();
|
||||
|
||||
super.serviceStop();
|
||||
}
|
||||
|
||||
/**
|
||||
* Execute an on-demand cleaner task.
|
||||
*/
|
||||
protected void runCleanerTask() {
|
||||
Runnable task =
|
||||
CleanerTask.create(conf, store, metrics, cleanerTaskLock);
|
||||
// this is a non-blocking call (it simply submits the task to the executor
|
||||
// queue and returns)
|
||||
this.scheduledExecutor.execute(task);
|
||||
}
|
||||
|
||||
/**
|
||||
* To ensure there are not multiple instances of the SCM running on a given
|
||||
* cluster, a global pid file is used. This file contains the hostname of the
|
||||
* machine that owns the pid file.
|
||||
*
|
||||
* @return true if the pid file was written, false otherwise
|
||||
* @throws YarnException
|
||||
*/
|
||||
private boolean writeGlobalCleanerPidFile() throws YarnException {
|
||||
String root =
|
||||
conf.get(YarnConfiguration.SHARED_CACHE_ROOT,
|
||||
YarnConfiguration.DEFAULT_SHARED_CACHE_ROOT);
|
||||
Path pidPath = new Path(root, GLOBAL_CLEANER_PID);
|
||||
try {
|
||||
FileSystem fs = FileSystem.get(this.conf);
|
||||
|
||||
if (fs.exists(pidPath)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
FSDataOutputStream os = fs.create(pidPath, false);
|
||||
// write the hostname and the process id in the global cleaner pid file
|
||||
final String ID = ManagementFactory.getRuntimeMXBean().getName();
|
||||
os.writeUTF(ID);
|
||||
os.close();
|
||||
// add it to the delete-on-exit to ensure it gets deleted when the JVM
|
||||
// exits
|
||||
fs.deleteOnExit(pidPath);
|
||||
} catch (IOException e) {
|
||||
throw new YarnException(e);
|
||||
}
|
||||
LOG.info("Created the global cleaner pid file at " + pidPath.toString());
|
||||
return true;
|
||||
}
|
||||
|
||||
private void removeGlobalCleanerPidFile() {
|
||||
try {
|
||||
FileSystem fs = FileSystem.get(this.conf);
|
||||
String root =
|
||||
conf.get(YarnConfiguration.SHARED_CACHE_ROOT,
|
||||
YarnConfiguration.DEFAULT_SHARED_CACHE_ROOT);
|
||||
|
||||
Path pidPath = new Path(root, GLOBAL_CLEANER_PID);
|
||||
|
||||
|
||||
fs.delete(pidPath, false);
|
||||
LOG.info("Removed the global cleaner pid file at " + pidPath.toString());
|
||||
} catch (IOException e) {
|
||||
LOG.error(
|
||||
"Unable to remove the global cleaner pid file! The file may need "
|
||||
+ "to be removed manually.", e);
|
||||
}
|
||||
}
|
||||
|
||||
private static int getInitialDelay(Configuration conf) {
|
||||
int initialDelayInMinutes =
|
||||
conf.getInt(YarnConfiguration.SCM_CLEANER_INITIAL_DELAY_MINS,
|
||||
YarnConfiguration.DEFAULT_SCM_CLEANER_INITIAL_DELAY_MINS);
|
||||
// a negative value is invalid; fail fast
if (initialDelayInMinutes < 0) {
throw new HadoopIllegalArgumentException("Negative initial delay value: "
+ initialDelayInMinutes
+ ". The initial delay must not be negative.");
|
||||
}
|
||||
return initialDelayInMinutes;
|
||||
}
|
||||
|
||||
private static int getPeriod(Configuration conf) {
|
||||
int periodInMinutes =
|
||||
conf.getInt(YarnConfiguration.SCM_CLEANER_PERIOD_MINS,
|
||||
YarnConfiguration.DEFAULT_SCM_CLEANER_PERIOD_MINS);
|
||||
// a non-positive value is invalid; fail fast
if (periodInMinutes <= 0) {
throw new HadoopIllegalArgumentException("Non-positive period value: "
+ periodInMinutes
+ ". The cleaner period must be greater than zero.");
|
||||
}
|
||||
return periodInMinutes;
|
||||
}
|
||||
}
|
|
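For context, the fixed-rate scheduling shape used by CleanerService.serviceStart() above, reduced to a self-contained sketch; the class name and the Runnable body are made up, the delay and period mirror the SCM_CLEANER defaults.

// Illustrative only: the scheduling pattern used by serviceStart() above;
// FixedRateSketch and the printed message are made up.
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class FixedRateSketch {
  public static void main(String[] args) {
    ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);
    Runnable task = new Runnable() {
      @Override
      public void run() {
        System.out.println("cleaner pass");
      }
    };
    int initialDelayMins = 10;  // DEFAULT_SCM_CLEANER_INITIAL_DELAY_MINS
    int periodMins = 24 * 60;   // DEFAULT_SCM_CLEANER_PERIOD_MINS
    scheduler.scheduleAtFixedRate(task, initialDelayMins, periodMins,
        TimeUnit.MINUTES);
  }
}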
@@ -0,0 +1,308 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.sharedcachemanager;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.locks.Lock;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Evolving;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.server.sharedcache.SharedCacheUtil;
|
||||
import org.apache.hadoop.yarn.server.sharedcachemanager.metrics.CleanerMetrics;
|
||||
import org.apache.hadoop.yarn.server.sharedcachemanager.store.SCMStore;
|
||||
|
||||
/**
|
||||
* The task that runs and cleans up the shared cache area for stale entries and
|
||||
* orphaned files. It is expected that only one cleaner task runs at any given
|
||||
* point in time.
|
||||
*/
|
||||
@Private
|
||||
@Evolving
|
||||
class CleanerTask implements Runnable {
|
||||
private static final String RENAMED_SUFFIX = "-renamed";
|
||||
private static final Log LOG = LogFactory.getLog(CleanerTask.class);
|
||||
|
||||
private final String location;
|
||||
private final long sleepTime;
|
||||
private final int nestedLevel;
|
||||
private final Path root;
|
||||
private final FileSystem fs;
|
||||
private final SCMStore store;
|
||||
private final CleanerMetrics metrics;
|
||||
private final Lock cleanerTaskLock;
|
||||
|
||||
/**
|
||||
* Creates a cleaner task based on the configuration. This is provided for
|
||||
* convenience.
|
||||
*
|
||||
* @param conf
|
||||
* @param store
|
||||
* @param metrics
|
||||
* @param cleanerTaskLock lock that ensures a serial execution of cleaner
|
||||
* task
|
||||
* @return an instance of a CleanerTask
|
||||
*/
|
||||
public static CleanerTask create(Configuration conf, SCMStore store,
|
||||
CleanerMetrics metrics, Lock cleanerTaskLock) {
|
||||
try {
|
||||
// get the root directory for the shared cache
|
||||
String location =
|
||||
conf.get(YarnConfiguration.SHARED_CACHE_ROOT,
|
||||
YarnConfiguration.DEFAULT_SHARED_CACHE_ROOT);
|
||||
|
||||
long sleepTime =
|
||||
conf.getLong(YarnConfiguration.SCM_CLEANER_RESOURCE_SLEEP_MS,
|
||||
YarnConfiguration.DEFAULT_SCM_CLEANER_RESOURCE_SLEEP_MS);
|
||||
int nestedLevel = SharedCacheUtil.getCacheDepth(conf);
|
||||
FileSystem fs = FileSystem.get(conf);
|
||||
|
||||
return new CleanerTask(location, sleepTime, nestedLevel, fs, store,
|
||||
metrics, cleanerTaskLock);
|
||||
} catch (IOException e) {
|
||||
LOG.error("Unable to obtain the filesystem for the cleaner service", e);
|
||||
throw new ExceptionInInitializerError(e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a cleaner task based on the root directory location and the
|
||||
* filesystem.
|
||||
*/
|
||||
CleanerTask(String location, long sleepTime, int nestedLevel, FileSystem fs,
|
||||
SCMStore store, CleanerMetrics metrics, Lock cleanerTaskLock) {
|
||||
this.location = location;
|
||||
this.sleepTime = sleepTime;
|
||||
this.nestedLevel = nestedLevel;
|
||||
this.root = new Path(location);
|
||||
this.fs = fs;
|
||||
this.store = store;
|
||||
this.metrics = metrics;
|
||||
this.cleanerTaskLock = cleanerTaskLock;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
if (!this.cleanerTaskLock.tryLock()) {
|
||||
// there is already another task running
|
||||
LOG.warn("A cleaner task is already running. "
|
||||
+ "This scheduled cleaner task will do nothing.");
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
if (!fs.exists(root)) {
|
||||
LOG.error("The shared cache root " + location + " was not found. "
|
||||
+ "The cleaner task will do nothing.");
|
||||
return;
|
||||
}
|
||||
|
||||
// we're now ready to process the shared cache area
|
||||
process();
|
||||
} catch (Throwable e) {
|
||||
LOG.error("Unexpected exception while initializing the cleaner task. "
|
||||
+ "This task will do nothing,", e);
|
||||
} finally {
|
||||
// release the lock regardless of whether this was a scheduled or an
// on-demand task
this.cleanerTaskLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Sweeps and processes the shared cache area to clean up stale and orphaned
|
||||
* files.
|
||||
*/
|
||||
void process() {
|
||||
// mark the beginning of the run in the metrics
|
||||
metrics.reportCleaningStart();
|
||||
try {
|
||||
// now traverse individual directories and process them
|
||||
// the directory structure is specified by the nested level parameter
|
||||
// (e.g. 9/c/d/<checksum>)
|
||||
String pattern = SharedCacheUtil.getCacheEntryGlobPattern(nestedLevel);
|
||||
FileStatus[] resources =
|
||||
fs.globStatus(new Path(root, pattern));
|
||||
int numResources = resources == null ? 0 : resources.length;
|
||||
LOG.info("Processing " + numResources + " resources in the shared cache");
|
||||
long beginMs = System.currentTimeMillis();
|
||||
if (resources != null) {
|
||||
for (FileStatus resource : resources) {
|
||||
// check for interruption so it can abort in a timely manner in case
|
||||
// of shutdown
|
||||
if (Thread.currentThread().isInterrupted()) {
|
||||
LOG.warn("The cleaner task was interrupted. Aborting.");
|
||||
break;
|
||||
}
|
||||
|
||||
if (resource.isDirectory()) {
|
||||
processSingleResource(resource);
|
||||
} else {
|
||||
LOG.warn("Invalid file at path " + resource.getPath().toString()
|
||||
+
|
||||
" when a directory was expected");
|
||||
}
|
||||
// add sleep time between cleaning each directory if it is non-zero
|
||||
if (sleepTime > 0) {
|
||||
Thread.sleep(sleepTime);
|
||||
}
|
||||
}
|
||||
}
|
||||
long endMs = System.currentTimeMillis();
|
||||
long durationMs = endMs - beginMs;
|
||||
LOG.info("Processed " + numResources + " resource(s) in " + durationMs +
|
||||
" ms.");
|
||||
} catch (IOException e1) {
|
||||
LOG.error("Unable to complete the cleaner task", e1);
|
||||
} catch (InterruptedException e2) {
|
||||
Thread.currentThread().interrupt(); // restore the interrupt
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a path for the root directory for the shared cache.
|
||||
*/
|
||||
Path getRootPath() {
|
||||
return root;
|
||||
}
|
||||
|
||||
/**
|
||||
* Processes a single shared cache resource directory.
|
||||
*/
|
||||
void processSingleResource(FileStatus resource) {
|
||||
Path path = resource.getPath();
|
||||
// indicates the processing status of the resource
|
||||
ResourceStatus resourceStatus = ResourceStatus.INIT;
|
||||
|
||||
// first, if the path ends with the renamed suffix, it indicates the
|
||||
// directory was moved (as stale) but somehow not deleted (probably due to
|
||||
// SCM failure); delete the directory
|
||||
if (path.toString().endsWith(RENAMED_SUFFIX)) {
|
||||
LOG.info("Found a renamed directory that was left undeleted at " +
|
||||
path.toString() + ". Deleting.");
|
||||
try {
|
||||
if (fs.delete(path, true)) {
|
||||
resourceStatus = ResourceStatus.DELETED;
|
||||
}
|
||||
} catch (IOException e) {
|
||||
LOG.error("Error while processing a shared cache resource: " + path, e);
|
||||
}
|
||||
} else {
|
||||
// this is the path to the cache resource directory
|
||||
// the directory name is the resource key (i.e. a unique identifier)
|
||||
String key = path.getName();
|
||||
|
||||
try {
|
||||
store.cleanResourceReferences(key);
|
||||
} catch (YarnException e) {
|
||||
LOG.error("Exception thrown while removing dead appIds.", e);
|
||||
}
|
||||
|
||||
if (store.isResourceEvictable(key, resource)) {
|
||||
try {
|
||||
/*
|
||||
* TODO See YARN-2663: There is a race condition between
|
||||
* store.removeResource(key) and
|
||||
* removeResourceFromCacheFileSystem(path) operations because they do
|
||||
* not happen atomically and resources can be uploaded with different
|
||||
* file names by the node managers.
|
||||
*/
|
||||
// remove the resource from scm (checks for appIds as well)
|
||||
if (store.removeResource(key)) {
|
||||
// remove the resource from the file system
|
||||
boolean deleted = removeResourceFromCacheFileSystem(path);
|
||||
if (deleted) {
|
||||
resourceStatus = ResourceStatus.DELETED;
|
||||
} else {
|
||||
LOG.error("Failed to remove path from the file system."
|
||||
+ " Skipping this resource: " + path);
|
||||
resourceStatus = ResourceStatus.ERROR;
|
||||
}
|
||||
} else {
|
||||
// we did not delete the resource because it contained application
|
||||
// ids
|
||||
resourceStatus = ResourceStatus.PROCESSED;
|
||||
}
|
||||
} catch (IOException e) {
|
||||
LOG.error(
|
||||
"Failed to remove path from the file system. Skipping this resource: "
|
||||
+ path, e);
|
||||
resourceStatus = ResourceStatus.ERROR;
|
||||
}
|
||||
} else {
|
||||
resourceStatus = ResourceStatus.PROCESSED;
|
||||
}
|
||||
}
|
||||
|
||||
// record the processing
|
||||
switch (resourceStatus) {
|
||||
case DELETED:
|
||||
metrics.reportAFileDelete();
|
||||
break;
|
||||
case PROCESSED:
|
||||
metrics.reportAFileProcess();
|
||||
break;
|
||||
case ERROR:
|
||||
metrics.reportAFileError();
|
||||
break;
|
||||
default:
|
||||
LOG.error("Cleaner encountered an invalid status (" + resourceStatus
|
||||
+ ") while processing resource: " + path.getName());
|
||||
}
|
||||
}
|
||||
|
||||
private boolean removeResourceFromCacheFileSystem(Path path)
|
||||
throws IOException {
|
||||
// rename the directory to make the delete atomic
|
||||
Path renamedPath = new Path(path.toString() + RENAMED_SUFFIX);
|
||||
if (fs.rename(path, renamedPath)) {
|
||||
// the directory can be removed safely now
|
||||
// log the original path
|
||||
LOG.info("Deleting " + path.toString());
|
||||
return fs.delete(renamedPath, true);
|
||||
} else {
|
||||
// we were unable to remove it for some reason: it's best to leave
|
||||
// it at that
|
||||
LOG.error("We were not able to rename the directory to "
|
||||
+ renamedPath.toString() + ". We will leave it intact.");
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* A status indicating what happened with the processing of a given cache
|
||||
* resource.
|
||||
*/
|
||||
private enum ResourceStatus {
|
||||
INIT,
|
||||
/** Resource was successfully processed, but not deleted **/
|
||||
PROCESSED,
|
||||
/** Resource was successfully deleted **/
|
||||
DELETED,
|
||||
/** The cleaner task ran into an error while processing the resource **/
|
||||
ERROR
|
||||
}
|
||||
}
|
|
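The single-run guard used by CleanerTask.run() above, extracted into a stand-alone sketch (class and method names are made up): the scheduled task and any on-demand run share one ReentrantLock, and tryLock() turns an overlapping invocation into a no-op instead of queueing it.

// Illustrative only: the non-blocking single-run guard from CleanerTask.run().
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class SingleRunGuardSketch {
  private final Lock lock = new ReentrantLock();

  public void runExclusively(Runnable work) {
    if (!lock.tryLock()) {
      // another run (scheduled or on-demand) is already in progress; skip
      return;
    }
    try {
      work.run();
    } finally {
      lock.unlock();
    }
  }
}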
@@ -64,6 +64,9 @@ public class SharedCacheManager extends CompositeService {
    this.store = createSCMStoreService(conf);
    addService(store);

    CleanerService cs = createCleanerService(store);
    addService(cs);

    // init metrics
    DefaultMetricsSystem.initialize("SharedCacheManager");
    JvmMetrics.initSingleton("SharedCacheManager", null);

@@ -90,6 +93,10 @@ public class SharedCacheManager extends CompositeService {
    return store;
  }

  private CleanerService createCleanerService(SCMStore store) {
    return new CleanerService(store);
  }

  @Override
  protected void serviceStop() throws Exception {
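Note on ordering: CompositeService inits and starts its children in the order they are added, so adding the store before the cleaner ensures the store is up before the first cleaner pass can touch it. A minimal sketch of that behavior (OrderingSketch and the child names are made up):

// Illustrative only: children are inited/started in add order.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.service.CompositeService;

public class OrderingSketch extends CompositeService {
  public OrderingSketch() {
    super("OrderingSketch");
  }

  @Override
  protected void serviceInit(Configuration conf) throws Exception {
    addService(new AbstractService("store-like") {});   // started first
    addService(new AbstractService("cleaner-like") {}); // started second
    super.serviceInit(conf);
  }
}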
@@ -0,0 +1,172 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.yarn.server.sharedcachemanager.metrics;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Evolving;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.metrics2.MetricsSource;
|
||||
import org.apache.hadoop.metrics2.MetricsSystem;
|
||||
import org.apache.hadoop.metrics2.annotation.Metric;
|
||||
import org.apache.hadoop.metrics2.annotation.Metrics;
|
||||
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
||||
import org.apache.hadoop.metrics2.lib.MetricsAnnotations;
|
||||
import org.apache.hadoop.metrics2.lib.MetricsRegistry;
|
||||
import org.apache.hadoop.metrics2.lib.MetricsSourceBuilder;
|
||||
import org.apache.hadoop.metrics2.lib.MutableCounterLong;
|
||||
import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
|
||||
|
||||
/**
|
||||
* This class is for maintaining the various Cleaner activity statistics and
|
||||
* publishing them through the metrics interfaces.
|
||||
*/
|
||||
@Private
|
||||
@Evolving
|
||||
@Metrics(name = "CleanerActivity", about = "Cleaner service metrics", context = "yarn")
|
||||
public class CleanerMetrics {
|
||||
public static final Log LOG = LogFactory.getLog(CleanerMetrics.class);
|
||||
private final MetricsRegistry registry = new MetricsRegistry("cleaner");
|
||||
|
||||
enum Singleton {
|
||||
INSTANCE;
|
||||
|
||||
CleanerMetrics impl;
|
||||
|
||||
synchronized CleanerMetrics init(Configuration conf) {
|
||||
if (impl == null) {
|
||||
impl = create(conf);
|
||||
}
|
||||
return impl;
|
||||
}
|
||||
}
|
||||
|
||||
public static CleanerMetrics initSingleton(Configuration conf) {
|
||||
return Singleton.INSTANCE.init(conf);
|
||||
}
|
||||
|
||||
public static CleanerMetrics getInstance() {
|
||||
CleanerMetrics topMetrics = Singleton.INSTANCE.impl;
|
||||
if (topMetrics == null)
throw new IllegalStateException(
"The CleanerMetrics singleton instance is not initialized."
+ " Have you called init first?");
|
||||
return topMetrics;
|
||||
}
|
||||
|
||||
@Metric("number of deleted files over all runs")
|
||||
private MutableCounterLong totalDeletedFiles;
|
||||
|
||||
public long getTotalDeletedFiles() {
|
||||
return totalDeletedFiles.value();
|
||||
}
|
||||
|
||||
private @Metric("number of deleted files in the last run")
|
||||
MutableGaugeLong deletedFiles;
|
||||
|
||||
public long getDeletedFiles() {
|
||||
return deletedFiles.value();
|
||||
}
|
||||
|
||||
private @Metric("number of processed files over all runs")
|
||||
MutableCounterLong totalProcessedFiles;
|
||||
|
||||
public long getTotalProcessedFiles() {
|
||||
return totalProcessedFiles.value();
|
||||
}
|
||||
|
||||
private @Metric("number of processed files in the last run")
|
||||
MutableGaugeLong processedFiles;
|
||||
|
||||
public long getProcessedFiles() {
|
||||
return processedFiles.value();
|
||||
}
|
||||
|
||||
@Metric("number of file errors over all runs")
|
||||
private MutableCounterLong totalFileErrors;
|
||||
|
||||
public long getTotalFileErrors() {
|
||||
return totalFileErrors.value();
|
||||
}
|
||||
|
||||
private @Metric("number of file errors in the last run")
|
||||
MutableGaugeLong fileErrors;
|
||||
|
||||
public long getFileErrors() {
|
||||
return fileErrors.value();
|
||||
}
|
||||
|
||||
private CleanerMetrics() {
|
||||
}
|
||||
|
||||
/**
|
||||
* The metric source obtained after parsing the annotations
|
||||
*/
|
||||
MetricsSource metricSource;
|
||||
|
||||
static CleanerMetrics create(Configuration conf) {
|
||||
MetricsSystem ms = DefaultMetricsSystem.instance();
|
||||
|
||||
CleanerMetrics metricObject = new CleanerMetrics();
|
||||
MetricsSourceBuilder sb = MetricsAnnotations.newSourceBuilder(metricObject);
|
||||
final MetricsSource s = sb.build();
|
||||
ms.register("cleaner", "The cleaner service of truly shared cache", s);
|
||||
metricObject.metricSource = s;
|
||||
return metricObject;
|
||||
}
|
||||
|
||||
/**
|
||||
* Report a delete operation at the current system time
|
||||
*/
|
||||
public void reportAFileDelete() {
|
||||
totalProcessedFiles.incr();
|
||||
processedFiles.incr();
|
||||
totalDeletedFiles.incr();
|
||||
deletedFiles.incr();
|
||||
}
|
||||
|
||||
/**
|
||||
* Report a process operation at the current system time
|
||||
*/
|
||||
public void reportAFileProcess() {
|
||||
totalProcessedFiles.incr();
|
||||
processedFiles.incr();
|
||||
}
|
||||
|
||||
/**
|
||||
* Report a process operation error at the current system time
|
||||
*/
|
||||
public void reportAFileError() {
|
||||
totalProcessedFiles.incr();
|
||||
processedFiles.incr();
|
||||
totalFileErrors.incr();
|
||||
fileErrors.incr();
|
||||
}
|
||||
|
||||
/**
|
||||
* Report the start a new run of the cleaner.
|
||||
*
|
||||
*/
|
||||
public void reportCleaningStart() {
|
||||
processedFiles.set(0);
|
||||
deletedFiles.set(0);
|
||||
fileErrors.set(0);
|
||||
}
|
||||
|
||||
}
|
|
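A small sketch (not part of this change) of reading the counters after a pass through the CleanerMetrics singleton; TestCleanerMetrics below exercises the same API, and CleanerMetricsProbe is a made-up class name.

// Illustrative only: reading the cleaner counters through the singleton.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.server.sharedcachemanager.metrics.CleanerMetrics;

public class CleanerMetricsProbe {
  public static void main(String[] args) {
    CleanerMetrics.initSingleton(new Configuration());
    CleanerMetrics metrics = CleanerMetrics.getInstance();
    metrics.reportCleaningStart();
    metrics.reportAFileDelete();   // counts as processed and deleted
    metrics.reportAFileProcess();  // counts as processed only
    System.out.println("processed=" + metrics.getProcessedFiles()
        + " deleted=" + metrics.getDeletedFiles()
        + " errors=" + metrics.getFileErrors());
  }
}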
@@ -48,7 +48,6 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.sharedcache.SharedCacheUtil;
import org.apache.hadoop.yarn.server.sharedcachemanager.AppChecker;
import org.apache.hadoop.yarn.server.sharedcachemanager.SharedCacheManager;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;

@@ -83,13 +82,12 @@ public class InMemorySCMStore extends SCMStore {
  private final Object initialAppsLock = new Object();
  private long startTime;
  private int stalenessMinutes;
  private AppChecker appChecker;
  private ScheduledExecutorService scheduler;
  private int initialDelayMin;
  private int checkPeriodMin;

  public InMemorySCMStore() {
    super(InMemorySCMStore.class.getName());
  public InMemorySCMStore(AppChecker appChecker) {
    super(InMemorySCMStore.class.getName(), appChecker);
  }

  private String intern(String key) {

@@ -108,9 +106,6 @@ public class InMemorySCMStore extends SCMStore {
    this.checkPeriodMin = getCheckPeriod(conf);
    this.stalenessMinutes = getStalenessPeriod(conf);

    appChecker = createAppCheckerService(conf);
    addService(appChecker);

    bootstrap(conf);

    ThreadFactory tf =

@@ -157,11 +152,6 @@ public class InMemorySCMStore extends SCMStore {
    super.serviceStop();
  }

  @VisibleForTesting
  AppChecker createAppCheckerService(Configuration conf) {
    return SharedCacheManager.createAppCheckerService(conf);
  }

  private void bootstrap(Configuration conf) throws IOException {
    Map<String, String> initialCachedResources =
        getInitialCachedResources(FileSystem.get(conf), conf);

@@ -201,14 +191,10 @@ public class InMemorySCMStore extends SCMStore {
    // now traverse individual directories and process them
    // the directory structure is specified by the nested level parameter
    // (e.g. 9/c/d/<checksum>/file)
    StringBuilder pattern = new StringBuilder();
    for (int i = 0; i < nestedLevel + 1; i++) {
      pattern.append("*/");
    }
    pattern.append("*");
    String pattern = SharedCacheUtil.getCacheEntryGlobPattern(nestedLevel+1);

    LOG.info("Querying for all individual cached resource files");
    FileStatus[] entries = fs.globStatus(new Path(root, pattern.toString()));
    FileStatus[] entries = fs.globStatus(new Path(root, pattern));
    int numEntries = entries == null ? 0 : entries.length;
    LOG.info("Found " + numEntries + " files: processing for one resource per "
        + "key");

@@ -359,6 +345,17 @@ public class InMemorySCMStore extends SCMStore {
    }
  }

  /**
   * Provides atomicity for the method.
   */
  @Override
  public void cleanResourceReferences(String key) throws YarnException {
    String interned = intern(key);
    synchronized (interned) {
      super.cleanResourceReferences(key);
    }
  }

  /**
   * Removes the given resource from the store. Returns true if the resource is
   * found and removed or if the resource is not found. Returns false if it was

@@ -427,8 +424,8 @@

  private static int getStalenessPeriod(Configuration conf) {
    int stalenessMinutes =
        conf.getInt(YarnConfiguration.IN_MEMORY_STALENESS_PERIOD,
            YarnConfiguration.DEFAULT_IN_MEMORY_STALENESS_PERIOD);
        conf.getInt(YarnConfiguration.IN_MEMORY_STALENESS_PERIOD_MINS,
            YarnConfiguration.DEFAULT_IN_MEMORY_STALENESS_PERIOD_MINS);
    // non-positive value is invalid; use the default
    if (stalenessMinutes <= 0) {
      throw new HadoopIllegalArgumentException("Non-positive staleness value: "

@@ -440,8 +437,8 @@

  private static int getInitialDelay(Configuration conf) {
    int initialMinutes =
        conf.getInt(YarnConfiguration.IN_MEMORY_INITIAL_DELAY,
            YarnConfiguration.DEFAULT_IN_MEMORY_INITIAL_DELAY);
        conf.getInt(YarnConfiguration.IN_MEMORY_INITIAL_DELAY_MINS,
            YarnConfiguration.DEFAULT_IN_MEMORY_INITIAL_DELAY_MINS);
    // non-positive value is invalid; use the default
    if (initialMinutes <= 0) {
      throw new HadoopIllegalArgumentException(

@@ -453,8 +450,8 @@

  private static int getCheckPeriod(Configuration conf) {
    int checkMinutes =
        conf.getInt(YarnConfiguration.IN_MEMORY_CHECK_PERIOD,
            YarnConfiguration.DEFAULT_IN_MEMORY_CHECK_PERIOD);
        conf.getInt(YarnConfiguration.IN_MEMORY_CHECK_PERIOD_MINS,
            YarnConfiguration.DEFAULT_IN_MEMORY_CHECK_PERIOD_MINS);
    // non-positive value is invalid; use the default
    if (checkMinutes <= 0) {
      throw new HadoopIllegalArgumentException(
@@ -19,11 +19,15 @@
package org.apache.hadoop.yarn.server.sharedcachemanager.store;

import java.util.Collection;
import java.util.HashSet;
import java.util.Set;

import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.sharedcachemanager.AppChecker;


/**

@@ -35,8 +39,11 @@ import org.apache.hadoop.service.CompositeService;
@Evolving
public abstract class SCMStore extends CompositeService {

  protected SCMStore(String name) {
  protected final AppChecker appChecker;

  protected SCMStore(String name, AppChecker appChecker) {
    super(name);
    this.appChecker = appChecker;
  }

  /**

@@ -118,6 +125,33 @@ public abstract class SCMStore extends CompositeService {
  public abstract void removeResourceReferences(String key,
      Collection<SharedCacheResourceReference> refs, boolean updateAccessTime);

  /**
   * Clean all resource references to a cache resource that contain application
   * ids pointing to finished applications. If the resource key does not exist,
   * do nothing.
   *
   * @param key a unique identifier for a resource
   * @throws YarnException
   */
  @Private
  public void cleanResourceReferences(String key) throws YarnException {
    Collection<SharedCacheResourceReference> refs = getResourceReferences(key);
    if (!refs.isEmpty()) {
      Set<SharedCacheResourceReference> refsToRemove =
          new HashSet<SharedCacheResourceReference>();
      for (SharedCacheResourceReference r : refs) {
        if (!appChecker.isApplicationActive(r.getAppId())) {
          // application in resource reference is dead, it is safe to remove the
          // reference
          refsToRemove.add(r);
        }
      }
      if (refsToRemove.size() > 0) {
        removeResourceReferences(key, refsToRemove, false);
      }
    }
  }

  /**
   * Check if a specific resource is evictable according to the store's enabled
   * cache eviction policies.
@@ -0,0 +1,152 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.sharedcachemanager;
|
||||
|
||||
import static org.mockito.Matchers.anyBoolean;
|
||||
import static org.mockito.Matchers.eq;
|
||||
import static org.mockito.Matchers.isA;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.never;
|
||||
import static org.mockito.Mockito.spy;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import java.util.concurrent.locks.Lock;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.server.sharedcachemanager.metrics.CleanerMetrics;
|
||||
import org.apache.hadoop.yarn.server.sharedcachemanager.store.SCMStore;
|
||||
import org.junit.Test;
|
||||
|
||||
public class TestCleanerTask {
|
||||
private static final String ROOT =
|
||||
YarnConfiguration.DEFAULT_SHARED_CACHE_ROOT;
|
||||
private static final long SLEEP_TIME =
|
||||
YarnConfiguration.DEFAULT_SCM_CLEANER_RESOURCE_SLEEP_MS;
|
||||
private static final int NESTED_LEVEL =
|
||||
YarnConfiguration.DEFAULT_SHARED_CACHE_NESTED_LEVEL;
|
||||
|
||||
@Test
|
||||
public void testNonExistentRoot() throws Exception {
|
||||
FileSystem fs = mock(FileSystem.class);
|
||||
CleanerMetrics metrics = mock(CleanerMetrics.class);
|
||||
SCMStore store = mock(SCMStore.class);
|
||||
|
||||
CleanerTask task =
|
||||
createSpiedTask(fs, store, metrics, new ReentrantLock());
|
||||
// the shared cache root does not exist
|
||||
when(fs.exists(task.getRootPath())).thenReturn(false);
|
||||
|
||||
task.run();
|
||||
|
||||
// process() should not be called
|
||||
verify(task, never()).process();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testProcessFreshResource() throws Exception {
|
||||
FileSystem fs = mock(FileSystem.class);
|
||||
CleanerMetrics metrics = mock(CleanerMetrics.class);
|
||||
SCMStore store = mock(SCMStore.class);
|
||||
|
||||
CleanerTask task =
|
||||
createSpiedTask(fs, store, metrics, new ReentrantLock());
|
||||
|
||||
// mock a resource that is not evictable
|
||||
when(store.isResourceEvictable(isA(String.class), isA(FileStatus.class)))
|
||||
.thenReturn(false);
|
||||
FileStatus status = mock(FileStatus.class);
|
||||
when(status.getPath()).thenReturn(new Path(ROOT + "/a/b/c/abc"));
|
||||
|
||||
// process the resource
|
||||
task.processSingleResource(status);
|
||||
|
||||
// the directory should not be renamed
|
||||
verify(fs, never()).rename(eq(status.getPath()), isA(Path.class));
|
||||
// metrics should record a processed file (but not delete)
|
||||
verify(metrics).reportAFileProcess();
|
||||
verify(metrics, never()).reportAFileDelete();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testProcessEvictableResource() throws Exception {
|
||||
FileSystem fs = mock(FileSystem.class);
|
||||
CleanerMetrics metrics = mock(CleanerMetrics.class);
|
||||
SCMStore store = mock(SCMStore.class);
|
||||
|
||||
CleanerTask task =
|
||||
createSpiedTask(fs, store, metrics, new ReentrantLock());
|
||||
|
||||
// mock an evictable resource
|
||||
when(store.isResourceEvictable(isA(String.class), isA(FileStatus.class)))
|
||||
.thenReturn(true);
|
||||
FileStatus status = mock(FileStatus.class);
|
||||
when(status.getPath()).thenReturn(new Path(ROOT + "/a/b/c/abc"));
|
||||
when(store.removeResource(isA(String.class))).thenReturn(true);
|
||||
// rename succeeds
|
||||
when(fs.rename(isA(Path.class), isA(Path.class))).thenReturn(true);
|
||||
// delete returns true
|
||||
when(fs.delete(isA(Path.class), anyBoolean())).thenReturn(true);
|
||||
|
||||
// process the resource
|
||||
task.processSingleResource(status);
|
||||
|
||||
// the directory should be renamed
|
||||
verify(fs).rename(eq(status.getPath()), isA(Path.class));
|
||||
// metrics should record a deleted file
|
||||
verify(metrics).reportAFileDelete();
|
||||
verify(metrics, never()).reportAFileProcess();
|
||||
}
|
||||
|
||||
private CleanerTask createSpiedTask(FileSystem fs, SCMStore store,
|
||||
CleanerMetrics metrics, Lock isCleanerRunning) {
|
||||
return spy(new CleanerTask(ROOT, SLEEP_TIME, NESTED_LEVEL, fs, store,
|
||||
metrics, isCleanerRunning));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testResourceIsInUseHasAnActiveApp() throws Exception {
|
||||
FileSystem fs = mock(FileSystem.class);
|
||||
CleanerMetrics metrics = mock(CleanerMetrics.class);
|
||||
SCMStore store = mock(SCMStore.class);
|
||||
|
||||
FileStatus resource = mock(FileStatus.class);
|
||||
when(resource.getPath()).thenReturn(new Path(ROOT + "/a/b/c/abc"));
|
||||
// resource is stale
|
||||
when(store.isResourceEvictable(isA(String.class), isA(FileStatus.class)))
|
||||
.thenReturn(true);
|
||||
// but still has appIds
|
||||
when(store.removeResource(isA(String.class))).thenReturn(false);
|
||||
|
||||
CleanerTask task =
|
||||
createSpiedTask(fs, store, metrics, new ReentrantLock());
|
||||
|
||||
// process the resource
|
||||
task.processSingleResource(resource);
|
||||
|
||||
// metrics should record a processed file (but not delete)
|
||||
verify(metrics).reportAFileProcess();
|
||||
verify(metrics, never()).reportAFileDelete();
|
||||
}
|
||||
}
|
|
@@ -0,0 +1,65 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.yarn.server.sharedcachemanager.metrics;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
public class TestCleanerMetrics {
|
||||
|
||||
Configuration conf = new Configuration();
|
||||
CleanerMetrics cleanerMetrics;
|
||||
|
||||
@Before
|
||||
public void init() {
|
||||
CleanerMetrics.initSingleton(conf);
|
||||
cleanerMetrics = CleanerMetrics.getInstance();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMetricsOverMultiplePeriods() {
|
||||
simulateACleanerRun();
|
||||
assertMetrics(4, 4, 1, 1);
|
||||
simulateACleanerRun();
|
||||
assertMetrics(4, 8, 1, 2);
|
||||
}
|
||||
|
||||
public void simulateACleanerRun() {
|
||||
cleanerMetrics.reportCleaningStart();
|
||||
cleanerMetrics.reportAFileProcess();
|
||||
cleanerMetrics.reportAFileDelete();
|
||||
cleanerMetrics.reportAFileProcess();
|
||||
cleanerMetrics.reportAFileProcess();
|
||||
}
|
||||
|
||||
void assertMetrics(int proc, int totalProc, int del, int totalDel) {
|
||||
assertEquals(
|
||||
"Processed files in the last period are not measured correctly", proc,
|
||||
cleanerMetrics.getProcessedFiles());
|
||||
assertEquals("Total processed files are not measured correctly",
|
||||
totalProc, cleanerMetrics.getTotalProcessedFiles());
|
||||
assertEquals(
|
||||
"Deleted files in the last period are not measured correctly", del,
|
||||
cleanerMetrics.getDeletedFiles());
|
||||
assertEquals("Total deleted files are not measured correctly",
|
||||
totalDel, cleanerMetrics.getTotalDeletedFiles());
|
||||
}
|
||||
}
|
|
@@ -60,10 +60,8 @@ public class TestInMemorySCMStore {

  @Before
  public void setup() {
    this.store = spy(new InMemorySCMStore());
    this.checker = spy(new DummyAppChecker());
    doReturn(checker).when(store).createAppCheckerService(
        isA(Configuration.class));
    this.store = spy(new InMemorySCMStore(checker));
  }

  @After