diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java
index db0e897ab22..78be50bd8ae 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java
@@ -18,13 +18,13 @@ package org.apache.hadoop.hbase.master.cleaner;
 
 import java.io.IOException;
-import java.util.Collections;
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.RecursiveTask;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -36,8 +36,6 @@ import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException;
 import org.apache.hadoop.hbase.ScheduledChore;
 import org.apache.hadoop.hbase.Stoppable;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.util.FSUtils;
-import org.apache.hadoop.hbase.util.FileStatusFilter;
 import org.apache.hadoop.ipc.RemoteException;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -207,10 +205,20 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends Schedu
     }
   }
 
-  public Boolean runCleaner() {
-    CleanerTask task = new CleanerTask(this.oldFileDir, true);
-    pool.execute(task);
-    return task.join();
+  public boolean runCleaner() {
+    try {
+      final AsyncResult<Boolean> result = new AsyncResult<Boolean>();
+      pool.execute(new Runnable() {
+        @Override
+        public void run() {
+          traverseAndDelete(oldFileDir, true, result);
+        }
+      });
+      return result.get();
+    } catch (Exception e) {
+      LOG.info("Failed to traverse and delete paths under the dir: " + oldFileDir, e);
+      return false;
+    }
   }
 
   /**
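The rewritten runCleaner() above blocks on AsyncResult, a hand-rolled single-shot promise defined later in this patch, instead of joining a ForkJoinTask. A minimal standalone sketch of that wait/notify pattern (class and method names here are illustrative, not part of the patch):

    // A single-shot promise: a worker publishes one value, a caller blocks until it arrives.
    final class OneShotResult<T> {
      private T value;
      private boolean set = false;

      synchronized void set(T v) {
        value = v;
        set = true;
        notifyAll(); // wake any thread blocked in get()
      }

      synchronized T get() throws InterruptedException {
        while (!set) {
          wait(); // loop re-checks the flag to guard against spurious wakeups
        }
        return value;
      }
    }

The caller submits a Runnable that eventually calls set(...), then parks in get(). In a codebase that sticks to anonymous classes throughout, as this patch does, the pattern plays the role CompletableFuture would on a newer Java baseline.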
@@ -322,140 +330,171 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends Schedu
     T act() throws IOException;
   }
 
-  /**
-   * Attemps to clean up a directory, its subdirectories, and files. Return value is true if
-   * everything was deleted. false on partial / total failures.
-   */
-  private final class CleanerTask extends RecursiveTask<Boolean> {
-    private static final long serialVersionUID = -1584635903138015418L;
-    private final Path dir;
-    private final boolean root;
+  private interface Callback<T> {
+    void run(T val);
+  }
 
-    CleanerTask(final FileStatus dir, final boolean root) {
-      this(dir.getPath(), root);
+  private final class AsyncResult<T> {
+
+    private Callback<T> callback;
+    private T result;
+    private boolean resultSet = false;
+
+    AsyncResult(Callback<T> callback) {
+      this.callback = callback;
     }
 
-    CleanerTask(final Path dir, final boolean root) {
-      this.dir = dir;
-      this.root = root;
+    AsyncResult() {
     }
 
-    @Override
-    protected Boolean compute() {
-      LOG.trace("Cleaning under " + dir);
-      List<FileStatus> subDirs;
-      List<FileStatus> tmpFiles;
-      final List<FileStatus> files;
-      try {
-        // if dir doesn't exist, we'll get null back for both of these
-        // which will fall through to succeeding.
-        subDirs = FSUtils.listStatusWithStatusFilter(fs, dir, new FileStatusFilter() {
-          @Override
-          public boolean accept(FileStatus f) {
-            return f.isDirectory();
-          }
-        });
-        if (subDirs == null) {
-          subDirs = Collections.emptyList();
+    void set(T result) {
+      synchronized (this) {
+        this.result = result;
+        if (callback != null) {
+          callback.run(result);
         }
-        tmpFiles = FSUtils.listStatusWithStatusFilter(fs, dir, new FileStatusFilter() {
-          @Override
-          public boolean accept(FileStatus f) {
-            return f.isFile();
-          }
-        });
-        files = tmpFiles == null ? Collections.<FileStatus> emptyList() : tmpFiles;
-      } catch (IOException ioe) {
-        LOG.warn("failed to get FileStatus for contents of '" + dir + "'", ioe);
-        return false;
+        // Mark the result as set and notify any thread waiting in get().
+        this.resultSet = true;
+        this.notifyAll();
       }
+    }
 
-      boolean allFilesDeleted = true;
-      if (!files.isEmpty()) {
-        allFilesDeleted = deleteAction(new Action<Boolean>() {
-          @Override
-          public Boolean act() throws IOException {
-            return checkAndDeleteFiles(files);
-          }
-        }, "files");
-      }
-
-      boolean allSubdirsDeleted = true;
-      if (!subDirs.isEmpty()) {
-        final List<CleanerTask> tasks = Lists.newArrayListWithCapacity(subDirs.size());
-        for (FileStatus subdir : subDirs) {
-          CleanerTask task = new CleanerTask(subdir, false);
-          tasks.add(task);
-          task.fork();
-        }
-        allSubdirsDeleted = deleteAction(new Action<Boolean>() {
-          @Override
-          public Boolean act() throws IOException {
-            return getCleanResult(tasks);
-          }
-        }, "subdirs");
-      }
-
-      boolean result = allFilesDeleted && allSubdirsDeleted;
-      // if and only if files and subdirs under current dir are deleted successfully, and
-      // it is not the root dir, then task will try to delete it.
-      if (result && !root) {
-        result &= deleteAction(new Action<Boolean>() {
-          @Override
-          public Boolean act() throws IOException {
-            return fs.delete(dir, false);
-          }
-        }, "dir");
+    synchronized T get() throws Exception {
+      while (!resultSet) {
+        wait();
       }
       return result;
     }
+  }
 
-    /**
-     * Perform a delete on a specified type.
-     * @param deletion a delete
-     * @param type possible values are 'files', 'subdirs', 'dirs'
-     * @return true if it deleted successfully, false otherwise
-     */
-    private boolean deleteAction(Action<Boolean> deletion, String type) {
-      boolean deleted;
-      try {
-        LOG.trace("Start deleting " + type + " under " + dir);
-        deleted = deletion.act();
-      } catch (PathIsNotEmptyDirectoryException exception) {
-        // N.B. HDFS throws this exception when we try to delete a non-empty directory, but
-        // LocalFileSystem throws a bare IOException. So some test code will get the verbose
-        // message below.
-        LOG.debug("Couldn't delete '" + dir + "' yet because it isn't empty. Probably transient. " +
-          "exception details at TRACE.");
-        LOG.trace("Couldn't delete '" + dir + "' yet because it isn't empty w/exception.",
-          exception);
-        deleted = false;
-      } catch (IOException ioe) {
-        LOG.info("Could not delete " + type + " under " + dir + ". might be transient; we'll " +
-          "retry. if it keeps happening, use following exception when asking on mailing list.",
-          ioe);
-        deleted = false;
-      }
-      LOG.trace("Finish deleting " + type + " under " + dir + " deleted=" + deleted);
-      return deleted;
-    }
-
-    /**
-     * Get cleaner results of subdirs.
-     * @param tasks subdirs cleaner tasks
-     * @return true if all subdirs deleted successfully, false for patial/all failures
-     * @throws IOException something happen during computation
-     */
-    private boolean getCleanResult(List<CleanerTask> tasks) throws IOException {
-      boolean cleaned = true;
-      try {
-        for (CleanerTask task : tasks) {
-          cleaned &= task.get();
+  /**
+   * Attempts to clean up a directory, its subdirectories, and files concurrently on a
+   * {@link java.util.concurrent.ThreadPoolExecutor}. The final result can be fetched by calling
+   * result.get().
+   * @param dir the directory to traverse and delete.
+   * @param root whether this is the root directory of the traversal; the root itself is never
+   *          deleted.
+   * @param result the {@link AsyncResult} used to fetch the result. True means the current
+   *          directory has been deleted successfully (not needed for the root dir); a parent
+   *          directory is only deleted once all of its children (files and sub-directories)
+   *          have been deleted successfully.
+   */
+  private void traverseAndDelete(final Path dir, final boolean root,
+      final AsyncResult<Boolean> result) {
+    try {
+      final Action<Boolean> curDirDeletion = new Action<Boolean>() {
+        @Override
+        public Boolean act() throws IOException {
+          return fs.delete(dir, false);
+        }
+      };
+
+      // Step.1: List all files under the given directory.
+      List<FileStatus> allPaths = Arrays.asList(fs.listStatus(dir));
+      final List<FileStatus> subDirs = new ArrayList<>();
+      final List<FileStatus> files = new ArrayList<>();
+      for (FileStatus status : allPaths) {
+        if (status.isDirectory()) {
+          subDirs.add(status);
+        } else if (status.isFile()) {
+          files.add(status);
         }
-      } catch (InterruptedException | ExecutionException e) {
-        throw new IOException(e);
       }
-      return cleaned;
+
+      // Step.2: Try to delete all the deletable files.
+      final boolean allFilesDeleted = files.isEmpty() || deleteAction(new Action<Boolean>() {
+        @Override
+        public Boolean act() throws IOException {
+          return checkAndDeleteFiles(files);
+        }
+      }, "files", dir);
+
+      // Step.3: Start to traverse and delete the sub-directories.
+      if (subDirs.isEmpty()) {
+        // If there are no sub-directories, just try to delete the current dir and finish the
+        // result.
+        boolean deleted = allFilesDeleted;
+        if (allFilesDeleted && !root) {
+          deleted = deleteAction(curDirDeletion, "dir", dir);
+        }
+        result.set(deleted);
+        return;
+      }
+
+      // Otherwise, there are sub-directories: register the following callback in each
+      // sub-directory's AsyncResult. Once all of the sub-directories have been traversed and
+      // deleted, the callback will try to delete the current dir and finish the result.
+      final AtomicInteger remain = new AtomicInteger(subDirs.size());
+      Callback<Boolean> callback = new Callback<Boolean>() {
+        private volatile boolean allSubDirDeleted = true;
+
+        @Override
+        public void run(Boolean subDirDeleted) {
+          allSubDirDeleted &= subDirDeleted;
+          if (remain.decrementAndGet() == 0) {
+            boolean deleted = allFilesDeleted && allSubDirDeleted;
+            if (deleted && !root) {
+              deleted = deleteAction(curDirDeletion, "dir", dir);
+            }
+            result.set(deleted);
+          }
+        }
+      };
+
+      // Submit the deletion requests for the sub-directories.
+      for (FileStatus subDir : subDirs) {
+        final FileStatus finalSubDir = subDir;
+        // Register the callback in the sub-directory's AsyncResult here.
+        final AsyncResult<Boolean> asyncResult = new AsyncResult<Boolean>(callback);
+        pool.execute(new Runnable() {
+          @Override
+          public void run() {
+            traverseAndDelete(finalSubDir.getPath(), false, asyncResult);
+          }
+        });
+      }
+    } catch (Exception e) {
+      result.set(false);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Failed to traverse and delete the path=" + dir + ", root=" + root, e);
+      }
     }
   }
+
+  /**
+   * Perform a delete on a specified type.
+   * @param deletion a delete
+   * @param type possible values are 'files', 'dir'
+   * @param dir the directory under which the delete action happens.
+   * @return true if it deleted successfully, false otherwise
+   */
+  private boolean deleteAction(Action<Boolean> deletion, String type, Path dir) {
+    boolean deleted;
+    try {
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Start deleting " + type + " under " + dir);
+      }
+      deleted = deletion.act();
+    } catch (PathIsNotEmptyDirectoryException exception) {
+      // N.B. HDFS throws this exception when we try to delete a non-empty directory, but
+      // LocalFileSystem throws a bare IOException. So some test code will get the verbose
+      // message below.
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Couldn't delete '" + dir + "' yet because it isn't empty w/exception.",
+          exception);
+      }
+      deleted = false;
+    } catch (IOException ioe) {
+      LOG.info("Could not delete " + type + " under " + dir + ". Might be transient; we'll "
+          + "retry. If it keeps happening, use the following exception when asking on the "
+          + "mailing list.", ioe);
+      deleted = false;
+    } catch (Exception e) {
+      LOG.info("Unexpected exception: ", e);
+      deleted = false;
+    }
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Finish deleting " + type + " under " + dir + ", deleted=" + deleted);
+    }
+    return deleted;
+  }
 }
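Before moving on to DirScanPool, note that traverseAndDelete() never blocks a pool thread waiting on children: each directory's outcome is pushed upward through Callback, and the AtomicInteger counts children down so that exactly the last sub-directory to finish attempts the parent's deletion. A toy model of that completion-counting idea (illustrative names, not from the patch):

    import java.util.concurrent.atomic.AtomicBoolean;
    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.function.Consumer;

    // Toy model: the last child to finish, on whichever pool thread it ran,
    // performs the parent's completion action exactly once.
    final class CompletionCounter {
      private final AtomicInteger remaining;
      private final AtomicBoolean allSucceeded = new AtomicBoolean(true);
      private final Consumer<Boolean> onAllDone;

      CompletionCounter(int childCount, Consumer<Boolean> onAllDone) {
        this.remaining = new AtomicInteger(childCount);
        this.onAllDone = onAllDone;
      }

      void childFinished(boolean succeeded) {
        if (!succeeded) {
          allSucceeded.set(false); // any child failure keeps the parent directory alive
        }
        if (remaining.decrementAndGet() == 0) {
          // decrementAndGet() returns 0 on exactly one thread, so this runs once.
          onAllDone.accept(allSucceeded.get());
        }
      }
    }

This is what makes a plain fixed-size ThreadPoolExecutor safe where the old RecursiveTask.join() relied on ForkJoinPool's work-stealing: no worker ever parks waiting for a descendant, so a deep directory tree cannot starve a bounded pool.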
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/DirScanPool.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/DirScanPool.java
index f201ae2d653..868463632cd 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/DirScanPool.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/DirScanPool.java
@@ -17,12 +17,14 @@
  */
 package org.apache.hadoop.hbase.master.cleaner;
 
-import java.util.concurrent.ForkJoinPool;
-import java.util.concurrent.ForkJoinTask;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.DaemonThreadFactory;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.conf.ConfigurationObserver;
 
@@ -33,7 +35,7 @@ import org.apache.hadoop.hbase.conf.ConfigurationObserver;
 public class DirScanPool implements ConfigurationObserver {
   private static final Log LOG = LogFactory.getLog(DirScanPool.class);
   private volatile int size;
-  private ForkJoinPool pool;
+  private final ThreadPoolExecutor pool;
   private int cleanerLatch;
   private boolean reconfigNotification;
 
@@ -43,11 +45,18 @@ public class DirScanPool implements ConfigurationObserver {
     // poolSize may be 0 or 0.0 from a careless configuration,
     // double check to make sure.
     size = size == 0 ? CleanerChore.calculatePoolSize(CleanerChore.DEFAULT_CHORE_POOL_SIZE) : size;
-    pool = new ForkJoinPool(size);
+    pool = initializePool(size);
     LOG.info("Cleaner pool size is " + size);
     cleanerLatch = 0;
   }
 
+  private static ThreadPoolExecutor initializePool(int size) {
+    ThreadPoolExecutor executor = new ThreadPoolExecutor(size, size, 1L, TimeUnit.MINUTES,
+      new LinkedBlockingQueue<Runnable>(), new DaemonThreadFactory("dir-scan-pool"));
+    executor.allowCoreThreadTimeOut(true);
+    return executor;
+  }
+
   /**
    * Checks if pool can be updated. If so, mark for update later.
    * @param conf configuration
@@ -74,8 +83,8 @@ public class DirScanPool implements ConfigurationObserver {
     notifyAll();
   }
 
-  synchronized void execute(ForkJoinTask<?> task) {
-    pool.execute(task);
+  synchronized void execute(Runnable runnable) {
+    this.pool.execute(runnable);
   }
 
   public synchronized void shutdownNow() {
@@ -100,9 +109,8 @@ public class DirScanPool implements ConfigurationObserver {
         break;
       }
     }
-    shutdownNow();
-    LOG.info("Update chore's pool size from " + pool.getParallelism() + " to " + size);
-    pool = new ForkJoinPool(size);
+    LOG.info("Update chore's pool size from " + pool.getPoolSize() + " to " + size);
+    pool.setCorePoolSize(size);
   }
 
   public int getSize() {
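Two details of the replacement pool are easy to miss. Because the work queue is an unbounded LinkedBlockingQueue, the pool never grows past its core size, so only setCorePoolSize() matters when resizing; and allowCoreThreadTimeOut(true) lets the otherwise fixed pool shrink to zero threads between chore runs. A self-contained sketch of the same configuration (the patch's DaemonThreadFactory is dropped here to keep the example standalone):

    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;

    public class DirScanPoolSketch {
      public static void main(String[] args) {
        // Core == max with an unbounded queue: effectively a fixed-size pool whose
        // idle threads are reaped after a minute rather than lingering forever.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(4, 4, 1L, TimeUnit.MINUTES,
            new LinkedBlockingQueue<Runnable>());
        pool.allowCoreThreadTimeOut(true);

        // On reconfiguration, resize in place instead of shutdownNow() + rebuild,
        // so in-flight cleaner work is no longer interrupted.
        int newSize = 8;
        pool.setMaximumPoolSize(Math.max(newSize, pool.getMaximumPoolSize()));
        pool.setCorePoolSize(newSize);

        pool.shutdown();
      }
    }

The sketch raises the maximum before the core size as a defensive measure: the patch itself calls only setCorePoolSize(), which is fine on Java 8, but on later JDKs setCorePoolSize() throws IllegalArgumentException when the new core size exceeds the current maximum.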