HBASE-20732 Shutdown scan pool when master is stopped

commit c81d0aff95
parent 03946a389a
Author: Reid Chan
Date:   2018-06-27 18:55:21 +08:00

3 changed files with 50 additions and 32 deletions

HMaster.java

@@ -1286,6 +1286,7 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
     }
     super.stopServiceThreads();
     stopChores();
+    CleanerChore.shutDownChorePool();
     // Wait for all the remaining region servers to report in IFF we were
     // running a cluster shutdown AND we were NOT aborting.
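The one-line change above gives the master a hook to release the cleaner chores' shared ForkJoinPool as part of stopping its service threads. As a minimal standalone sketch of the pattern this commit introduces (class and method names here are illustrative, not HBase API): a static pool holder whose shutdown is idempotent and clears the reference so a later init can recreate the pool.

import java.util.concurrent.ForkJoinPool;

// Illustrative holder, not HBase code: mirrors the guarded shutDownNow()
// and shutDownChorePool() added to CleanerChore in this commit.
public final class SharedPoolHolder {
  private static ForkJoinPool pool;

  public static synchronized void init(int parallelism) {
    if (pool == null) {
      pool = new ForkJoinPool(parallelism);
    }
  }

  public static synchronized void shutDown() {
    if (pool == null || pool.isShutdown()) {
      return; // already stopped; safe to call repeatedly
    }
    pool.shutdownNow();
    pool = null; // allow a restarted master to re-initialize
  }
}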

CleanerChore.java

@@ -18,6 +18,7 @@
 package org.apache.hadoop.hbase.master.cleaner;

 import java.io.IOException;
+import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -118,7 +119,7 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends ScheduledChore
           break;
         }
       }
-      pool.shutdownNow();
+      shutDownNow();
       LOG.info("Update chore's pool size from " + pool.getParallelism() + " to " + size);
       pool = new ForkJoinPool(size);
     }
@@ -136,6 +137,13 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends ScheduledChore
     synchronized void submit(ForkJoinTask task) {
       pool.submit(task);
     }
+
+    synchronized void shutDownNow() {
+      if (pool == null || pool.isShutdown()) {
+        return;
+      }
+      pool.shutdownNow();
+    }
   }

   // It may be waste resources for each cleaner chore own its pool,
   // so let's make pool for all cleaner chores.
@@ -154,12 +162,18 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends ScheduledChore
     }
   }

+  public static void shutDownChorePool() {
+    if (POOL != null) {
+      POOL.shutDownNow();
+      POOL = null;
+    }
+  }
+
   public CleanerChore(String name, final int sleepPeriod, final Stoppable s, Configuration conf,
       FileSystem fs, Path oldFileDir, String confKey) {
     this(name, sleepPeriod, s, conf, fs, oldFileDir, confKey, null);
   }

   /**
    * @param name name of the chore being run
    * @param sleepPeriod the period of time to sleep between each run
@@ -432,6 +446,7 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends ScheduledChore
     protected Boolean compute() {
       LOG.trace("Cleaning under " + dir);
       List<FileStatus> subDirs;
+      List<FileStatus> tmpFiles;
       final List<FileStatus> files;
       try {
         // if dir doesn't exist, we'll get null back for both of these
@@ -442,48 +457,48 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends ScheduledChore
             return f.isDirectory();
           }
         });
-        files = FSUtils.listStatusWithStatusFilter(fs, dir, new FileStatusFilter() {
+        if (subDirs == null) {
+          subDirs = Collections.emptyList();
+        }
+        tmpFiles = FSUtils.listStatusWithStatusFilter(fs, dir, new FileStatusFilter() {
           @Override
           public boolean accept(FileStatus f) {
             return f.isFile();
           }
         });
+        files = tmpFiles == null ? Collections.<FileStatus>emptyList() : tmpFiles;
       } catch (IOException ioe) {
         LOG.warn("failed to get FileStatus for contents of '" + dir + "'", ioe);
         return false;
       }

-      boolean nullSubDirs = subDirs == null;
-      if (nullSubDirs) {
-        LOG.trace("There is no subdir under " + dir);
-      }
-      if (files == null) {
-        LOG.trace("There is no file under " + dir);
-      }
-      int capacity = nullSubDirs ? 0 : subDirs.size();
-      final List<CleanerTask> tasks = Lists.newArrayListWithCapacity(capacity);
-      if (!nullSubDirs) {
-        for (FileStatus subdir : subDirs) {
-          CleanerTask task = new CleanerTask(subdir, false);
-          tasks.add(task);
-          task.fork();
-        }
-      }
-      boolean result = true;
-      result &= deleteAction(new Action<Boolean>() {
-        @Override
-        public Boolean act() throws IOException {
-          return checkAndDeleteFiles(files);
-        }
-      }, "files");
-      result &= deleteAction(new Action<Boolean>() {
-        @Override
-        public Boolean act() throws IOException {
-          return getCleanResult(tasks);
-        }
-      }, "subdirs");
+      boolean allFilesDeleted = true;
+      if (!files.isEmpty()) {
+        allFilesDeleted = deleteAction(new Action<Boolean>() {
+          @Override
+          public Boolean act() throws IOException {
+            return checkAndDeleteFiles(files);
+          }
+        }, "files");
+      }
+
+      boolean allSubdirsDeleted = true;
+      if (!subDirs.isEmpty()) {
+        final List<CleanerTask> tasks = Lists.newArrayListWithCapacity(subDirs.size());
+        for (FileStatus subdir : subDirs) {
+          CleanerTask task = new CleanerTask(subdir, false);
+          tasks.add(task);
+          task.fork();
+        }
+        allSubdirsDeleted = deleteAction(new Action<Boolean>() {
+          @Override
+          public Boolean act() throws IOException {
+            return getCleanResult(tasks);
+          }
+        }, "subdirs");
+      }
+
+      boolean result = allFilesDeleted && allSubdirsDeleted;
       // if and only if files and subdirs under current dir are deleted successfully, and
       // it is not the root dir, then task will try to delete it.
       if (result && !root) {
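Beyond the shutdown hook, the compute() rewrite above normalizes null directory listings to empty lists up front and skips the delete actions entirely when there is nothing to delete, replacing the single accumulated result with separate per-kind flags. A compact sketch of that control flow (names are illustrative; deleteAll stands in for CleanerChore's deleteAction calls):

import java.util.Collections;
import java.util.List;

// Illustrative sketch, not HBase code.
public class CleanFlowSketch {
  static boolean clean(List<String> files, List<String> subDirs) {
    // normalize possibly-null listings first, as the rewritten compute() does
    files = files == null ? Collections.<String>emptyList() : files;
    subDirs = subDirs == null ? Collections.<String>emptyList() : subDirs;

    boolean allFilesDeleted = true;
    if (!files.isEmpty()) {
      allFilesDeleted = deleteAll(files);     // stands in for deleteAction(..., "files")
    }

    boolean allSubdirsDeleted = true;
    if (!subDirs.isEmpty()) {
      allSubdirsDeleted = deleteAll(subDirs); // stands in for deleteAction(..., "subdirs")
    }

    // only when both succeed may the (non-root) directory itself be removed
    return allFilesDeleted && allSubdirsDeleted;
  }

  private static boolean deleteAll(List<String> entries) {
    return true; // placeholder for the real deletion logic
  }
}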

TestCleanerChore.java

@@ -38,8 +38,8 @@ import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.apache.hadoop.hbase.Stoppable;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.StoppableImplementation;
-import org.junit.After;
-import org.junit.Before;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.mockito.Mockito;
@@ -52,16 +52,17 @@ public class TestCleanerChore {

   private static final Log LOG = LogFactory.getLog(TestCleanerChore.class);
   private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();

-  @Before
-  public void setup() throws Exception {
+  @BeforeClass
+  public static void setup() {
     CleanerChore.initChorePool(UTIL.getConfiguration());
   }

-  @After
-  public void cleanup() throws Exception {
+  @AfterClass
+  public static void cleanup() throws Exception {
     // delete and recreate the test directory, ensuring a clean test dir between tests
     UTIL.cleanupTestDir();
+    CleanerChore.shutDownChorePool();
   }

   @Test
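The test lifecycle shifts from per-method to per-class here: since shutDownChorePool() tears down the shared static pool, running it after every test would leave later tests without a pool. The equivalent JUnit 4 pattern in isolation (illustrative class, not the HBase test; note both methods must be static):

import java.util.concurrent.ForkJoinPool;

import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;

// Illustrative test, not HBase code: a class-wide resource is created once
// in @BeforeClass and torn down once in @AfterClass.
public class SharedPoolLifecycleTest {
  private static ForkJoinPool shared;

  @BeforeClass
  public static void setUpOnce() {
    shared = new ForkJoinPool();
  }

  @AfterClass
  public static void tearDownOnce() {
    shared.shutdownNow();
  }

  @Test
  public void submitsToSharedPool() throws Exception {
    shared.submit(new Runnable() {
      @Override
      public void run() {
        // no-op task just to exercise the shared pool
      }
    }).get();
  }
}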
@@ -296,6 +297,7 @@ public class TestCleanerChore {
    */
   @Test
   public void testNoExceptionFromDirectoryWithRacyChildren() throws Exception {
+    UTIL.cleanupTestDir();
     Stoppable stop = new StoppableImplementation();
     // need to use a localutil to not break the rest of the test that runs on the local FS, which
     // gets hosed when we start to use a minicluster.
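With cleanup() now running only once per class, files left behind by earlier test methods can still be on disk when this test starts; presumably that is why the test now clears the test directory itself before running.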