HBASE-22867 The ForkJoinPool in CleanerChore will spawn thousands of threads in our cluster with thousands of tables (#513)
Signed-off-by: Duo Zhang <zhangduo@apache.org>
Signed-off-by: Reid Chan <reidchan@apache.org>
parent d26f0062d1, commit 5be82d9fbf
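Context for the change: the old CleanerTask forked one RecursiveTask per sub-directory and then blocked waiting on the children's results. A ForkJoinPool responds to that kind of blocking by creating compensation threads, which is how a cluster with thousands of tables can end up with thousands of cleaner threads; a plain fixed-size pool, on the other hand, would simply starve if the same blocking style were kept. A minimal, self-contained sketch of that starvation effect, using only standard java.util.concurrent (illustrative names, not HBase code):

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class StarvationSketch {
  public static void main(String[] args) throws Exception {
    ExecutorService pool = Executors.newFixedThreadPool(1); // deliberately tiny

    // The parent task blocks on its child's result while occupying the only worker,
    // so the child can never run: the "submit then block" style needs either extra
    // threads (which is what ForkJoinPool creates) or a non-blocking rewrite.
    Future<String> parent = pool.submit(() -> {
      Future<String> child = pool.submit(() -> "child done");
      return child.get(2, TimeUnit.SECONDS); // would wait forever without the timeout
    });

    try {
      System.out.println(parent.get());
    } catch (ExecutionException e) {
      System.out.println("starved: " + e.getCause()); // TimeoutException
    } finally {
      pool.shutdownNow();
    }
  }
}

The patch resolves this by never blocking a pool thread: each directory completes a CompletableFuture from a callback instead, as the hunks below show.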
@@ -18,16 +18,17 @@
 package org.apache.hadoop.hbase.master.cleaner;
 
 import java.io.IOException;
 import java.util.Collections;
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.RecursiveTask;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -35,7 +36,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException;
 import org.apache.hadoop.hbase.ScheduledChore;
 import org.apache.hadoop.hbase.Stoppable;
-import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.FutureUtils;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
@@ -43,7 +44,6 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
-import org.apache.hbase.thirdparty.com.google.common.base.Predicate;
 import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSet;
 import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
 import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
@@ -211,11 +211,16 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends ScheduledChore
     cleanersChain.forEach(FileCleanerDelegate::preClean);
   }
 
-  public Boolean runCleaner() {
+  public boolean runCleaner() {
     preRunCleaner();
-    CleanerTask task = new CleanerTask(this.oldFileDir, true);
-    pool.execute(task);
-    return task.join();
+    try {
+      CompletableFuture<Boolean> future = new CompletableFuture<>();
+      pool.execute(() -> traverseAndDelete(oldFileDir, true, future));
+      return future.get();
+    } catch (Exception e) {
+      LOG.info("Failed to traverse and delete the dir: {}", oldFileDir, e);
+      return false;
+    }
   }
 
   /**
@@ -360,126 +365,97 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends ScheduledChore
   }
 
   private interface Action<T> {
-    T act() throws IOException;
+    T act() throws Exception;
   }
 
   /**
-   * Attemps to clean up a directory, its subdirectories, and files. Return value is true if
-   * everything was deleted. false on partial / total failures.
+   * Attempts to clean up a directory(its subdirectories, and files) in a
+   * {@link java.util.concurrent.ThreadPoolExecutor} concurrently. We can get the final result by
+   * calling result.get().
    */
-  private final class CleanerTask extends RecursiveTask<Boolean> {
-
-    private static final long serialVersionUID = -5444212174088754172L;
-
-    private final Path dir;
-    private final boolean root;
-
-    CleanerTask(final FileStatus dir, final boolean root) {
-      this(dir.getPath(), root);
-    }
-
-    CleanerTask(final Path dir, final boolean root) {
-      this.dir = dir;
-      this.root = root;
-    }
-
-    @Override
-    protected Boolean compute() {
-      LOG.trace("Cleaning under {}", dir);
-      List<FileStatus> subDirs;
-      List<FileStatus> files;
-      try {
-        // if dir doesn't exist, we'll get null back for both of these
-        // which will fall through to succeeding.
-        subDirs = getFilteredStatus(FileStatus::isDirectory);
-        files = getFilteredStatus(FileStatus::isFile);
-      } catch (IOException ioe) {
-        LOG.warn("failed to get FileStatus for contents of '{}'", dir, ioe);
-        return false;
-      }
-
-      boolean allFilesDeleted = true;
-      if (!files.isEmpty()) {
-        allFilesDeleted = deleteAction(() -> checkAndDeleteFiles(files), "files");
-      }
-
-      boolean allSubdirsDeleted = true;
-      if (!subDirs.isEmpty()) {
-        List<CleanerTask> tasks = Lists.newArrayListWithCapacity(subDirs.size());
-        sortByConsumedSpace(subDirs);
-        for (FileStatus subdir : subDirs) {
-          CleanerTask task = new CleanerTask(subdir, false);
-          tasks.add(task);
-          task.fork();
-        }
-        allSubdirsDeleted = deleteAction(() -> getCleanResult(tasks), "subdirs");
-      }
-
-      boolean result = allFilesDeleted && allSubdirsDeleted;
-      // if and only if files and subdirs under current dir are deleted successfully, and
-      // it is not the root dir, then task will try to delete it.
-      if (result && !root) {
-        result &= deleteAction(() -> fs.delete(dir, false), "dir");
-      }
-      return result;
-    }
-
-    /**
-     * Get FileStatus with filter.
-     * @param function a filter function
-     * @return filtered FileStatus or empty list if dir doesn't exist
-     * @throws IOException if there's an error other than dir not existing
-     */
-    private List<FileStatus> getFilteredStatus(Predicate<FileStatus> function) throws IOException {
-      return Optional.ofNullable(FSUtils.listStatusWithStatusFilter(fs, dir,
-        status -> function.test(status))).orElseGet(Collections::emptyList);
-    }
-
-    /**
-     * Perform a delete on a specified type.
-     * @param deletion a delete
-     * @param type possible values are 'files', 'subdirs', 'dirs'
-     * @return true if it deleted successfully, false otherwise
-     */
-    private boolean deleteAction(Action<Boolean> deletion, String type) {
-      boolean deleted;
-      try {
-        LOG.trace("Start deleting {} under {}", type, dir);
-        deleted = deletion.act();
-      } catch (PathIsNotEmptyDirectoryException exception) {
-        // N.B. HDFS throws this exception when we try to delete a non-empty directory, but
-        // LocalFileSystem throws a bare IOException. So some test code will get the verbose
-        // message below.
-        LOG.debug("Couldn't delete '{}' yet because it isn't empty. Probably transient. " +
-            "exception details at TRACE.", dir);
-        LOG.trace("Couldn't delete '{}' yet because it isn't empty w/exception.", dir, exception);
-        deleted = false;
-      } catch (IOException ioe) {
-        LOG.info("Could not delete {} under {}. might be transient; we'll retry. if it keeps " +
-            "happening, use following exception when asking on mailing list.",
-            type, dir, ioe);
-        deleted = false;
-      }
-      LOG.trace("Finish deleting {} under {}, deleted=", type, dir, deleted);
-      return deleted;
-    }
-
-    /**
-     * Get cleaner results of subdirs.
-     * @param tasks subdirs cleaner tasks
-     * @return true if all subdirs deleted successfully, false for patial/all failures
-     * @throws IOException something happen during computation
-     */
-    private boolean getCleanResult(List<CleanerTask> tasks) throws IOException {
-      boolean cleaned = true;
-      try {
-        for (CleanerTask task : tasks) {
-          cleaned &= task.get();
-        }
-      } catch (InterruptedException | ExecutionException e) {
-        throw new IOException(e);
-      }
-      return cleaned;
-    }
-  }
+  private void traverseAndDelete(Path dir, boolean root, CompletableFuture<Boolean> result) {
+    try {
+      // Step.1: List all files under the given directory.
+      List<FileStatus> allPaths = Arrays.asList(fs.listStatus(dir));
+      List<FileStatus> subDirs =
+          allPaths.stream().filter(FileStatus::isDirectory).collect(Collectors.toList());
+      List<FileStatus> files =
+          allPaths.stream().filter(FileStatus::isFile).collect(Collectors.toList());
+
+      // Step.2: Try to delete all the deletable files.
+      boolean allFilesDeleted =
+          files.isEmpty() || deleteAction(() -> checkAndDeleteFiles(files), "files", dir);
+
+      // Step.3: Start to traverse and delete the sub-directories.
+      List<CompletableFuture<Boolean>> futures = new ArrayList<>();
+      if (!subDirs.isEmpty()) {
+        sortByConsumedSpace(subDirs);
+        // Submit the request of sub-directory deletion.
+        subDirs.forEach(subDir -> {
+          CompletableFuture<Boolean> subFuture = new CompletableFuture<>();
+          pool.execute(() -> traverseAndDelete(subDir.getPath(), false, subFuture));
+          futures.add(subFuture);
+        });
+      }
+
+      // Step.4: Once all sub-files & sub-directories are deleted, then can try to delete the
+      // current directory asynchronously.
+      FutureUtils.addListener(
+        CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])),
+        (voidObj, e) -> {
+          if (e != null) {
+            result.completeExceptionally(e);
+            return;
+          }
+          try {
+            boolean allSubDirsDeleted = futures.stream().allMatch(CompletableFuture::join);
+            boolean deleted = allFilesDeleted && allSubDirsDeleted;
+            if (deleted && !root) {
+              // If and only if files and sub-dirs under current dir are deleted successfully, and
+              // the empty directory can be deleted, and it is not the root dir then task will
+              // try to delete it.
+              deleted = deleteAction(() -> fs.delete(dir, false), "dir", dir);
+            }
+            result.complete(deleted);
+          } catch (Exception ie) {
+            // Must handle the inner exception here, otherwise the result may get stuck if one
+            // sub-directory get some failure.
+            result.completeExceptionally(ie);
+          }
+        });
+    } catch (Exception e) {
+      LOG.debug("Failed to traverse and delete the path: {}", dir, e);
+      result.completeExceptionally(e);
+    }
+  }
+
+  /**
+   * Perform a delete on a specified type.
+   * @param deletion a delete
+   * @param type possible values are 'files', 'subdirs', 'dirs'
+   * @return true if it deleted successfully, false otherwise
+   */
+  private boolean deleteAction(Action<Boolean> deletion, String type, Path dir) {
+    boolean deleted;
+    try {
+      LOG.trace("Start deleting {} under {}", type, dir);
+      deleted = deletion.act();
+    } catch (PathIsNotEmptyDirectoryException exception) {
+      // N.B. HDFS throws this exception when we try to delete a non-empty directory, but
+      // LocalFileSystem throws a bare IOException. So some test code will get the verbose
+      // message below.
+      LOG.debug("Couldn't delete '{}' yet because it isn't empty w/exception.", dir, exception);
+      deleted = false;
+    } catch (IOException ioe) {
+      LOG.info("Could not delete {} under {}. might be transient; we'll retry. if it keeps "
+          + "happening, use following exception when asking on mailing list.",
+        type, dir, ioe);
+      deleted = false;
+    } catch (Exception e) {
+      LOG.info("unexpected exception: ", e);
+      deleted = false;
+    }
+    LOG.trace("Finish deleting {} under {}, deleted=", type, dir, deleted);
+    return deleted;
+  }
 }
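To try the shape of traverseAndDelete() outside HBase, here is a hedged, self-contained sketch of the same pattern over plain java.io.File: a bounded pool, recursion by re-submitting work instead of fork()/join(), completion via allOf() plus a callback, and an inner catch so a failure in the callback can never leave the caller's future incomplete. Class and method names are illustrative only, not HBase code.

import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class TraverseSketch {

  // Bounded pool: the thread count stays fixed no matter how wide or deep the tree is.
  private static final ExecutorService POOL = Executors.newFixedThreadPool(4);

  /** Counts files under dir; never blocks a pool thread. */
  static void traverse(File dir, CompletableFuture<Long> result) {
    POOL.execute(() -> {
      try {
        File[] children = dir.listFiles();
        long ownFiles = 0;
        List<CompletableFuture<Long>> subFutures = new ArrayList<>();
        if (children != null) {
          for (File child : children) {
            if (child.isDirectory()) {
              CompletableFuture<Long> sub = new CompletableFuture<>();
              traverse(child, sub);       // re-submit instead of fork()+join()
              subFutures.add(sub);
            } else {
              ownFiles++;
            }
          }
        }
        long fileCount = ownFiles;
        CompletableFuture.allOf(subFutures.toArray(new CompletableFuture[0]))
          .whenComplete((v, err) -> {
            if (err != null) {
              result.completeExceptionally(err);
              return;
            }
            try {
              long total = fileCount + subFutures.stream().mapToLong(f -> f.join()).sum();
              result.complete(total);
            } catch (Exception inner) {
              // Same idea as the inner catch in the patch: never leave 'result'
              // incomplete, or a caller blocked on get() would hang forever.
              result.completeExceptionally(inner);
            }
          });
      } catch (Exception e) {
        result.completeExceptionally(e);
      }
    });
  }

  public static void main(String[] args) throws Exception {
    CompletableFuture<Long> total = new CompletableFuture<>();
    traverse(new File(args.length > 0 ? args[0] : "."), total);
    System.out.println("files: " + total.get());   // only the caller thread blocks
    POOL.shutdown();
    POOL.awaitTermination(1, TimeUnit.MINUTES);
  }
}

The remaining hunks are in DirScanPool, where the ForkJoinPool itself is swapped for a ThreadPoolExecutor.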
@@ -17,9 +17,12 @@
  */
 package org.apache.hadoop.hbase.master.cleaner;
 
-import java.util.concurrent.ForkJoinPool;
-import java.util.concurrent.ForkJoinTask;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.DaemonThreadFactory;
 import org.apache.hadoop.hbase.conf.ConfigurationObserver;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
@@ -32,7 +35,7 @@ import org.slf4j.LoggerFactory;
 public class DirScanPool implements ConfigurationObserver {
   private static final Logger LOG = LoggerFactory.getLogger(DirScanPool.class);
   private volatile int size;
-  private ForkJoinPool pool;
+  private final ThreadPoolExecutor pool;
   private int cleanerLatch;
   private boolean reconfigNotification;
 
@@ -42,11 +45,18 @@ public class DirScanPool implements ConfigurationObserver {
     // poolSize may be 0 or 0.0 from a careless configuration,
     // double check to make sure.
     size = size == 0 ? CleanerChore.calculatePoolSize(CleanerChore.DEFAULT_CHORE_POOL_SIZE) : size;
-    pool = new ForkJoinPool(size);
+    pool = initializePool(size);
     LOG.info("Cleaner pool size is {}", size);
     cleanerLatch = 0;
   }
 
+  private static ThreadPoolExecutor initializePool(int size) {
+    ThreadPoolExecutor executor = new ThreadPoolExecutor(size, size, 1, TimeUnit.MINUTES,
+      new LinkedBlockingQueue<>(), new DaemonThreadFactory("dir-scan-pool"));
+    executor.allowCoreThreadTimeOut(true);
+    return executor;
+  }
+
   /**
    * Checks if pool can be updated. If so, mark for update later.
    * @param conf configuration
@@ -73,8 +83,8 @@ public class DirScanPool implements ConfigurationObserver {
     notifyAll();
   }
 
-  synchronized void execute(ForkJoinTask<?> task) {
-    pool.execute(task);
+  synchronized void execute(Runnable runnable) {
+    pool.execute(runnable);
   }
 
   public synchronized void shutdownNow() {
@@ -99,9 +109,8 @@ public class DirScanPool implements ConfigurationObserver {
         break;
       }
     }
-    shutdownNow();
-    LOG.info("Update chore's pool size from {} to {}", pool.getParallelism(), size);
-    pool = new ForkJoinPool(size);
+    LOG.info("Update chore's pool size from {} to {}", pool.getPoolSize(), size);
+    pool.setCorePoolSize(size);
   }
 
   public int getSize() {
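A closing note on the pool swap: initializePool() builds a ThreadPoolExecutor whose core and maximum sizes are equal, backed by an unbounded LinkedBlockingQueue, with allowCoreThreadTimeOut(true), so the thread count is hard-capped while idle threads are still reclaimed after a minute, and a later resize only needs setCorePoolSize() rather than shutting the pool down. A small stand-alone sketch of that configuration (the pool size and task bodies are made up for illustration, and HBase's DaemonThreadFactory is replaced by the default thread factory):

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class PoolConfigSketch {
  public static void main(String[] args) throws InterruptedException {
    // Same shape as initializePool(): core == max caps concurrency, the unbounded
    // queue buffers the rest, and the core-thread timeout lets idle threads exit.
    ThreadPoolExecutor pool =
        new ThreadPoolExecutor(4, 4, 1, TimeUnit.MINUTES, new LinkedBlockingQueue<>());
    pool.allowCoreThreadTimeOut(true);

    for (int i = 0; i < 100; i++) {
      int id = i;
      pool.execute(() -> System.out.println("task " + id + " on "
          + Thread.currentThread().getName()));
    }

    // Resizing in place, like the setCorePoolSize() call in the hunk above; raise the
    // maximum first so the core size never exceeds it.
    pool.setMaximumPoolSize(8);
    pool.setCorePoolSize(8);

    pool.shutdown();
    pool.awaitTermination(1, TimeUnit.MINUTES);
    System.out.println("largest pool size seen: " + pool.getLargestPoolSize()); // <= 8
  }
}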