HBASE-20352 [Chore] Backport HBASE-18309 (Support multi threads in CleanerChore) to branch-1
Signed-off-by: Yu Li <liyu@apache.org>
commit 427c95e2e5
parent de0dd9e821
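This change parallelizes the master's directory-cleaning chores and adds two settings, both visible in the diffs below: hbase.cleaner.scan.dir.concurrent.size (the CleanerChore scan pool) and hbase.oldwals.cleaner.thread.size (the LogCleaner delete threads). A minimal sketch of setting them programmatically; the values here are illustrative, not recommendations:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;

    public class CleanerTuningSketch {
      public static Configuration tuned() {
        Configuration conf = HBaseConfiguration.create();
        // Scan pool: an absolute thread count ("2") or a fraction of
        // available cores ("0.5"); the default is "0.25".
        conf.set("hbase.cleaner.scan.dir.concurrent.size", "0.5");
        // Threads deleting old WALs; the default is 2.
        conf.setInt("hbase.oldwals.cleaner.thread.size", 4);
        return conf;
      }
    }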
HMaster.java

@@ -97,6 +97,7 @@ import org.apache.hadoop.hbase.master.balancer.BalancerChore;
 import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer;
 import org.apache.hadoop.hbase.master.balancer.ClusterStatusChore;
 import org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory;
+import org.apache.hadoop.hbase.master.cleaner.CleanerChore;
 import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
 import org.apache.hadoop.hbase.master.cleaner.LogCleaner;
 import org.apache.hadoop.hbase.master.cleaner.ReplicationZKLockCleanerChore;
@@ -893,6 +894,7 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
       (System.currentTimeMillis() - masterActiveTime) / 1000.0f));
     this.masterFinishedInitializationTime = System.currentTimeMillis();
     configurationManager.registerObserver(this.balancer);
+    configurationManager.registerObserver(this.logCleaner);

     // Set master as 'initialized'.
     setInitialized(true);
@@ -1215,6 +1217,8 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
     this.service.startExecutorService(ExecutorType.MASTER_TABLE_OPERATIONS, 1);
     startProcedureExecutor();

+    // Initial cleaner chore
+    CleanerChore.initChorePool(conf);
    // Start log cleaner thread
    int cleanerInterval = conf.getInt("hbase.master.cleaner.interval", 60 * 1000);
    this.logCleaner =
CleanerChore.java

@@ -20,6 +20,10 @@ package org.apache.hadoop.hbase.master.cleaner;
 import java.io.IOException;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.ForkJoinTask;
+import java.util.concurrent.RecursiveTask;
 import java.util.concurrent.atomic.AtomicBoolean;

 import org.apache.commons.logging.Log;
@@ -28,12 +32,16 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.RemoteExceptionHandler;
 import org.apache.hadoop.hbase.ScheduledChore;
 import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.conf.ConfigurationObserver;
 import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.FileStatusFilter;
+import org.apache.hadoop.ipc.RemoteException;

 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
@@ -42,16 +50,107 @@ import com.google.common.collect.Lists;
  * Abstract Cleaner that uses a chain of delegates to clean a directory of files
  * @param <T> Cleaner delegate class that is dynamically loaded from configuration
  */
-public abstract class CleanerChore<T extends FileCleanerDelegate> extends ScheduledChore {
+@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="ST_WRITE_TO_STATIC_FROM_INSTANCE_METHOD",
+    justification="Static pool will be only updated once.")
+@InterfaceAudience.Private
+public abstract class CleanerChore<T extends FileCleanerDelegate> extends ScheduledChore
+    implements ConfigurationObserver {

   private static final Log LOG = LogFactory.getLog(CleanerChore.class.getName());
+  private static final int AVAIL_PROCESSORS = Runtime.getRuntime().availableProcessors();

-  private final FileSystem fs;
+  /**
+   * If it is an integer and >= 1, it would be the size;
+   * if 0.0 < size <= 1.0, size would be available processors * size.
+   * Pay attention that 1.0 is different from 1, former indicates it will use 100% of cores,
+   * while latter will use only 1 thread for chore to scan dir.
+   */
+  public static final String CHORE_POOL_SIZE = "hbase.cleaner.scan.dir.concurrent.size";
+  private static final String DEFAULT_CHORE_POOL_SIZE = "0.25";
+
+  private static class DirScanPool {
+    int size;
+    ForkJoinPool pool;
+    int cleanerLatch;
+    AtomicBoolean reconfigNotification;
+
+    DirScanPool(Configuration conf) {
+      String poolSize = conf.get(CHORE_POOL_SIZE, DEFAULT_CHORE_POOL_SIZE);
+      size = calculatePoolSize(poolSize);
+      // poolSize may be 0 or 0.0 from a careless configuration,
+      // double check to make sure.
+      size = size == 0 ? calculatePoolSize(DEFAULT_CHORE_POOL_SIZE) : size;
+      pool = new ForkJoinPool(size);
+      LOG.info("Cleaner pool size is " + size);
+      reconfigNotification = new AtomicBoolean(false);
+      cleanerLatch = 0;
+    }
+
+    /**
+     * Checks if pool can be updated. If so, mark for update later.
+     * @param conf configuration
+     */
+    synchronized void markUpdate(Configuration conf) {
+      int newSize = calculatePoolSize(conf.get(CHORE_POOL_SIZE, DEFAULT_CHORE_POOL_SIZE));
+      if (newSize == size) {
+        LOG.trace("Size from configuration is same as previous=" + newSize +
+          " no need to update.");
+        return;
+      }
+      size = newSize;
+      // Chore is working, update it later.
+      reconfigNotification.set(true);
+    }
+
+    /**
+     * Update pool with new size.
+     */
+    synchronized void updatePool(long timeout) {
+      long stopWaitTime = System.currentTimeMillis() + timeout;
+      while (cleanerLatch != 0 && timeout > 0) {
+        try {
+          wait(timeout);
+          timeout = stopWaitTime - System.currentTimeMillis();
+        } catch (InterruptedException ie) {
+          Thread.currentThread().interrupt();
+          break;
+        }
+      }
+      pool.shutdownNow();
+      LOG.info("Update chore's pool size from " + pool.getParallelism() + " to " + size);
+      pool = new ForkJoinPool(size);
+    }
+
+    synchronized void latchCountUp() {
+      cleanerLatch++;
+    }
+
+    synchronized void latchCountDown() {
+      cleanerLatch--;
+      notifyAll();
+    }
+
+    @SuppressWarnings("FutureReturnValueIgnored")
+    synchronized void submit(ForkJoinTask task) {
+      pool.submit(task);
+    }
+  }
+  // It may be waste resources for each cleaner chore own its pool,
+  // so let's make pool for all cleaner chores.
+  private static volatile DirScanPool POOL;
+
+  protected final FileSystem fs;
   private final Path oldFileDir;
   private final Configuration conf;
   protected List<T> cleanersChain;
   private AtomicBoolean enabled = new AtomicBoolean(true);
+
+  public static void initChorePool(Configuration conf) {
+    if (POOL == null) {
+      POOL = new DirScanPool(conf);
+    }
+  }
+
   /**
    * @param name name of the chore being run
    * @param sleepPeriod the period of time to sleep between each run
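The scan pool is a single process-wide static, so it must be initialized before any cleaner chore is constructed; the constructor hunk that follows enforces this with a Preconditions.checkNotNull, and the test changes later in this commit all gained the same call. A sketch of the required ordering (names from the diff; the surrounding class is hypothetical):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.master.cleaner.CleanerChore;

    public class PoolInitSketch {
      public static void main(String[] args) {
        Configuration conf = HBaseConfiguration.create();
        // Idempotent: only the first call creates the shared DirScanPool.
        CleanerChore.initChorePool(conf);
        // Any CleanerChore subclass (HFileCleaner, LogCleaner, ...) may now
        // be constructed; doing so before this call would throw.
      }
    }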
@@ -64,6 +163,9 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends ScheduledChore
   public CleanerChore(String name, final int sleepPeriod, final Stoppable s, Configuration conf,
       FileSystem fs, Path oldFileDir, String confKey) {
     super(name, s, sleepPeriod);
+
+    Preconditions.checkNotNull(POOL, "Chore's pool isn't initialized, please call"
+      + "CleanerChore.initChorePool(Configuration) before new a cleaner chore.");
     this.fs = fs;
     this.oldFileDir = oldFileDir;
     this.conf = conf;
@@ -71,6 +173,36 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends ScheduledChore
     initCleanerChain(confKey);
   }

+  /**
+   * Calculate size for cleaner pool.
+   * @param poolSize size from configuration
+   * @return size of pool after calculation
+   */
+  static int calculatePoolSize(String poolSize) {
+    if (poolSize.matches("[1-9][0-9]*")) {
+      // If poolSize is an integer, return it directly,
+      // but upmost to the number of available processors.
+      int size = Math.min(Integer.parseInt(poolSize), AVAIL_PROCESSORS);
+      if (size == AVAIL_PROCESSORS) {
+        LOG.warn("Use full core processors to scan dir, size=" + size);
+      }
+      return size;
+    } else if (poolSize.matches("0.[0-9]+|1.0")) {
+      // if poolSize is a double, return poolSize * availableProcessors;
+      // Ensure that we always return at least one.
+      int computedThreads = (int) (AVAIL_PROCESSORS * Double.valueOf(poolSize));
+      if (computedThreads < 1) {
+        LOG.debug("Computed " + computedThreads + " threads for CleanerChore, using 1 instead");
+        return 1;
+      }
+      return computedThreads;
+    } else {
+      LOG.error("Unrecognized value: " + poolSize + " for " + CHORE_POOL_SIZE +
+        ", use default config: " + DEFAULT_CHORE_POOL_SIZE + " instead.");
+      return calculatePoolSize(DEFAULT_CHORE_POOL_SIZE);
+    }
+  }
+
   /**
    * Validate the file to see if it even belongs in the directory. If it is valid, then the file
    * will go through the cleaner delegates, but otherwise the file is just deleted.
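As a worked example of the sizing rules above (assuming, for illustration, a host where availableProcessors() returns 8):

    // "4"    -> 4 threads (integer used as-is, capped at the core count)
    // "16"   -> 8 threads (capped at 8, with a warning logged)
    // "0.25" -> (int) (8 * 0.25) = 2 threads (the default)
    // "0.05" -> computes 0, bumped up to 1 thread
    // "1.0"  -> 8 threads, whereas "1" means exactly 1 thread
    // "abc"  -> unrecognized; falls back to the "0.25" default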
@@ -97,6 +229,11 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends ScheduledChore
     }
   }

+  @Override
+  public void onConfigurationChange(Configuration conf) {
+    POOL.markUpdate(conf);
+  }
+
   /**
    * A utility method to create new instances of LogCleanerDelegate based on the class name of the
    * LogCleanerDelegate.
@@ -122,96 +259,32 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends ScheduledChore
   @Override
   protected void chore() {
     if (getEnabled()) {
-      runCleaner();
-    } else {
-      LOG.debug("Cleaner disabled! Not cleaning.");
+      try {
+        POOL.latchCountUp();
+        if (runCleaner()) {
+          LOG.debug("Cleaned all WALs under " + oldFileDir);
+        } else {
+          LOG.warn("WALs outstanding under " + oldFileDir);
+        }
+      } finally {
+        POOL.latchCountDown();
+      }
+      // After each cleaner chore, checks if received reconfigure notification while cleaning.
+      // First in cleaner turns off notification, to avoid another cleaner updating pool again.
+      if (POOL.reconfigNotification.compareAndSet(true, false)) {
+        // This cleaner is waiting for other cleaners finishing their jobs.
+        // To avoid missing next chore, only wait 0.8 * period, then shutdown.
+        POOL.updatePool((long) (0.8 * getTimeUnit().toMillis(getPeriod())));
+      }
+    } else {
+      LOG.debug("Cleaner chore disabled! Not cleaning.");
     }
   }

   public Boolean runCleaner() {
-    try {
-      FileStatus[] files = FSUtils.listStatus(this.fs, this.oldFileDir);
-      checkAndDeleteEntries(files);
-    } catch (IOException e) {
-      e = RemoteExceptionHandler.checkIOException(e);
-      LOG.warn("Error while cleaning the logs", e);
-      return false;
-    }
-    return true;
+    CleanerTask task = new CleanerTask(this.oldFileDir, true);
+    POOL.submit(task);
+    return task.join();
   }

-  /**
-   * Loop over the given directory entries, and check whether they can be deleted.
-   * If an entry is itself a directory it will be recursively checked and deleted itself iff
-   * all subentries are deleted (and no new subentries are added in the mean time)
-   *
-   * @param entries directory entries to check
-   * @return true if all entries were successfully deleted
-   */
-  private boolean checkAndDeleteEntries(FileStatus[] entries) {
-    if (entries == null) {
-      return true;
-    }
-    boolean allEntriesDeleted = true;
-    List<FileStatus> files = Lists.newArrayListWithCapacity(entries.length);
-    for (FileStatus child : entries) {
-      Path path = child.getPath();
-      if (child.isDirectory()) {
-        // for each subdirectory delete it and all entries if possible
-        if (!checkAndDeleteDirectory(path)) {
-          allEntriesDeleted = false;
-        }
-      } else {
-        // collect all files to attempt to delete in one batch
-        files.add(child);
-      }
-    }
-    if (!checkAndDeleteFiles(files)) {
-      allEntriesDeleted = false;
-    }
-    return allEntriesDeleted;
-  }
-
-  /**
-   * Attempt to delete a directory and all files under that directory. Each child file is passed
-   * through the delegates to see if it can be deleted. If the directory has no children when the
-   * cleaners have finished it is deleted.
-   * <p>
-   * If new children files are added between checks of the directory, the directory will <b>not</b>
-   * be deleted.
-   * @param dir directory to check
-   * @return <tt>true</tt> if the directory was deleted, <tt>false</tt> otherwise.
-   */
-  @VisibleForTesting boolean checkAndDeleteDirectory(Path dir) {
-    if (LOG.isTraceEnabled()) {
-      LOG.trace("Checking directory: " + dir);
-    }
-
-    try {
-      FileStatus[] children = FSUtils.listStatus(fs, dir);
-      boolean allChildrenDeleted = checkAndDeleteEntries(children);
-
-      // if the directory still has children, we can't delete it, so we are done
-      if (!allChildrenDeleted) return false;
-    } catch (IOException e) {
-      e = RemoteExceptionHandler.checkIOException(e);
-      LOG.warn("Error while listing directory: " + dir, e);
-      // couldn't list directory, so don't try to delete, and don't return success
-      return false;
-    }
-
-    // otherwise, all the children (that we know about) have been deleted, so we should try to
-    // delete this directory. However, don't do so recursively so we don't delete files that have
-    // been added since we last checked.
-    try {
-      return fs.delete(dir, false);
-    } catch (IOException e) {
-      if (LOG.isTraceEnabled()) {
-        LOG.trace("Couldn't delete directory: " + dir, e);
-      }
-      // couldn't delete w/o exception, so we can't return success.
-      return false;
-    }
-  }

   /**
@@ -221,6 +294,10 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends ScheduledChore
    * @return true iff successfully deleted all files
    */
   private boolean checkAndDeleteFiles(List<FileStatus> files) {
+    if (files == null) {
+      return true;
+    }
+
     // first check to see if the path is valid
     List<FileStatus> validFiles = Lists.newArrayListWithCapacity(files.size());
     List<FileStatus> invalidFiles = Lists.newArrayList();
@@ -258,12 +335,19 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends ScheduledChore
     }

     Iterable<FileStatus> filesToDelete = Iterables.concat(invalidFiles, deletableValidFiles);
+    return deleteFiles(filesToDelete) == files.size();
+  }
+
+  /**
+   * Delete the given files
+   * @param filesToDelete files to delete
+   * @return number of deleted files
+   */
+  protected int deleteFiles(Iterable<FileStatus> filesToDelete) {
     int deletedFileCount = 0;
     for (FileStatus file : filesToDelete) {
       Path filePath = file.getPath();
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Removing: " + filePath + " from archive");
-      }
+      LOG.trace("Removing " + file + " from archive");
       try {
         boolean success = this.fs.delete(filePath, false);
         if (success) {
@@ -273,12 +357,12 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends ScheduledChore
             + ", but couldn't. Run cleaner chain and attempt to delete on next pass.");
         }
       } catch (IOException e) {
-        e = RemoteExceptionHandler.checkIOException(e);
+        e = e instanceof RemoteException ?
+          ((RemoteException)e).unwrapRemoteException() : e;
         LOG.warn("Error while deleting: " + filePath, e);
       }
     }
-    return deletedFileCount == files.size();
+    return deletedFileCount;
   }

   @Override
@@ -292,6 +376,11 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends ScheduledChore
     }
   }

+  @VisibleForTesting
+  int getChorePoolSize() {
+    return POOL.size;
+  }
+
   /**
    * @param enabled
    */
@@ -302,4 +391,126 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends ScheduledChore
   public boolean getEnabled() {
     return this.enabled.get();
   }
+
+  private interface Action<T> {
+    T act() throws IOException;
+  }
+
+  private class CleanerTask extends RecursiveTask<Boolean> {
+    private final Path dir;
+    private final boolean root;
+
+    CleanerTask(final FileStatus dir, final boolean root) {
+      this(dir.getPath(), root);
+    }
+
+    CleanerTask(final Path dir, final boolean root) {
+      this.dir = dir;
+      this.root = root;
+    }
+
+    @Override
+    protected Boolean compute() {
+      LOG.debug("Cleaning under " + dir);
+      List<FileStatus> subDirs;
+      final List<FileStatus> files;
+      try {
+        subDirs = FSUtils.listStatusWithStatusFilter(fs, dir, new FileStatusFilter() {
+          @Override
+          public boolean accept(FileStatus f) {
+            return f.isDirectory();
+          }
+        });
+        files = FSUtils.listStatusWithStatusFilter(fs, dir, new FileStatusFilter() {
+          @Override
+          public boolean accept(FileStatus f) {
+            return f.isFile();
+          }
+        });
+      } catch (IOException ioe) {
+        LOG.warn(dir + " doesn't exist, just skip it. ", ioe);
+        return true;
+      }
+
+      boolean nullSubDirs = subDirs == null;
+      if (nullSubDirs) {
+        LOG.trace("There is no subdir under " + dir);
+      }
+      if (files == null) {
+        LOG.trace("There is no file under " + dir);
+      }
+
+      int capacity = nullSubDirs ? 0 : subDirs.size();
+      final List<CleanerTask> tasks = Lists.newArrayListWithCapacity(capacity);
+      if (!nullSubDirs) {
+        for (FileStatus subdir : subDirs) {
+          CleanerTask task = new CleanerTask(subdir, false);
+          tasks.add(task);
+          task.fork();
+        }
+      }
+
+      boolean result = true;
+      result &= deleteAction(new Action<Boolean>() {
+        @Override
+        public Boolean act() throws IOException {
+          return checkAndDeleteFiles(files);
+        }
+      }, "files");
+      result &= deleteAction(new Action<Boolean>() {
+        @Override
+        public Boolean act() throws IOException {
+          return getCleanResult(tasks);
+        }
+      }, "subdirs");
+      // if and only if files and subdirs under current dir are deleted successfully, and
+      // it is not the root dir, then task will try to delete it.
+      if (result && !root) {
+        result &= deleteAction(new Action<Boolean>() {
+          @Override
+          public Boolean act() throws IOException {
+            return fs.delete(dir, false);
+          }
+        }, "dir");
+      }
+      return result;
+    }
+
+    /**
+     * Perform a delete on a specified type.
+     * @param deletion a delete
+     * @param type possible values are 'files', 'subdirs', 'dirs'
+     * @return true if it deleted successfully, false otherwise
+     */
+    private boolean deleteAction(Action<Boolean> deletion, String type) {
+      boolean deleted;
+      try {
+        LOG.trace("Start deleting " + type + " under " + dir);
+        deleted = deletion.act();
+      } catch (IOException ioe) {
+        LOG.warn("Could not delete " + type + " under " + dir, ioe);
+        deleted = false;
+      }
+      LOG.trace("Finish deleting " + type + " under " + dir + " deleted=" + deleted);
+      return deleted;
+    }
+
+    /**
+     * Get cleaner results of subdirs.
+     * @param tasks subdirs cleaner tasks
+     * @return true if all subdirs deleted successfully, false for partial/all failures
+     * @throws IOException something happen during computation
+     */
+    private boolean getCleanResult(List<CleanerTask> tasks) throws IOException {
+      boolean cleaned = true;
+      try {
+        for (CleanerTask task : tasks) {
+          cleaned &= task.get();
+        }
+      } catch (InterruptedException | ExecutionException e) {
+        throw new IOException(e);
+      }
+      return cleaned;
+    }
+  }
 }
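The CleanerTask above replaces the old single-threaded recursion: each directory becomes a ForkJoin task that forks one subtask per subdirectory and deletes the directory itself only if every file and child task succeeded (and it is not the root). A sketch of one pass over an archive directory, assuming this layout for illustration:

    // oldWALs/           <- root CleanerTask (root=true, never deleted itself)
    //   server-a/        <- forked CleanerTask, runs in parallel on the shared pool
    //     wal.1, wal.2   <- checkAndDeleteFiles() via the "files" Action
    //   server-b/        <- another forked CleanerTask
    // The root task joins its children (getCleanResult), and runCleaner()
    // returns task.join(): true only if everything was deleted.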
LogCleaner.java

@@ -19,8 +19,17 @@ package org.apache.hadoop.hbase.master.cleaner;

 import static org.apache.hadoop.hbase.HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS;

+import com.google.common.annotations.VisibleForTesting;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.LinkedBlockingQueue;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -37,6 +46,12 @@ import org.apache.hadoop.hbase.wal.DefaultWALProvider;
 public class LogCleaner extends CleanerChore<BaseLogCleanerDelegate> {
   private static final Log LOG = LogFactory.getLog(LogCleaner.class.getName());

+  public static final String OLD_WALS_CLEANER_SIZE = "hbase.oldwals.cleaner.thread.size";
+  public static final int OLD_WALS_CLEANER_DEFAULT_SIZE = 2;
+
+  private final LinkedBlockingQueue<CleanerContext> pendingDelete;
+  private List<Thread> oldWALsCleaner;
+
   /**
    * @param p the period of time to sleep between each run
    * @param s the stopper
@@ -47,10 +62,176 @@ public class LogCleaner extends CleanerChore<BaseLogCleanerDelegate> {
   public LogCleaner(final int p, final Stoppable s, Configuration conf, FileSystem fs,
       Path oldLogDir) {
     super("LogsCleaner", p, s, conf, fs, oldLogDir, HBASE_MASTER_LOGCLEANER_PLUGINS);
+    this.pendingDelete = new LinkedBlockingQueue<>();
+    int size = conf.getInt(OLD_WALS_CLEANER_SIZE, OLD_WALS_CLEANER_DEFAULT_SIZE);
+    this.oldWALsCleaner = createOldWalsCleaner(size);
   }

   @Override
   protected boolean validate(Path file) {
     return DefaultWALProvider.validateWALFilename(file.getName());
   }
+
+  @Override
+  public void onConfigurationChange(Configuration conf) {
+    super.onConfigurationChange(conf);
+
+    int newSize = conf.getInt(OLD_WALS_CLEANER_SIZE, OLD_WALS_CLEANER_DEFAULT_SIZE);
+    if (newSize == oldWALsCleaner.size()) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Size from configuration is the same as previous which is " +
+          newSize + ", no need to update.");
+      }
+      return;
+    }
+    interruptOldWALsCleaner();
+    oldWALsCleaner = createOldWalsCleaner(newSize);
+  }
+
+  @Override
+  protected int deleteFiles(Iterable<FileStatus> filesToDelete) {
+    List<CleanerContext> results = new LinkedList<>();
+    for (FileStatus toDelete : filesToDelete) {
+      CleanerContext context = CleanerContext.createCleanerContext(toDelete);
+      if (context != null) {
+        pendingDelete.add(context);
+        results.add(context);
+      }
+    }
+
+    int deletedFiles = 0;
+    for (CleanerContext res : results) {
+      deletedFiles += res.getResult(500) ? 1 : 0;
+    }
+    return deletedFiles;
+  }
+
+  @Override
+  public synchronized void cleanup() {
+    super.cleanup();
+    interruptOldWALsCleaner();
+  }
+
+  @VisibleForTesting
+  int getSizeOfCleaners() {
+    return oldWALsCleaner.size();
+  }
+
+  private List<Thread> createOldWalsCleaner(int size) {
+    LOG.info("Creating OldWALs cleaners with size=" + size);
+
+    List<Thread> oldWALsCleaner = new ArrayList<>(size);
+    for (int i = 0; i < size; i++) {
+      Thread cleaner = new Thread(new Runnable() {
+        @Override
+        public void run() {
+          deleteFile();
+        }
+      });
+      cleaner.setName("OldWALsCleaner-" + i);
+      cleaner.setDaemon(true);
+      cleaner.start();
+      oldWALsCleaner.add(cleaner);
+    }
+    return oldWALsCleaner;
+  }
+
+  private void interruptOldWALsCleaner() {
+    for (Thread cleaner : oldWALsCleaner) {
+      cleaner.interrupt();
+    }
+    oldWALsCleaner.clear();
+  }
+
+  private void deleteFile() {
+    while (true) {
+      CleanerContext context = null;
+      boolean succeed = false;
+      boolean interrupted = false;
+      try {
+        context = pendingDelete.take();
+        if (context != null) {
+          FileStatus toClean = context.getTargetToClean();
+          succeed = this.fs.delete(toClean.getPath(), false);
+        }
+      } catch (InterruptedException ite) {
+        // It's most likely from configuration changing request
+        if (context != null) {
+          LOG.warn("Interrupted while cleaning oldWALs " +
+            context.getTargetToClean() + ", try to clean it next round.");
+        }
+        interrupted = true;
+      } catch (IOException e) {
+        // fs.delete() fails.
+        LOG.warn("Failed to clean oldwals with exception: " + e);
+        succeed = false;
+      } finally {
+        if (context != null) {
+          context.setResult(succeed);
+        }
+        if (interrupted) {
+          // Restore interrupt status
+          Thread.currentThread().interrupt();
+          break;
+        }
+      }
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Exiting cleaner.");
+    }
+  }
+
+  @Override
+  public synchronized void cancel(boolean mayInterruptIfRunning) {
+    super.cancel(mayInterruptIfRunning);
+    for (Thread t : oldWALsCleaner) {
+      t.interrupt();
+    }
+  }
+
+  private static final class CleanerContext {
+    // At most waits 60 seconds
+    static final long MAX_WAIT = 60 * 1000;
+
+    final FileStatus target;
+    volatile boolean result;
+    volatile boolean setFromCleaner = false;
+
+    static CleanerContext createCleanerContext(FileStatus status) {
+      return status != null ? new CleanerContext(status) : null;
+    }
+
+    private CleanerContext(FileStatus status) {
+      this.target = status;
+      this.result = false;
+    }
+
+    synchronized void setResult(boolean res) {
+      this.result = res;
+      this.setFromCleaner = true;
+      notify();
+    }
+
+    synchronized boolean getResult(long waitIfNotFinished) {
+      long totalTime = 0;
+      try {
+        while (!setFromCleaner) {
+          wait(waitIfNotFinished);
+          totalTime += waitIfNotFinished;
+          if (totalTime >= MAX_WAIT) {
+            LOG.warn("Spend too much time to delete oldwals " + target);
+            return result;
+          }
+        }
+      } catch (InterruptedException e) {
+        LOG.warn("Interrupted while waiting deletion of " + target);
+        return result;
+      }
+      return result;
+    }
+
+    FileStatus getTargetToClean() {
+      return target;
+    }
+  }
 }
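LogCleaner now separates discovery from deletion: the chore thread queues one CleanerContext per old WAL, and a fixed set of daemon threads drains the queue. A sketch of the flow, with names from the diff:

    // chore thread (producer):
    //   deleteFiles(files) -> pendingDelete.add(context) for each file,
    //   then context.getResult(500) polls up to the 60s MAX_WAIT per file.
    // OldWALsCleaner-0 .. OldWALsCleaner-(n-1) (consumers):
    //   context = pendingDelete.take();
    //   succeed = fs.delete(context.getTargetToClean().getPath(), false);
    //   context.setResult(succeed);  // wakes the waiting chore thread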
TestZooKeeperTableArchiveClient.java

@@ -38,6 +38,7 @@ import org.apache.hadoop.hbase.ChoreService;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.master.cleaner.CleanerChore;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.Stoppable;
 import org.apache.hadoop.hbase.client.ClusterConnection;
@@ -170,6 +171,7 @@ public class TestZooKeeperTableArchiveClient {
     Configuration conf = UTIL.getConfiguration();
     // setup the delegate
     Stoppable stop = new StoppableImplementation();
+    CleanerChore.initChorePool(conf);
     HFileCleaner cleaner = setupAndCreateCleaner(conf, fs, archiveDir, stop);
     List<BaseHFileCleanerDelegate> cleaners = turnOnArchiving(STRING_TABLE_NAME, cleaner);
     final LongTermArchivingHFileCleaner delegate = (LongTermArchivingHFileCleaner) cleaners.get(0);
@@ -224,6 +226,7 @@ public class TestZooKeeperTableArchiveClient {
     // setup the delegate
     Stoppable stop = new StoppableImplementation();
     final ChoreService choreService = new ChoreService("TEST_SERVER_NAME");
+    CleanerChore.initChorePool(conf);
     HFileCleaner cleaner = setupAndCreateCleaner(conf, fs, archiveDir, stop);
     List<BaseHFileCleanerDelegate> cleaners = turnOnArchiving(STRING_TABLE_NAME, cleaner);
     final LongTermArchivingHFileCleaner delegate = (LongTermArchivingHFileCleaner) cleaners.get(0);
TestCleanerChore.java

@@ -17,14 +17,17 @@
  */
 package org.apache.hadoop.hbase.master.cleaner;

+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;

 import java.io.IOException;
+import java.util.Random;

 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -34,6 +37,7 @@ import org.apache.hadoop.hbase.Stoppable;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.StoppableImplementation;
 import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.mockito.Mockito;
@@ -46,6 +50,11 @@ public class TestCleanerChore {
   private static final Log LOG = LogFactory.getLog(TestCleanerChore.class);
   private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();

+  @Before
+  public void setup() throws Exception {
+    CleanerChore.initChorePool(UTIL.getConfiguration());
+  }
+
   @After
   public void cleanup() throws Exception {
     // delete and recreate the test directory, ensuring a clean test dir between tests
@@ -275,11 +284,8 @@ public class TestCleanerChore {
       }
     }).when(spy).isFileDeletable(Mockito.any(FileStatus.class));

-    // attempt to delete the directory, which
-    if (chore.checkAndDeleteDirectory(parent)) {
-      throw new Exception(
-          "Reported success deleting directory, should have failed when adding file mid-iteration");
-    }
+    // run the chore
+    chore.chore();

     // make sure all the directories + added file exist, but the original file is deleted
     assertTrue("Added file unexpectedly deleted", fs.exists(racyFile));
@@ -354,6 +360,89 @@ public class TestCleanerChore {
     assertTrue("Directory got deleted", fs.exists(parent));
   }

+  @Test
+  public void testOnConfigurationChange() throws Exception {
+    int availableProcessorNum = Runtime.getRuntime().availableProcessors();
+    if (availableProcessorNum == 1) { // no need to run this test
+      return;
+    }
+
+    // have at least 2 available processors/cores
+    int initPoolSize = availableProcessorNum / 2;
+    int changedPoolSize = availableProcessorNum;
+
+    Stoppable stop = new StoppableImplementation();
+    Configuration conf = UTIL.getConfiguration();
+    Path testDir = UTIL.getDataTestDir();
+    FileSystem fs = UTIL.getTestFileSystem();
+    String confKey = "hbase.test.cleaner.delegates";
+    conf.set(confKey, AlwaysDelete.class.getName());
+    conf.set(CleanerChore.CHORE_POOL_SIZE, String.valueOf(initPoolSize));
+    final AllValidPaths chore =
+      new AllValidPaths("test-file-cleaner", stop, conf, fs, testDir, confKey);
+    chore.setEnabled(true);
+    // Create subdirs under testDir
+    int dirNums = 6;
+    Path[] subdirs = new Path[dirNums];
+    for (int i = 0; i < dirNums; i++) {
+      subdirs[i] = new Path(testDir, "subdir-" + i);
+      fs.mkdirs(subdirs[i]);
+    }
+    // Under each subdirs create 6 files
+    for (Path subdir : subdirs) {
+      createFiles(fs, subdir, 6);
+    }
+    // Start chore
+    Thread t = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        chore.chore();
+      }
+    });
+    t.setDaemon(true);
+    t.start();
+    // Change size of chore's pool
+    conf.set(CleanerChore.CHORE_POOL_SIZE, String.valueOf(changedPoolSize));
+    chore.onConfigurationChange(conf);
+    assertEquals(changedPoolSize, chore.getChorePoolSize());
+    // Stop chore
+    t.join();
+  }
+
+  @Test
+  public void testMinimumNumberOfThreads() throws Exception {
+    Stoppable stop = new StoppableImplementation();
+    Configuration conf = UTIL.getConfiguration();
+    Path testDir = UTIL.getDataTestDir();
+    FileSystem fs = UTIL.getTestFileSystem();
+    String confKey = "hbase.test.cleaner.delegates";
+    conf.set(confKey, AlwaysDelete.class.getName());
+    conf.set(CleanerChore.CHORE_POOL_SIZE, "2");
+    AllValidPaths chore = new AllValidPaths("test-file-cleaner", stop, conf, fs, testDir, confKey);
+    int numProcs = Runtime.getRuntime().availableProcessors();
+    // Sanity
+    assertEquals(numProcs, chore.calculatePoolSize(Integer.toString(numProcs)));
+    // The implementation does not allow us to set more threads than we have processors
+    assertEquals(numProcs, chore.calculatePoolSize(Integer.toString(numProcs + 2)));
+    // Force us into the branch that is multiplying 0.0 against the number of processors
+    assertEquals(1, chore.calculatePoolSize("0.0"));
+  }
+
+  private void createFiles(FileSystem fs, Path parentDir, int numOfFiles) throws IOException {
+    Random random = new Random();
+    for (int i = 0; i < numOfFiles; i++) {
+      int xMega = 1 + random.nextInt(3); // size of each file is between 1~3M
+      try (FSDataOutputStream fsdos = fs.create(new Path(parentDir, "file-" + i))) {
+        for (int m = 0; m < xMega; m++) {
+          byte[] M = new byte[1024 * 1024];
+          random.nextBytes(M);
+          fsdos.write(M);
+        }
+      }
+    }
+  }
+
   private static class AllValidPaths extends CleanerChore<BaseHFileCleanerDelegate> {

     public AllValidPaths(String name, Stoppable s, Configuration conf, FileSystem fs,
TestHFileCleaner.java

@@ -56,6 +56,7 @@ public class TestHFileCleaner {
   public static void setupCluster() throws Exception {
     // have to use a minidfs cluster because the localfs doesn't modify file times correctly
     UTIL.startMiniDFSCluster(1);
+    CleanerChore.initChorePool(UTIL.getConfiguration());
   }

   @AfterClass
TestHFileLinkCleaner.java

@@ -96,6 +96,7 @@ public class TestHFileLinkCleaner {
     final long ttl = 1000;
     conf.setLong(TimeToLiveHFileCleaner.TTL_CONF_KEY, ttl);
     Server server = new DummyServer();
+    CleanerChore.initChorePool(conf);
     HFileCleaner cleaner = new HFileCleaner(1000, server, conf, fs, archiveDir);

     // Link backref cannot be removed
TestLogsCleaner.java

@@ -30,10 +30,12 @@ import java.net.URLEncoder;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Random;
 import java.util.concurrent.atomic.AtomicBoolean;

 import com.google.common.collect.Lists;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -76,6 +78,8 @@ public class TestLogsCleaner {
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
     TEST_UTIL.startMiniZKCluster();
+    TEST_UTIL.startMiniDFSCluster(1);
+    CleanerChore.initChorePool(TEST_UTIL.getConfiguration());
   }

   /**
@@ -84,6 +88,7 @@ public class TestLogsCleaner {
   @AfterClass
   public static void tearDownAfterClass() throws Exception {
     TEST_UTIL.shutdownMiniZKCluster();
+    TEST_UTIL.shutdownMiniDFSCluster();
   }

   @Test
@@ -253,6 +258,58 @@ public class TestLogsCleaner {
     }
   }

+  @Test
+  public void testOnConfigurationChange() throws Exception {
+    Configuration conf = TEST_UTIL.getConfiguration();
+    conf.setInt(LogCleaner.OLD_WALS_CLEANER_SIZE, LogCleaner.OLD_WALS_CLEANER_DEFAULT_SIZE);
+    // Prepare environments
+    Server server = new DummyServer();
+    Path oldWALsDir = new Path(TEST_UTIL.getDefaultRootDirPath(),
+        HConstants.HREGION_OLDLOGDIR_NAME);
+    FileSystem fs = TEST_UTIL.getDFSCluster().getFileSystem();
+    final LogCleaner cleaner = new LogCleaner(3000, server, conf, fs, oldWALsDir);
+    assertEquals(LogCleaner.OLD_WALS_CLEANER_DEFAULT_SIZE, cleaner.getSizeOfCleaners());
+    // Create dir and files for test
+    fs.delete(oldWALsDir, true);
+    fs.mkdirs(oldWALsDir);
+    int numOfFiles = 10;
+    createFiles(fs, oldWALsDir, numOfFiles);
+    FileStatus[] status = fs.listStatus(oldWALsDir);
+    assertEquals(numOfFiles, status.length);
+    // Start cleaner chore
+    Thread thread = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        cleaner.chore();
+      }
+    });
+    thread.setDaemon(true);
+    thread.start();
+    // change size of cleaners dynamically
+    int sizeToChange = 4;
+    conf.setInt(LogCleaner.OLD_WALS_CLEANER_SIZE, sizeToChange);
+    cleaner.onConfigurationChange(conf);
+    assertEquals(sizeToChange, cleaner.getSizeOfCleaners());
+    // Stop chore
+    thread.join();
+    status = fs.listStatus(oldWALsDir);
+    assertEquals(0, status.length);
+  }
+
+  private void createFiles(FileSystem fs, Path parentDir, int numOfFiles) throws IOException {
+    Random random = new Random();
+    for (int i = 0; i < numOfFiles; i++) {
+      int xMega = 1 + random.nextInt(3); // size of each file is between 1~3M
+      try (FSDataOutputStream fsdos = fs.create(new Path(parentDir, "file-" + i))) {
+        for (int m = 0; m < xMega; m++) {
+          byte[] M = new byte[1024 * 1024];
+          random.nextBytes(M);
+          fsdos.write(M);
+        }
+      }
+    }
+  }
+
   static class DummyServer implements Server {

     @Override