diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index fcaf55fc783..cde35816284 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -908,6 +908,7 @@ public class HMaster extends HRegionServer implements MasterServices { this.masterFinishedInitializationTime = System.currentTimeMillis(); configurationManager.registerObserver(this.balancer); configurationManager.registerObserver(this.hfileCleaner); + configurationManager.registerObserver(this.logCleaner); // Set master as 'initialized'. setInitialized(true); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java index b8ca1ecfc2e..582df8436a3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java @@ -17,18 +17,22 @@ */ package org.apache.hadoop.hbase.master.cleaner; +import org.apache.hadoop.hbase.conf.ConfigurationObserver; import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.hbase.shaded.com.google.common.base.Predicate; import org.apache.hadoop.hbase.shaded.com.google.common.collect.ImmutableSet; import org.apache.hadoop.hbase.shaded.com.google.common.collect.Iterables; import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists; import java.io.IOException; -import java.util.ArrayList; import java.util.Comparator; import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.RecursiveTask; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.logging.Log; @@ -46,16 +50,33 @@ import org.apache.hadoop.ipc.RemoteException; * Abstract Cleaner that uses a chain of delegates to clean a directory of files * @param Cleaner delegate class that is dynamically loaded from configuration */ -public abstract class CleanerChore extends ScheduledChore { +public abstract class CleanerChore extends ScheduledChore + implements ConfigurationObserver { private static final Log LOG = LogFactory.getLog(CleanerChore.class.getName()); + private static final int AVAIL_PROCESSORS = Runtime.getRuntime().availableProcessors(); + + /** + * If it is an integer and >= 1, it would be the size; + * if 0.0 < size <= 1.0, size would be available processors * size. + * Pay attention that 1.0 is different from 1, former indicates it will use 100% of cores, + * while latter will use only 1 thread for chore to scan dir. + */ + public static final String CHORE_POOL_SIZE = "hbase.cleaner.scan.dir.concurrent.size"; + private static final String DEFAULT_CHORE_POOL_SIZE = "0.5"; + + // It may be waste resources for each cleaner chore own its pool, + // so let's make pool for all cleaner chores. + private static volatile ForkJoinPool chorePool; + private static volatile int chorePoolSize; protected final FileSystem fs; private final Path oldFileDir; private final Configuration conf; + protected final Map params; + private final AtomicBoolean enabled = new AtomicBoolean(true); + private final AtomicBoolean reconfig = new AtomicBoolean(false); protected List cleanersChain; - protected Map params; - private AtomicBoolean enabled = new AtomicBoolean(true); public CleanerChore(String name, final int sleepPeriod, final Stoppable s, Configuration conf, FileSystem fs, Path oldFileDir, String confKey) { @@ -80,8 +101,42 @@ public abstract class CleanerChore extends Schedu this.conf = conf; this.params = params; initCleanerChain(confKey); + + if (chorePool == null) { + String poolSize = conf.get(CHORE_POOL_SIZE, DEFAULT_CHORE_POOL_SIZE); + chorePoolSize = calculatePoolSize(poolSize); + // poolSize may be 0 or 0.0 from a careless configuration, + // double check to make sure. + chorePoolSize = chorePoolSize == 0 ? + calculatePoolSize(DEFAULT_CHORE_POOL_SIZE) : chorePoolSize; + this.chorePool = new ForkJoinPool(chorePoolSize); + LOG.info("Cleaner pool size is " + chorePoolSize); + } } + /** + * Calculate size for cleaner pool. + * @param poolSize size from configuration + * @return size of pool after calculation + */ + private int calculatePoolSize(String poolSize) { + if (poolSize.matches("[1-9][0-9]*")) { + // If poolSize is an integer, return it directly, + // but upmost to the number of available processors. + int size = Math.min(Integer.valueOf(poolSize), AVAIL_PROCESSORS); + if (size == AVAIL_PROCESSORS) { + LOG.warn("Use full core processors to scan dir"); + } + return size; + } else if (poolSize.matches("0.[0-9]+|1.0")) { + // if poolSize is a double, return poolSize * availableProcessors; + return (int) (AVAIL_PROCESSORS * Double.valueOf(poolSize)); + } else { + LOG.error("Unrecognized value: " + poolSize + " for " + CHORE_POOL_SIZE + + ", use default config: " + DEFAULT_CHORE_POOL_SIZE + " instead."); + return calculatePoolSize(DEFAULT_CHORE_POOL_SIZE); + } + } /** * Validate the file to see if it even belongs in the directory. If it is valid, then the file @@ -109,6 +164,33 @@ public abstract class CleanerChore extends Schedu } } + @Override + public void onConfigurationChange(Configuration conf) { + int updatedSize = calculatePoolSize(conf.get(CHORE_POOL_SIZE, DEFAULT_CHORE_POOL_SIZE)); + if (updatedSize == chorePoolSize) { + if (LOG.isDebugEnabled()) { + LOG.debug("Size from configuration is the same as previous which is " + + updatedSize + ", no need to update."); + } + return; + } + chorePoolSize = updatedSize; + if (chorePool.getPoolSize() == 0) { + // Chore does not work now, update it directly. + updateChorePoolSize(updatedSize); + return; + } + // Chore is working, update it after chore finished. + reconfig.set(true); + } + + private void updateChorePoolSize(int updatedSize) { + chorePool.shutdownNow(); + LOG.info("Update chore's pool size from " + + chorePool.getParallelism() + " to " + updatedSize); + chorePool = new ForkJoinPool(updatedSize); + } + /** * A utility method to create new instances of LogCleanerDelegate based on the class name of the * LogCleanerDelegate. @@ -135,7 +217,17 @@ public abstract class CleanerChore extends Schedu @Override protected void chore() { if (getEnabled()) { - runCleaner(); + if (runCleaner()) { + if (LOG.isDebugEnabled()) { + LOG.debug("Cleaned old files/dirs under " + oldFileDir + " successfully."); + } + } else { + LOG.warn("Failed to fully clean old files/dirs under " + oldFileDir + "."); + } + // After each clean chore, checks if receives reconfigure notification while cleaning + if (reconfig.compareAndSet(true, false)) { + updateChorePoolSize(chorePoolSize); + } } else { LOG.debug("Cleaner chore disabled! Not cleaning."); } @@ -147,16 +239,9 @@ public abstract class CleanerChore extends Schedu public Boolean runCleaner() { preRunCleaner(); - try { - FileStatus[] files = FSUtils.listStatus(this.fs, this.oldFileDir); - checkAndDeleteEntries(files); - } catch (IOException e) { - e = e instanceof RemoteException ? - ((RemoteException)e).unwrapRemoteException() : e; - LOG.warn("Error while cleaning the logs", e); - return false; - } - return true; + CleanerTask task = new CleanerTask(this.oldFileDir, true); + chorePool.submit(task); + return task.join(); } /** @@ -198,89 +283,6 @@ public abstract class CleanerChore extends Schedu }); } - /** - * Loop over the given directory entries, and check whether they can be deleted. - * If an entry is itself a directory it will be recursively checked and deleted itself iff - * all subentries are deleted (and no new subentries are added in the mean time) - * - * @param entries directory entries to check - * @return true if all entries were successfully deleted - */ - private boolean checkAndDeleteEntries(FileStatus[] entries) { - if (entries == null) { - return true; - } - boolean allEntriesDeleted = true; - List files = Lists.newArrayListWithCapacity(entries.length); - List dirs = new ArrayList<>(); - for (FileStatus child : entries) { - if (child.isDirectory()) { - dirs.add(child); - } else { - // collect all files to attempt to delete in one batch - files.add(child); - } - } - if (dirs.size() > 0) { - sortByConsumedSpace(dirs); - LOG.debug("Prepared to delete files in directories: " + dirs); - for (FileStatus child : dirs) { - Path path = child.getPath(); - // for each subdirectory delete it and all entries if possible - if (!checkAndDeleteDirectory(path)) { - allEntriesDeleted = false; - } - } - } - if (!checkAndDeleteFiles(files)) { - allEntriesDeleted = false; - } - return allEntriesDeleted; - } - - /** - * Attempt to delete a directory and all files under that directory. Each child file is passed - * through the delegates to see if it can be deleted. If the directory has no children when the - * cleaners have finished it is deleted. - *

- * If new children files are added between checks of the directory, the directory will not - * be deleted. - * @param dir directory to check - * @return true if the directory was deleted, false otherwise. - */ - @VisibleForTesting boolean checkAndDeleteDirectory(Path dir) { - if (LOG.isTraceEnabled()) { - LOG.trace("Checking directory: " + dir); - } - - try { - FileStatus[] children = FSUtils.listStatus(fs, dir); - boolean allChildrenDeleted = checkAndDeleteEntries(children); - - // if the directory still has children, we can't delete it, so we are done - if (!allChildrenDeleted) return false; - } catch (IOException e) { - e = e instanceof RemoteException ? - ((RemoteException)e).unwrapRemoteException() : e; - LOG.warn("Error while listing directory: " + dir, e); - // couldn't list directory, so don't try to delete, and don't return success - return false; - } - - // otherwise, all the children (that we know about) have been deleted, so we should try to - // delete this directory. However, don't do so recursively so we don't delete files that have - // been added since we last checked. - try { - return fs.delete(dir, false); - } catch (IOException e) { - if (LOG.isTraceEnabled()) { - LOG.trace("Couldn't delete directory: " + dir, e); - } - // couldn't delete w/o exception, so we can't return success. - return false; - } - } - /** * Run the given files through each of the cleaners to see if it should be deleted, deleting it if * necessary. @@ -288,6 +290,10 @@ public abstract class CleanerChore extends Schedu * @return true iff successfully deleted all files */ private boolean checkAndDeleteFiles(List files) { + if (files == null) { + return true; + } + // first check to see if the path is valid List validFiles = Lists.newArrayListWithCapacity(files.size()); List invalidFiles = Lists.newArrayList(); @@ -368,6 +374,11 @@ public abstract class CleanerChore extends Schedu } } + @VisibleForTesting + int getChorePoolSize() { + return chorePoolSize; + } + /** * @param enabled */ @@ -378,4 +389,132 @@ public abstract class CleanerChore extends Schedu public boolean getEnabled() { return this.enabled.get(); } + + private interface Action { + T act() throws IOException; + } + + private class CleanerTask extends RecursiveTask { + private final Path dir; + private final boolean root; + + CleanerTask(final FileStatus dir, final boolean root) { + this(dir.getPath(), root); + } + + CleanerTask(final Path dir, final boolean root) { + this.dir = dir; + this.root = root; + } + + @Override + protected Boolean compute() { + if (LOG.isDebugEnabled()) { + LOG.debug("CleanerTask " + Thread.currentThread().getId() + + " starts cleaning dirs and files under " + dir + " and itself."); + } + + List subDirs; + List files; + try { + subDirs = getFilteredStatus(status -> status.isDirectory()); + files = getFilteredStatus(status -> status.isFile()); + } catch (IOException ioe) { + LOG.warn(dir + " doesn't exist, just skip it. ", ioe); + return true; + } + + boolean nullSubDirs = subDirs == null; + if (nullSubDirs) { + if (LOG.isDebugEnabled()) { + LOG.debug("There is no subdir under " + dir); + } + } + if (files == null) { + if (LOG.isDebugEnabled()) { + LOG.debug("There is no file under " + dir); + } + } + + int capacity = nullSubDirs ? 0 : subDirs.size(); + List tasks = Lists.newArrayListWithCapacity(capacity); + if (!nullSubDirs) { + sortByConsumedSpace(subDirs); + for (FileStatus subdir : subDirs) { + CleanerTask task = new CleanerTask(subdir, false); + tasks.add(task); + task.fork(); + } + } + + boolean result = true; + result &= deleteAction(() -> checkAndDeleteFiles(files), "files"); + result &= deleteAction(() -> getCleanRusult(tasks), "subdirs"); + // if and only if files and subdirs under current dir are deleted successfully, and + // it is not the root dir, then task will try to delete it. + if (result && !root) { + result &= deleteAction(() -> fs.delete(dir, false), "dir"); + } + return result; + } + + /** + * Get FileStatus with filter. + * Pay attention that FSUtils #listStatusWithStatusFilter would return null, + * even though status is empty but not null. + * @param function a filter function + * @return filtered FileStatus + * @throws IOException if there's no such a directory + */ + private List getFilteredStatus(Predicate function) throws IOException { + return FSUtils.listStatusWithStatusFilter(fs, dir, status -> function.test(status)); + } + + /** + * Perform a delete on a specified type. + * @param deletion a delete + * @param type possible values are 'files', 'subdirs', 'dirs' + * @return true if it deleted successfully, false otherwise + */ + private boolean deleteAction(Action deletion, String type) { + boolean deleted; + String errorMsg = ""; + try { + if (LOG.isDebugEnabled()) { + LOG.debug("Start deleting " + type + " under " + dir); + } + deleted = deletion.act(); + } catch (IOException ioe) { + errorMsg = ioe.getMessage(); + deleted = false; + } + if (LOG.isDebugEnabled()) { + if (deleted) { + LOG.debug("Finish deleting " + type + " under " + dir); + } else { + LOG.debug("Couldn't delete " + type + " completely under " + dir + + " with reasons: " + (!errorMsg.equals("") ? errorMsg : " undeletable, please check.")); + } + } + return deleted; + } + + /** + * Get cleaner results of subdirs. + * @param tasks subdirs cleaner tasks + * @return true if all subdirs deleted successfully, false for patial/all failures + * @throws IOException something happen during computation + */ + private boolean getCleanRusult(List tasks) throws IOException { + boolean cleaned = true; + try { + for (CleanerTask task : tasks) { + cleaned &= task.get(); + } + } catch (InterruptedException | ExecutionException e) { + throw new IOException(e); + } + return cleaned; + } + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileCleaner.java index e91d4f1f2b2..5c78dc498e4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileCleaner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileCleaner.java @@ -32,11 +32,10 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Stoppable; -import org.apache.yetus.audience.InterfaceAudience; -import org.apache.hadoop.hbase.conf.ConfigurationObserver; import org.apache.hadoop.hbase.io.HFileLink; import org.apache.hadoop.hbase.regionserver.StoreFileInfo; import org.apache.hadoop.hbase.util.StealJobQueue; +import org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting; /** @@ -44,8 +43,7 @@ import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTe * folder that are deletable for each HFile cleaner in the chain. */ @InterfaceAudience.Private -public class HFileCleaner extends CleanerChore implements - ConfigurationObserver { +public class HFileCleaner extends CleanerChore { public static final String MASTER_HFILE_CLEANER_PLUGINS = "hbase.master.hfilecleaner.plugins"; @@ -390,6 +388,8 @@ public class HFileCleaner extends CleanerChore impleme @Override public void onConfigurationChange(Configuration conf) { + super.onConfigurationChange(conf); + if (!checkAndUpdateConfigurations(conf)) { LOG.debug("Update configuration triggered but nothing changed for this cleaner"); return; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/LogCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/LogCleaner.java index 8569cb5bc1a..3cb620e195e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/LogCleaner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/LogCleaner.java @@ -19,15 +19,24 @@ package org.apache.hadoop.hbase.master.cleaner; import static org.apache.hadoop.hbase.HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS; +import java.io.IOException; +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.LinkedBlockingQueue; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Stoppable; import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil; -import org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; +import org.apache.yetus.audience.InterfaceAudience; + +import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting; /** * This Chore, every time it runs, will attempt to delete the WALs and Procedure WALs in the old @@ -38,6 +47,12 @@ import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; public class LogCleaner extends CleanerChore { private static final Log LOG = LogFactory.getLog(LogCleaner.class.getName()); + public static final String OLD_WALS_CLEANER_SIZE = "hbase.oldwals.cleaner.thread.size"; + public static final int OLD_WALS_CLEANER_DEFAULT_SIZE = 2; + + private final LinkedBlockingQueue pendingDelete; + private List oldWALsCleaner; + /** * @param period the period of time to sleep between each run * @param stopper the stopper @@ -48,6 +63,9 @@ public class LogCleaner extends CleanerChore { public LogCleaner(final int period, final Stoppable stopper, Configuration conf, FileSystem fs, Path oldLogDir) { super("LogsCleaner", period, stopper, conf, fs, oldLogDir, HBASE_MASTER_LOGCLEANER_PLUGINS); + this.pendingDelete = new LinkedBlockingQueue<>(); + int size = conf.getInt(OLD_WALS_CLEANER_SIZE, OLD_WALS_CLEANER_DEFAULT_SIZE); + this.oldWALsCleaner = createOldWalsCleaner(size); } @Override @@ -55,4 +73,152 @@ public class LogCleaner extends CleanerChore { return AbstractFSWALProvider.validateWALFilename(file.getName()) || MasterProcedureUtil.validateProcedureWALFilename(file.getName()); } + + @Override + public void onConfigurationChange(Configuration conf) { + super.onConfigurationChange(conf); + + int newSize = conf.getInt(OLD_WALS_CLEANER_SIZE, OLD_WALS_CLEANER_DEFAULT_SIZE); + if (newSize == oldWALsCleaner.size()) { + if (LOG.isDebugEnabled()) { + LOG.debug("Size from configuration is the same as previous which is " + + newSize + ", no need to update."); + } + return; + } + interruptOldWALsCleaner(); + oldWALsCleaner = createOldWalsCleaner(newSize); + } + + @Override + protected int deleteFiles(Iterable filesToDelete) { + List results = new LinkedList<>(); + for (FileStatus toDelete : filesToDelete) { + CleanerContext context = CleanerContext.createCleanerContext(toDelete); + if (context != null) { + pendingDelete.add(context); + results.add(context); + } + } + + int deletedFiles = 0; + for (CleanerContext res : results) { + deletedFiles += res.getResult(500) ? 1 : 0; + } + return deletedFiles; + } + + @Override + public void cleanup() { + super.cleanup(); + interruptOldWALsCleaner(); + } + + @VisibleForTesting + int getSizeOfCleaners() { + return oldWALsCleaner.size(); + } + + private List createOldWalsCleaner(int size) { + LOG.info("Creating OldWALs cleaners with size: " + size); + + List oldWALsCleaner = new ArrayList<>(size); + for (int i = 0; i < size; i++) { + Thread cleaner = new Thread(() -> deleteFile()); + cleaner.setName("OldWALsCleaner-" + i); + cleaner.setDaemon(true); + cleaner.start(); + oldWALsCleaner.add(cleaner); + } + return oldWALsCleaner; + } + + private void interruptOldWALsCleaner() { + for (Thread cleaner : oldWALsCleaner) { + cleaner.interrupt(); + } + oldWALsCleaner.clear(); + } + + private void deleteFile() { + while (true) { + CleanerContext context = null; + boolean succeed = false; + boolean interrupted = false; + try { + context = pendingDelete.take(); + if (context != null) { + FileStatus toClean = context.getTargetToClean(); + succeed = this.fs.delete(toClean.getPath(), false); + } + } catch (InterruptedException ite) { + // It's most likely from configuration changing request + if (context != null) { + LOG.warn("Interrupted while cleaning oldWALs " + + context.getTargetToClean() + ", try to clean it next round."); + } + interrupted = true; + } catch (IOException e) { + // fs.delete() fails. + LOG.warn("Failed to clean oldwals with exception: " + e); + succeed = false; + } finally { + context.setResult(succeed); + if (interrupted) { + // Restore interrupt status + Thread.currentThread().interrupt(); + break; + } + } + } + if (LOG.isDebugEnabled()) { + LOG.debug("Exiting cleaner."); + } + } + + private static final class CleanerContext { + // At most waits 60 seconds + static final long MAX_WAIT = 60 * 1000; + + final FileStatus target; + volatile boolean result; + volatile boolean setFromCleaner = false; + + static CleanerContext createCleanerContext(FileStatus status) { + return status != null ? new CleanerContext(status) : null; + } + + private CleanerContext(FileStatus status) { + this.target = status; + this.result = false; + } + + synchronized void setResult(boolean res) { + this.result = res; + this.setFromCleaner = true; + notify(); + } + + synchronized boolean getResult(long waitIfNotFinished) { + long totalTime = 0; + try { + while (!setFromCleaner) { + wait(waitIfNotFinished); + totalTime += waitIfNotFinished; + if (totalTime >= MAX_WAIT) { + LOG.warn("Spend too much time to delete oldwals " + target); + return result; + } + } + } catch (InterruptedException e) { + LOG.warn("Interrupted while waiting deletion of " + target); + return result; + } + return result; + } + + FileStatus getTargetToClean() { + return target; + } + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestCleanerChore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestCleanerChore.java index 566479a6dee..39bdbc7e6aa 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestCleanerChore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestCleanerChore.java @@ -17,14 +17,17 @@ */ package org.apache.hadoop.hbase.master.cleaner; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import java.io.IOException; +import java.util.Random; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -51,8 +54,7 @@ public class TestCleanerChore { public void cleanup() throws Exception { // delete and recreate the test directory, ensuring a clean test dir between tests UTIL.cleanupTestDir(); -} - + } @Test public void testSavesFilesOnRequest() throws Exception { @@ -276,11 +278,8 @@ public class TestCleanerChore { } }).when(spy).isFileDeletable(Mockito.any()); - // attempt to delete the directory, which - if (chore.checkAndDeleteDirectory(parent)) { - throw new Exception( - "Reported success deleting directory, should have failed when adding file mid-iteration"); - } + // run the chore + chore.chore(); // make sure all the directories + added file exist, but the original file is deleted assertTrue("Added file unexpectedly deleted", fs.exists(racyFile)); @@ -355,6 +354,54 @@ public class TestCleanerChore { assertTrue("Directory got deleted", fs.exists(parent)); } + @Test + public void testOnConfigurationChange() throws Exception { + Stoppable stop = new StoppableImplementation(); + Configuration conf = UTIL.getConfiguration(); + Path testDir = UTIL.getDataTestDir(); + FileSystem fs = UTIL.getTestFileSystem(); + String confKey = "hbase.test.cleaner.delegates"; + conf.set(confKey, AlwaysDelete.class.getName()); + conf.set(CleanerChore.CHORE_POOL_SIZE, "2"); + AllValidPaths chore = new AllValidPaths("test-file-cleaner", stop, conf, fs, testDir, confKey); + chore.setEnabled(true); + // Create subdirs under testDir + int dirNums = 6; + Path[] subdirs = new Path[dirNums]; + for (int i = 0; i < dirNums; i++) { + subdirs[i] = new Path(testDir, "subdir-" + i); + fs.mkdirs(subdirs[i]); + } + // Under each subdirs create 6 files + for (Path subdir : subdirs) { + createFiles(fs, subdir, 6); + } + // Start chore + Thread t = new Thread(() -> chore.chore()); + t.setDaemon(true); + t.start(); + // Change size of chore's pool + conf.set(CleanerChore.CHORE_POOL_SIZE, "4"); + chore.onConfigurationChange(conf); + assertEquals(4, chore.getChorePoolSize()); + // Stop chore + t.join(); + } + + private void createFiles(FileSystem fs, Path parentDir, int numOfFiles) throws IOException { + Random random = new Random(); + for (int i = 0; i < numOfFiles; i++) { + int xMega = 1 + random.nextInt(3); // size of each file is between 1~3M + try (FSDataOutputStream fsdos = fs.create(new Path(parentDir, "file-" + i))) { + for (int m = 0; m < xMega; m++) { + byte[] M = new byte[1024 * 1024]; + random.nextBytes(M); + fsdos.write(M); + } + } + } + } + private static class AllValidPaths extends CleanerChore { public AllValidPaths(String name, Stoppable s, Configuration conf, FileSystem fs, diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java index 98176feed3e..34e81db4284 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java @@ -29,9 +29,11 @@ import java.net.URLEncoder; import java.util.Iterator; import java.util.LinkedList; import java.util.List; +import java.util.Random; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; @@ -76,11 +78,13 @@ public class TestLogsCleaner { @BeforeClass public static void setUpBeforeClass() throws Exception { TEST_UTIL.startMiniZKCluster(); + TEST_UTIL.startMiniDFSCluster(1); } @AfterClass public static void tearDownAfterClass() throws Exception { TEST_UTIL.shutdownMiniZKCluster(); + TEST_UTIL.shutdownMiniDFSCluster(); } /** @@ -248,6 +252,53 @@ public class TestLogsCleaner { } } + @Test + public void testOnConfigurationChange() throws Exception { + Configuration conf = TEST_UTIL.getConfiguration(); + conf.setInt(LogCleaner.OLD_WALS_CLEANER_SIZE, LogCleaner.OLD_WALS_CLEANER_DEFAULT_SIZE); + // Prepare environments + Server server = new DummyServer(); + Path oldWALsDir = new Path(TEST_UTIL.getDefaultRootDirPath(), + HConstants.HREGION_OLDLOGDIR_NAME); + FileSystem fs = TEST_UTIL.getDFSCluster().getFileSystem(); + LogCleaner cleaner = new LogCleaner(3000, server, conf, fs, oldWALsDir); + assertEquals(LogCleaner.OLD_WALS_CLEANER_DEFAULT_SIZE, cleaner.getSizeOfCleaners()); + // Create dir and files for test + fs.delete(oldWALsDir, true); + fs.mkdirs(oldWALsDir); + int numOfFiles = 10; + createFiles(fs, oldWALsDir, numOfFiles); + FileStatus[] status = fs.listStatus(oldWALsDir); + assertEquals(numOfFiles, status.length); + // Start cleaner chore + Thread thread = new Thread(() -> cleaner.chore()); + thread.setDaemon(true); + thread.start(); + // change size of cleaners dynamically + int sizeToChange = 4; + conf.setInt(LogCleaner.OLD_WALS_CLEANER_SIZE, sizeToChange); + cleaner.onConfigurationChange(conf); + assertEquals(sizeToChange, cleaner.getSizeOfCleaners()); + // Stop chore + thread.join(); + status = fs.listStatus(oldWALsDir); + assertEquals(0, status.length); + } + + private void createFiles(FileSystem fs, Path parentDir, int numOfFiles) throws IOException { + Random random = new Random(); + for (int i = 0; i < numOfFiles; i++) { + int xMega = 1 + random.nextInt(3); // size of each file is between 1~3M + try (FSDataOutputStream fsdos = fs.create(new Path(parentDir, "file-" + i))) { + for (int m = 0; m < xMega; m++) { + byte[] M = new byte[1024 * 1024]; + random.nextBytes(M); + fsdos.write(M); + } + } + } + } + static class DummyServer implements Server { @Override