HBASE-22912 [Backport] HBASE-22867 to branch-1 to avoid ForkJoinPool to spawn thousands of threads (#549)

Signed-off-by: Andrew Purtell <apurtell@apache.org>
Signed-off-by: Reid Chan <reidchan@apache.org>
This commit is contained in:
openinx 2019-09-05 19:32:42 +08:00 committed by GitHub
parent 4c2bf71a3a
commit 352344c9be
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 185 additions and 138 deletions

View File

@ -18,13 +18,13 @@
package org.apache.hadoop.hbase.master.cleaner;
import java.io.IOException;
import java.util.Collections;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.RecursiveTask;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -36,8 +36,6 @@ import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.FileStatusFilter;
import org.apache.hadoop.ipc.RemoteException;
import com.google.common.annotations.VisibleForTesting;
@ -207,10 +205,20 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends Schedu
}
}
public Boolean runCleaner() {
CleanerTask task = new CleanerTask(this.oldFileDir, true);
pool.execute(task);
return task.join();
public boolean runCleaner() {
try {
final AsyncResult<Boolean> result = new AsyncResult<Boolean>();
pool.execute(new Runnable() {
@Override
public void run() {
traverseAndDelete(oldFileDir, true, result);
}
});
return result.get();
} catch (Exception e) {
LOG.info("Failed to traverse and delete paths under the dir: " + oldFileDir, e);
return false;
}
}
/**
@ -322,140 +330,171 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends Schedu
T act() throws IOException;
}
private interface Callback<T> {
void run(T val);
}
private final class AsyncResult<T> {
private Callback<T> callback;
private T result;
private boolean resultSet = false;
AsyncResult(Callback<T> callback) {
this.callback = callback;
}
AsyncResult() {
}
void set(T result) {
synchronized (this) {
this.result = result;
if (callback != null) {
callback.run(result);
}
// Mark the result set process finished and notify the waiting get method.
this.resultSet = true;
this.notifyAll();
}
}
synchronized T get() throws Exception {
while (!resultSet) {
wait();
}
return result;
}
}
/**
* Attemps to clean up a directory, its subdirectories, and files. Return value is true if
* everything was deleted. false on partial / total failures.
* Attempts to clean up a directory(its subdirectories, and files) in a
* {@link java.util.concurrent.ThreadPoolExecutor} concurrently. We can get the final result by
* calling result.get().
* @param dir means the directory we will start to traverse and delete.
* @param root means whether it's the root directory to traverse, if true then cannot delete it.
* @param result {@link AsyncResult<Boolean>} to fetch the result. True means the current
* directory has been deleted successfully (for root dir we don't need that) and the
* parent will try to delete its own directory if all of the children(files and
* sub-directories are included) has been deleted successfully.
*/
private final class CleanerTask extends RecursiveTask<Boolean> {
private static final long serialVersionUID = -1584635903138015418L;
private final Path dir;
private final boolean root;
CleanerTask(final FileStatus dir, final boolean root) {
this(dir.getPath(), root);
}
CleanerTask(final Path dir, final boolean root) {
this.dir = dir;
this.root = root;
}
@Override
protected Boolean compute() {
LOG.trace("Cleaning under " + dir);
List<FileStatus> subDirs;
List<FileStatus> tmpFiles;
final List<FileStatus> files;
private void traverseAndDelete(final Path dir, final boolean root,
final AsyncResult<Boolean> result) {
try {
// if dir doesn't exist, we'll get null back for both of these
// which will fall through to succeeding.
subDirs = FSUtils.listStatusWithStatusFilter(fs, dir, new FileStatusFilter() {
@Override
public boolean accept(FileStatus f) {
return f.isDirectory();
}
});
if (subDirs == null) {
subDirs = Collections.emptyList();
}
tmpFiles = FSUtils.listStatusWithStatusFilter(fs, dir, new FileStatusFilter() {
@Override
public boolean accept(FileStatus f) {
return f.isFile();
}
});
files = tmpFiles == null ? Collections.<FileStatus>emptyList() : tmpFiles;
} catch (IOException ioe) {
LOG.warn("failed to get FileStatus for contents of '" + dir + "'", ioe);
return false;
}
boolean allFilesDeleted = true;
if (!files.isEmpty()) {
allFilesDeleted = deleteAction(new Action<Boolean>() {
@Override
public Boolean act() throws IOException {
return checkAndDeleteFiles(files);
}
}, "files");
}
boolean allSubdirsDeleted = true;
if (!subDirs.isEmpty()) {
final List<CleanerTask> tasks = Lists.newArrayListWithCapacity(subDirs.size());
for (FileStatus subdir : subDirs) {
CleanerTask task = new CleanerTask(subdir, false);
tasks.add(task);
task.fork();
}
allSubdirsDeleted = deleteAction(new Action<Boolean>() {
@Override
public Boolean act() throws IOException {
return getCleanResult(tasks);
}
}, "subdirs");
}
boolean result = allFilesDeleted && allSubdirsDeleted;
// if and only if files and subdirs under current dir are deleted successfully, and
// it is not the root dir, then task will try to delete it.
if (result && !root) {
result &= deleteAction(new Action<Boolean>() {
final Action<Boolean> curDirDeletion = new Action<Boolean>() {
@Override
public Boolean act() throws IOException {
return fs.delete(dir, false);
}
}, "dir");
};
// Step.1: List all files under the given directory.
List<FileStatus> allPaths = Arrays.asList(fs.listStatus(dir));
final List<FileStatus> subDirs = new ArrayList<>();
final List<FileStatus> files = new ArrayList<>();
for (FileStatus status : allPaths) {
if (status.isDirectory()) {
subDirs.add(status);
} else if (status.isFile()) {
files.add(status);
}
}
// Step.2: Try to delete all the deletable files.
final boolean allFilesDeleted = files.isEmpty() || deleteAction(new Action<Boolean>() {
@Override
public Boolean act() throws IOException {
return checkAndDeleteFiles(files);
}
}, "files", dir);
// Step.3: Start to traverse and delete the sub-directories.
if (subDirs.isEmpty()) {
// If no sub-directories, then just try to delete the current dir and finish the result.
boolean deleted = allFilesDeleted;
if (allFilesDeleted && !root) {
deleted = deleteAction(curDirDeletion, "dir", dir);
}
result.set(deleted);
return;
}
// Otherwise, there should be some sub-directories. then we will register the following
// callback in AsyncResult of sub-directory, and once all of the sub-directories are traversed
// and deleted then the callback will try to delete the current dir and finish the result.
final AtomicInteger remain = new AtomicInteger(subDirs.size());
Callback<Boolean> callback = new Callback<Boolean>() {
private volatile boolean allSubDirDeleted = true;
@Override
public void run(Boolean subDirDeleted) {
allSubDirDeleted &= subDirDeleted;
if (remain.decrementAndGet() == 0) {
boolean deleted = allFilesDeleted && allSubDirDeleted;
if (deleted && !root) {
deleted = deleteAction(curDirDeletion, "dir", dir);
}
result.set(deleted);
}
}
};
// Submit the request of sub-directory deletion.
for (FileStatus subDir : subDirs) {
final FileStatus finalSubDir = subDir;
// Register the callback in AsyncResult here.
final AsyncResult<Boolean> asyncResult = new AsyncResult<Boolean>(callback);
pool.execute(new Runnable() {
@Override
public void run() {
traverseAndDelete(finalSubDir.getPath(), false, asyncResult);
}
});
}
} catch (Exception e) {
result.set(false);
if (LOG.isDebugEnabled()) {
LOG.debug("Failed to traverse and delete the path=" + dir + ", root=" + root, e);
}
}
return result;
}
/**
* Perform a delete on a specified type.
* @param deletion a delete
* @param type possible values are 'files', 'subdirs', 'dirs'
* @param dir delete actions happened under the given directory.
* @return true if it deleted successfully, false otherwise
*/
private boolean deleteAction(Action<Boolean> deletion, String type) {
private boolean deleteAction(Action<Boolean> deletion, String type, Path dir) {
boolean deleted;
try {
if (LOG.isTraceEnabled()) {
LOG.trace("Start deleting " + type + " under " + dir);
}
deleted = deletion.act();
} catch (PathIsNotEmptyDirectoryException exception) {
// N.B. HDFS throws this exception when we try to delete a non-empty directory, but
// LocalFileSystem throws a bare IOException. So some test code will get the verbose
// message below.
LOG.debug("Couldn't delete '" + dir + "' yet because it isn't empty. Probably transient. " +
"exception details at TRACE.");
LOG.trace("Couldn't delete '" + dir + "' yet because it isn't empty w/exception.",
if (LOG.isDebugEnabled()) {
LOG.debug("Couldn't delete '" + dir + "' yet because it isn't empty w/exception.",
exception);
}
deleted = false;
} catch (IOException ioe) {
LOG.info("Could not delete " + type + " under " + dir + ". might be transient; we'll " +
"retry. if it keeps happening, use following exception when asking on mailing list.",
LOG.info(
"Could not delete " + type + " under " + dir + ". might be transient; we'll retry. if it "
+ "keeps " + "happening, use following exception when asking on mailing list.",
ioe);
deleted = false;
} catch (Exception e) {
LOG.info("unexpected exception: ", e);
deleted = false;
}
if (LOG.isTraceEnabled()) {
LOG.trace("Finish deleting " + type + " under " + dir + ", deleted=" + deleted);
}
LOG.trace("Finish deleting " + type + " under " + dir + " deleted=" + deleted);
return deleted;
}
/**
* Get cleaner results of subdirs.
* @param tasks subdirs cleaner tasks
* @return true if all subdirs deleted successfully, false for patial/all failures
* @throws IOException something happen during computation
*/
private boolean getCleanResult(List<CleanerTask> tasks) throws IOException {
boolean cleaned = true;
try {
for (CleanerTask task : tasks) {
cleaned &= task.get();
}
} catch (InterruptedException | ExecutionException e) {
throw new IOException(e);
}
return cleaned;
}
}
}

View File

@ -17,12 +17,14 @@
*/
package org.apache.hadoop.hbase.master.cleaner;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.DaemonThreadFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
@ -33,7 +35,7 @@ import org.apache.hadoop.hbase.conf.ConfigurationObserver;
public class DirScanPool implements ConfigurationObserver {
private static final Log LOG = LogFactory.getLog(DirScanPool.class);
private volatile int size;
private ForkJoinPool pool;
private final ThreadPoolExecutor pool;
private int cleanerLatch;
private boolean reconfigNotification;
@ -43,11 +45,18 @@ public class DirScanPool implements ConfigurationObserver {
// poolSize may be 0 or 0.0 from a careless configuration,
// double check to make sure.
size = size == 0 ? CleanerChore.calculatePoolSize(CleanerChore.DEFAULT_CHORE_POOL_SIZE) : size;
pool = new ForkJoinPool(size);
pool = initializePool(size);
LOG.info("Cleaner pool size is " + size);
cleanerLatch = 0;
}
private static ThreadPoolExecutor initializePool(int size) {
ThreadPoolExecutor executor = new ThreadPoolExecutor(size, size, 1L, TimeUnit.MINUTES,
new LinkedBlockingQueue<Runnable>(), new DaemonThreadFactory("dir-scan-pool"));
executor.allowCoreThreadTimeOut(true);
return executor;
}
/**
* Checks if pool can be updated. If so, mark for update later.
* @param conf configuration
@ -74,8 +83,8 @@ public class DirScanPool implements ConfigurationObserver {
notifyAll();
}
synchronized void execute(ForkJoinTask<?> task) {
pool.execute(task);
synchronized void execute(Runnable runnable) {
this.pool.execute(runnable);
}
public synchronized void shutdownNow() {
@ -100,9 +109,8 @@ public class DirScanPool implements ConfigurationObserver {
break;
}
}
shutdownNow();
LOG.info("Update chore's pool size from " + pool.getParallelism() + " to " + size);
pool = new ForkJoinPool(size);
LOG.info("Update chore's pool size from " + pool.getPoolSize() + " to " + size);
pool.setCorePoolSize(size);
}
public int getSize() {