HBASE-20732 Shutdown scan pool when master is stopped
Signed-off-by: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
parent
7fad1e52a3
commit
43c0df51ea
|
@ -1273,6 +1273,7 @@ public class HMaster extends HRegionServer implements MasterServices {
|
|||
}
|
||||
super.stopServiceThreads();
|
||||
stopChores();
|
||||
CleanerChore.shutDownChorePool();
|
||||
|
||||
LOG.debug("Stopping service threads");
|
||||
|
||||
|
|
|
@ -18,16 +18,19 @@
|
|||
package org.apache.hadoop.hbase.master.cleaner;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collections;
|
||||
import java.util.Comparator;
|
||||
import java.util.HashMap;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.ForkJoinPool;
|
||||
import java.util.concurrent.ForkJoinTask;
|
||||
import java.util.concurrent.RecursiveTask;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
|
@ -118,7 +121,7 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends Schedu
|
|||
break;
|
||||
}
|
||||
}
|
||||
pool.shutdownNow();
|
||||
shutDownNow();
|
||||
LOG.info("Update chore's pool size from {} to {}", pool.getParallelism(), size);
|
||||
pool = new ForkJoinPool(size);
|
||||
}
|
||||
|
@ -136,6 +139,13 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends Schedu
|
|||
synchronized void submit(ForkJoinTask task) {
|
||||
pool.submit(task);
|
||||
}
|
||||
|
||||
synchronized void shutDownNow() {
|
||||
if (pool == null || pool.isShutdown()) {
|
||||
return;
|
||||
}
|
||||
pool.shutdownNow();
|
||||
}
|
||||
}
|
||||
// It may be waste resources for each cleaner chore own its pool,
|
||||
// so let's make pool for all cleaner chores.
|
||||
|
@ -154,6 +164,13 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends Schedu
|
|||
}
|
||||
}
|
||||
|
||||
public static void shutDownChorePool() {
|
||||
if (POOL != null) {
|
||||
POOL.shutDownNow();
|
||||
POOL = null;
|
||||
}
|
||||
}
|
||||
|
||||
public CleanerChore(String name, final int sleepPeriod, final Stoppable s, Configuration conf,
|
||||
FileSystem fs, Path oldFileDir, String confKey) {
|
||||
this(name, sleepPeriod, s, conf, fs, oldFileDir, confKey, null);
|
||||
|
@ -472,35 +489,31 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends Schedu
|
|||
try {
|
||||
// if dir doesn't exist, we'll get null back for both of these
|
||||
// which will fall through to succeeding.
|
||||
subDirs = getFilteredStatus(status -> status.isDirectory());
|
||||
files = getFilteredStatus(status -> status.isFile());
|
||||
subDirs = getFilteredStatus(FileStatus::isDirectory);
|
||||
files = getFilteredStatus(FileStatus::isFile);
|
||||
} catch (IOException ioe) {
|
||||
LOG.warn("failed to get FileStatus for contents of '{}'", dir, ioe);
|
||||
return false;
|
||||
}
|
||||
|
||||
boolean nullSubDirs = subDirs == null;
|
||||
if (nullSubDirs) {
|
||||
LOG.trace("There is no subdir under {}", dir);
|
||||
}
|
||||
if (files == null) {
|
||||
LOG.trace("There is no file under {}", dir);
|
||||
boolean allFilesDeleted = true;
|
||||
if (!files.isEmpty()) {
|
||||
allFilesDeleted = deleteAction(() -> checkAndDeleteFiles(files), "files");
|
||||
}
|
||||
|
||||
int capacity = nullSubDirs ? 0 : subDirs.size();
|
||||
List<CleanerTask> tasks = Lists.newArrayListWithCapacity(capacity);
|
||||
if (!nullSubDirs) {
|
||||
boolean allSubdirsDeleted = true;
|
||||
if (!subDirs.isEmpty()) {
|
||||
List<CleanerTask> tasks = Lists.newArrayListWithCapacity(subDirs.size());
|
||||
sortByConsumedSpace(subDirs);
|
||||
for (FileStatus subdir : subDirs) {
|
||||
CleanerTask task = new CleanerTask(subdir, false);
|
||||
tasks.add(task);
|
||||
task.fork();
|
||||
}
|
||||
allSubdirsDeleted = deleteAction(() -> getCleanResult(tasks), "subdirs");
|
||||
}
|
||||
|
||||
boolean result = true;
|
||||
result &= deleteAction(() -> checkAndDeleteFiles(files), "files");
|
||||
result &= deleteAction(() -> getCleanResult(tasks), "subdirs");
|
||||
boolean result = allFilesDeleted && allSubdirsDeleted;
|
||||
// if and only if files and subdirs under current dir are deleted successfully, and
|
||||
// it is not the root dir, then task will try to delete it.
|
||||
if (result && !root) {
|
||||
|
@ -511,14 +524,13 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends Schedu
|
|||
|
||||
/**
|
||||
* Get FileStatus with filter.
|
||||
* Pay attention that FSUtils #listStatusWithStatusFilter would return null,
|
||||
* even though status is empty but not null.
|
||||
* @param function a filter function
|
||||
* @return filtered FileStatus or null if dir doesn't exist
|
||||
* @return filtered FileStatus or empty list if dir doesn't exist
|
||||
* @throws IOException if there's an error other than dir not existing
|
||||
*/
|
||||
private List<FileStatus> getFilteredStatus(Predicate<FileStatus> function) throws IOException {
|
||||
return FSUtils.listStatusWithStatusFilter(fs, dir, status -> function.test(status));
|
||||
return Optional.ofNullable(FSUtils.listStatusWithStatusFilter(fs, dir,
|
||||
status -> function.test(status))).orElseGet(Collections::emptyList);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -37,8 +37,8 @@ import org.apache.hadoop.hbase.testclassification.MasterTests;
|
|||
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||
import org.apache.hadoop.hbase.util.FSUtils;
|
||||
import org.apache.hadoop.hbase.util.StoppableImplementation;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
|
@ -58,15 +58,16 @@ public class TestCleanerChore {
|
|||
private static final Logger LOG = LoggerFactory.getLogger(TestCleanerChore.class);
|
||||
private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
|
||||
|
||||
@Before
|
||||
public void setup() throws Exception {
|
||||
@BeforeClass
|
||||
public static void setup() {
|
||||
CleanerChore.initChorePool(UTIL.getConfiguration());
|
||||
}
|
||||
|
||||
@After
|
||||
public void cleanup() throws Exception {
|
||||
@AfterClass
|
||||
public static void cleanup() throws Exception {
|
||||
// delete and recreate the test directory, ensuring a clean test dir between tests
|
||||
UTIL.cleanupTestDir();
|
||||
CleanerChore.shutDownChorePool();
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -301,6 +302,7 @@ public class TestCleanerChore {
|
|||
*/
|
||||
@Test
|
||||
public void testNoExceptionFromDirectoryWithRacyChildren() throws Exception {
|
||||
UTIL.cleanupTestDir();
|
||||
Stoppable stop = new StoppableImplementation();
|
||||
// need to use a localutil to not break the rest of the test that runs on the local FS, which
|
||||
// gets hosed when we start to use a minicluster.
|
||||
|
|
Loading…
Reference in New Issue