From 43c0df51ea76995688857e5948cd9a3d9b58e2f9 Mon Sep 17 00:00:00 2001
From: Reid Chan
Date: Wed, 27 Jun 2018 15:07:19 +0800
Subject: [PATCH] HBASE-20732 Shutdown scan pool when master is stopped

Signed-off-by: Chia-Ping Tsai
---
 .../apache/hadoop/hbase/master/HMaster.java       |  1 +
 .../hbase/master/cleaner/CleanerChore.java        | 50 ++++++++++++-------
 .../master/cleaner/TestCleanerChore.java          | 14 +++---
 3 files changed, 40 insertions(+), 25 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index 9602d32bdcb..a5e07ba602c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -1273,6 +1273,7 @@ public class HMaster extends HRegionServer implements MasterServices {
     }
     super.stopServiceThreads();
     stopChores();
+    CleanerChore.shutDownChorePool();
 
     LOG.debug("Stopping service threads");
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java
index cb202c82ff4..19a7a693ef8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java
@@ -18,16 +18,19 @@ package org.apache.hadoop.hbase.master.cleaner;
 
 import java.io.IOException;
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ForkJoinPool;
 import java.util.concurrent.ForkJoinTask;
 import java.util.concurrent.RecursiveTask;
 import java.util.concurrent.atomic.AtomicBoolean;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -118,7 +121,7 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends ScheduledChore
           break;
         }
       }
-      pool.shutdownNow();
+      shutDownNow();
       LOG.info("Update chore's pool size from {} to {}", pool.getParallelism(), size);
       pool = new ForkJoinPool(size);
     }
@@ -136,6 +139,13 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends ScheduledChore
     synchronized void submit(ForkJoinTask<?> task) {
       pool.submit(task);
     }
+
+    synchronized void shutDownNow() {
+      if (pool == null || pool.isShutdown()) {
+        return;
+      }
+      pool.shutdownNow();
+    }
   }
   // It may be waste resources for each cleaner chore own its pool,
   // so let's make pool for all cleaner chores.
@@ -154,6 +164,13 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends ScheduledChore
     }
   }
 
+  public static void shutDownChorePool() {
+    if (POOL != null) {
+      POOL.shutDownNow();
+      POOL = null;
+    }
+  }
+
   public CleanerChore(String name, final int sleepPeriod, final Stoppable s, Configuration conf,
       FileSystem fs, Path oldFileDir, String confKey) {
     this(name, sleepPeriod, s, conf, fs, oldFileDir, confKey, null);
   }
@@ -472,35 +489,31 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends ScheduledChore
       try {
         // if dir doesn't exist, we'll get null back for both of these
        // which will fall through to succeeding.
-        subDirs = getFilteredStatus(status -> status.isDirectory());
-        files = getFilteredStatus(status -> status.isFile());
+        subDirs = getFilteredStatus(FileStatus::isDirectory);
+        files = getFilteredStatus(FileStatus::isFile);
       } catch (IOException ioe) {
         LOG.warn("failed to get FileStatus for contents of '{}'", dir, ioe);
         return false;
       }
 
-      boolean nullSubDirs = subDirs == null;
-      if (nullSubDirs) {
-        LOG.trace("There is no subdir under {}", dir);
-      }
-      if (files == null) {
-        LOG.trace("There is no file under {}", dir);
+      boolean allFilesDeleted = true;
+      if (!files.isEmpty()) {
+        allFilesDeleted = deleteAction(() -> checkAndDeleteFiles(files), "files");
       }
 
-      int capacity = nullSubDirs ? 0 : subDirs.size();
-      List<CleanerTask> tasks = Lists.newArrayListWithCapacity(capacity);
-      if (!nullSubDirs) {
+      boolean allSubdirsDeleted = true;
+      if (!subDirs.isEmpty()) {
+        List<CleanerTask> tasks = Lists.newArrayListWithCapacity(subDirs.size());
         sortByConsumedSpace(subDirs);
         for (FileStatus subdir : subDirs) {
           CleanerTask task = new CleanerTask(subdir, false);
           tasks.add(task);
           task.fork();
         }
+        allSubdirsDeleted = deleteAction(() -> getCleanResult(tasks), "subdirs");
       }
 
-      boolean result = true;
-      result &= deleteAction(() -> checkAndDeleteFiles(files), "files");
-      result &= deleteAction(() -> getCleanResult(tasks), "subdirs");
+      boolean result = allFilesDeleted && allSubdirsDeleted;
       // if and only if files and subdirs under current dir are deleted successfully, and
       // it is not the root dir, then task will try to delete it.
       if (result && !root) {
@@ -511,14 +524,13 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends ScheduledChore
 
     /**
      * Get FileStatus with filter.
-     * Pay attention that FSUtils #listStatusWithStatusFilter would return null,
-     * even though status is empty but not null.
      * @param function a filter function
-     * @return filtered FileStatus or null if dir doesn't exist
+     * @return filtered FileStatus or empty list if dir doesn't exist
      * @throws IOException if there's an error other than dir not existing
      */
     private List<FileStatus> getFilteredStatus(Predicate<FileStatus> function) throws IOException {
-      return FSUtils.listStatusWithStatusFilter(fs, dir, status -> function.test(status));
+      return Optional.ofNullable(FSUtils.listStatusWithStatusFilter(fs, dir,
+        status -> function.test(status))).orElseGet(Collections::emptyList);
     }
 
     /**
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestCleanerChore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestCleanerChore.java
index c977f98e942..1ffd17ad4a0 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestCleanerChore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestCleanerChore.java
@@ -37,8 +37,8 @@ import org.apache.hadoop.hbase.testclassification.MasterTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.StoppableImplementation;
-import org.junit.After;
-import org.junit.Before;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -58,15 +58,16 @@ public class TestCleanerChore {
   private static final Logger LOG = LoggerFactory.getLogger(TestCleanerChore.class);
   private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
 
-  @Before
-  public void setup() throws Exception {
+  @BeforeClass
+  public static void setup() {
     CleanerChore.initChorePool(UTIL.getConfiguration());
   }
 
-  @After
-  public void cleanup() throws Exception {
+  @AfterClass
+  public static void cleanup() throws Exception {
     // delete and recreate the test directory, ensuring a clean test dir between tests
     UTIL.cleanupTestDir();
+    CleanerChore.shutDownChorePool();
   }
 
   @Test
@@ -301,6 +302,7 @@ public class TestCleanerChore {
    */
   @Test
  public void testNoExceptionFromDirectoryWithRacyChildren() throws Exception {
+    UTIL.cleanupTestDir();
     Stoppable stop = new StoppableImplementation();
     // need to use a localutil to not break the rest of the test that runs on the local FS, which
     // gets hosed when we start to use a minicluster.
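
Note: as an illustration of the two idioms this patch relies on (an idempotent, guarded shutdown of a shared ForkJoinPool, and Optional.ofNullable(...).orElseGet(Collections::emptyList) to turn a possibly-null directory listing into an empty list), here is a minimal standalone Java sketch. It is not HBase code: PoolShutdownSketch and listOrNull are hypothetical stand-ins for CleanerChore's static POOL wrapper and FSUtils.listStatusWithStatusFilter.

import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ForkJoinPool;

// Minimal sketch, not HBase code. PoolShutdownSketch and listOrNull are
// hypothetical stand-ins for CleanerChore's POOL wrapper and for
// FSUtils.listStatusWithStatusFilter, which may return null for a missing dir.
public class PoolShutdownSketch {

  private static ForkJoinPool pool = new ForkJoinPool();

  // Idempotent shutdown, mirroring the patch's shutDownNow()/shutDownChorePool():
  // safe to call repeatedly, or after the pool reference was already cleared.
  static synchronized void shutDownPool() {
    if (pool == null || pool.isShutdown()) {
      return; // already stopped, nothing to do
    }
    pool.shutdownNow();
    pool = null;
  }

  // Stand-in for a lister that returns null when the directory does not exist.
  static List<String> listOrNull(boolean dirExists) {
    return dirExists ? List.of("file-a", "file-b") : null;
  }

  public static void main(String[] args) {
    // Normalize null to an empty list, as getFilteredStatus now does,
    // so callers can use isEmpty() instead of scattered null checks.
    List<String> entries = Optional.ofNullable(listOrNull(false))
        .orElseGet(Collections::emptyList);
    System.out.println("entries: " + entries);

    shutDownPool();
    shutDownPool(); // second call is a no-op thanks to the guard
  }
}

The same guard is what lets TestCleanerChore move from @Before/@After to @BeforeClass/@AfterClass: the pool is process-wide, so it is created once per test class and torn down once, and a repeated shutDownChorePool() call stays harmless.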