HBASE-20732 Shutdown scan pool when master is stopped

Signed-off-by: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
Reid Chan 2018-06-27 15:07:19 +08:00
parent 32ee0eaf4b
commit 74e5c776b3
3 changed files with 40 additions and 25 deletions

View File

@ -1281,6 +1281,7 @@ public class HMaster extends HRegionServer implements MasterServices {
} }
super.stopServiceThreads(); super.stopServiceThreads();
stopChores(); stopChores();
CleanerChore.shutDownChorePool();
LOG.debug("Stopping service threads"); LOG.debug("Stopping service threads");

View File

@ -18,16 +18,19 @@
package org.apache.hadoop.hbase.master.cleaner; package org.apache.hadoop.hbase.master.cleaner;
import java.io.IOException; import java.io.IOException;
import java.util.Collections;
import java.util.Comparator; import java.util.Comparator;
import java.util.HashMap; import java.util.HashMap;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool; import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask; import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask; import java.util.concurrent.RecursiveTask;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
@ -118,7 +121,7 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends Schedu
break; break;
} }
} }
pool.shutdownNow(); shutDownNow();
LOG.info("Update chore's pool size from {} to {}", pool.getParallelism(), size); LOG.info("Update chore's pool size from {} to {}", pool.getParallelism(), size);
pool = new ForkJoinPool(size); pool = new ForkJoinPool(size);
} }
@ -136,6 +139,13 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends Schedu
synchronized void submit(ForkJoinTask task) { synchronized void submit(ForkJoinTask task) {
pool.submit(task); pool.submit(task);
} }
synchronized void shutDownNow() {
if (pool == null || pool.isShutdown()) {
return;
}
pool.shutdownNow();
}
} }
// It may be waste resources for each cleaner chore own its pool, // It may be waste resources for each cleaner chore own its pool,
// so let's make pool for all cleaner chores. // so let's make pool for all cleaner chores.
@ -154,6 +164,13 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends Schedu
} }
} }
public static void shutDownChorePool() {
if (POOL != null) {
POOL.shutDownNow();
POOL = null;
}
}
public CleanerChore(String name, final int sleepPeriod, final Stoppable s, Configuration conf, public CleanerChore(String name, final int sleepPeriod, final Stoppable s, Configuration conf,
FileSystem fs, Path oldFileDir, String confKey) { FileSystem fs, Path oldFileDir, String confKey) {
this(name, sleepPeriod, s, conf, fs, oldFileDir, confKey, null); this(name, sleepPeriod, s, conf, fs, oldFileDir, confKey, null);
@ -472,35 +489,31 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends Schedu
try { try {
// if dir doesn't exist, we'll get null back for both of these // if dir doesn't exist, we'll get null back for both of these
// which will fall through to succeeding. // which will fall through to succeeding.
subDirs = getFilteredStatus(status -> status.isDirectory()); subDirs = getFilteredStatus(FileStatus::isDirectory);
files = getFilteredStatus(status -> status.isFile()); files = getFilteredStatus(FileStatus::isFile);
} catch (IOException ioe) { } catch (IOException ioe) {
LOG.warn("failed to get FileStatus for contents of '{}'", dir, ioe); LOG.warn("failed to get FileStatus for contents of '{}'", dir, ioe);
return false; return false;
} }
boolean nullSubDirs = subDirs == null; boolean allFilesDeleted = true;
if (nullSubDirs) { if (!files.isEmpty()) {
LOG.trace("There is no subdir under {}", dir); allFilesDeleted = deleteAction(() -> checkAndDeleteFiles(files), "files");
}
if (files == null) {
LOG.trace("There is no file under {}", dir);
} }
int capacity = nullSubDirs ? 0 : subDirs.size(); boolean allSubdirsDeleted = true;
List<CleanerTask> tasks = Lists.newArrayListWithCapacity(capacity); if (!subDirs.isEmpty()) {
if (!nullSubDirs) { List<CleanerTask> tasks = Lists.newArrayListWithCapacity(subDirs.size());
sortByConsumedSpace(subDirs); sortByConsumedSpace(subDirs);
for (FileStatus subdir : subDirs) { for (FileStatus subdir : subDirs) {
CleanerTask task = new CleanerTask(subdir, false); CleanerTask task = new CleanerTask(subdir, false);
tasks.add(task); tasks.add(task);
task.fork(); task.fork();
} }
allSubdirsDeleted = deleteAction(() -> getCleanResult(tasks), "subdirs");
} }
boolean result = true; boolean result = allFilesDeleted && allSubdirsDeleted;
result &= deleteAction(() -> checkAndDeleteFiles(files), "files");
result &= deleteAction(() -> getCleanResult(tasks), "subdirs");
// if and only if files and subdirs under current dir are deleted successfully, and // if and only if files and subdirs under current dir are deleted successfully, and
// it is not the root dir, then task will try to delete it. // it is not the root dir, then task will try to delete it.
if (result && !root) { if (result && !root) {
@ -511,14 +524,13 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends Schedu
/** /**
* Get FileStatus with filter. * Get FileStatus with filter.
* Pay attention that FSUtils #listStatusWithStatusFilter would return null,
* even though status is empty but not null.
* @param function a filter function * @param function a filter function
* @return filtered FileStatus or null if dir doesn't exist * @return filtered FileStatus or empty list if dir doesn't exist
* @throws IOException if there's an error other than dir not existing * @throws IOException if there's an error other than dir not existing
*/ */
private List<FileStatus> getFilteredStatus(Predicate<FileStatus> function) throws IOException { private List<FileStatus> getFilteredStatus(Predicate<FileStatus> function) throws IOException {
return FSUtils.listStatusWithStatusFilter(fs, dir, status -> function.test(status)); return Optional.ofNullable(FSUtils.listStatusWithStatusFilter(fs, dir,
status -> function.test(status))).orElseGet(Collections::emptyList);
} }
/** /**

View File

@ -37,8 +37,8 @@ import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.StoppableImplementation; import org.apache.hadoop.hbase.util.StoppableImplementation;
import org.junit.After; import org.junit.AfterClass;
import org.junit.Before; import org.junit.BeforeClass;
import org.junit.ClassRule; import org.junit.ClassRule;
import org.junit.Test; import org.junit.Test;
import org.junit.experimental.categories.Category; import org.junit.experimental.categories.Category;
@ -58,15 +58,16 @@ public class TestCleanerChore {
private static final Logger LOG = LoggerFactory.getLogger(TestCleanerChore.class); private static final Logger LOG = LoggerFactory.getLogger(TestCleanerChore.class);
private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
@Before @BeforeClass
public void setup() throws Exception { public static void setup() {
CleanerChore.initChorePool(UTIL.getConfiguration()); CleanerChore.initChorePool(UTIL.getConfiguration());
} }
@After @AfterClass
public void cleanup() throws Exception { public static void cleanup() throws Exception {
// delete and recreate the test directory, ensuring a clean test dir between tests // delete and recreate the test directory, ensuring a clean test dir between tests
UTIL.cleanupTestDir(); UTIL.cleanupTestDir();
CleanerChore.shutDownChorePool();
} }
@Test @Test
@ -301,6 +302,7 @@ public class TestCleanerChore {
*/ */
@Test @Test
public void testNoExceptionFromDirectoryWithRacyChildren() throws Exception { public void testNoExceptionFromDirectoryWithRacyChildren() throws Exception {
UTIL.cleanupTestDir();
Stoppable stop = new StoppableImplementation(); Stoppable stop = new StoppableImplementation();
// need to use a localutil to not break the rest of the test that runs on the local FS, which // need to use a localutil to not break the rest of the test that runs on the local FS, which
// gets hosed when we start to use a minicluster. // gets hosed when we start to use a minicluster.