HBASE-21867 Support multi-threads in HFileArchiver

This commit is contained in:
Toshihiro Suzuki 2019-02-09 22:40:55 +09:00
parent a551149ca7
commit 72df52283b
6 changed files with 342 additions and 35 deletions

View File

@ -19,11 +19,18 @@ package org.apache.hadoop.hbase.backup;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
@ -37,6 +44,7 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.HFileArchiveUtil;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.io.MultipleIOException;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
@ -67,6 +75,8 @@ public class HFileArchiver {
private static ThreadPoolExecutor archiveExecutor;
private HFileArchiver() {
// hidden ctor since this is just a util
@ -103,14 +113,12 @@ public class HFileArchiver {
* the archive path)
* @param tableDir {@link Path} to where the table is being stored (for building the archive path)
* @param regionDir {@link Path} to where a region is being stored (for building the archive path)
* @return <tt>true</tt> if the region was sucessfully deleted. <tt>false</tt> if the filesystem
* @return <tt>true</tt> if the region was successfully deleted. <tt>false</tt> if the filesystem
* operations could not complete.
* @throws IOException if the request cannot be completed
public static boolean archiveRegion(FileSystem fs, Path rootdir, Path tableDir, Path regionDir)
throws IOException {
LOG.debug("ARCHIVING {}", rootdir.toString());
// otherwise, we archive the files
// make sure we can archive
if (tableDir == null || regionDir == null) {
@ -122,6 +130,8 @@ public class HFileArchiver {
return false;
LOG.debug("ARCHIVING {}", regionDir);
// make sure the regiondir lives under the tabledir
Path regionArchiveDir = HFileArchiveUtil.getRegionArchiveDir(rootdir,
@ -137,7 +147,7 @@ public class HFileArchiver {
PathFilter nonHidden = new PathFilter() {
public boolean accept(Path file) {
return dirFilter.accept(file) && !file.getName().toString().startsWith(".");
return dirFilter.accept(file) && !file.getName().startsWith(".");
FileStatus[] storeDirs = FSUtils.listStatus(fs, regionDir, nonHidden);
@ -162,6 +172,67 @@ public class HFileArchiver {
return deleteRegionWithoutArchiving(fs, regionDir);
* Archive the specified regions in parallel.
* @param conf the configuration to use
* @param fs {@link FileSystem} from which to remove the region
* @param rootDir {@link Path} to the root directory where hbase files are stored (for building
* the archive path)
* @param tableDir {@link Path} to where the table is being stored (for building the archive
* path)
* @param regionDirList {@link Path} to where regions are being stored (for building the archive
* path)
* @throws IOException if the request cannot be completed
public static void archiveRegions(Configuration conf, FileSystem fs, Path rootDir, Path tableDir,
List<Path> regionDirList) throws IOException {
List<Future<Void>> futures = new ArrayList<>(regionDirList.size());
for (Path regionDir: regionDirList) {
Future<Void> future = getArchiveExecutor(conf).submit(() -> {
archiveRegion(fs, rootDir, tableDir, regionDir);
return null;
try {
for (Future<Void> future: futures) {
} catch (InterruptedException e) {
throw new InterruptedIOException(e.getMessage());
} catch (ExecutionException e) {
throw new IOException(e.getCause());
private static synchronized ThreadPoolExecutor getArchiveExecutor(final Configuration conf) {
if (archiveExecutor == null) {
int maxThreads = conf.getInt("hbase.hfilearchiver.thread.pool.max", 8);
archiveExecutor = Threads.getBoundedCachedThreadPool(maxThreads, 30L, TimeUnit.SECONDS,
// Shutdown this ThreadPool in a shutdown hook
Runtime.getRuntime().addShutdownHook(new Thread(() -> archiveExecutor.shutdown()));
return archiveExecutor;
// We need this method instead of Threads.getNamedThreadFactory() to pass some tests.
// The difference from Threads.getNamedThreadFactory() is that it doesn't fix ThreadGroup for
// new threads. If we use Threads.getNamedThreadFactory(), we will face ThreadGroup related
// issues in some tests.
private static ThreadFactory getThreadFactory() {
return new ThreadFactory() {
final AtomicInteger threadNumber = new AtomicInteger(1);
public Thread newThread(Runnable r) {
final String name = "HFileArchiver-" + threadNumber.getAndIncrement();
return new Thread(r, name);
* Remove from the specified region the store files of the specified column family,
* either by archiving them or outright deletion

View File

@ -50,6 +50,8 @@ import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
* This class abstracts a bunch of operations the HMaster needs to interact with
* the underlying file system like creating the initial layout, checking file
@ -306,16 +308,16 @@ public class MasterFileSystem {
* Make sure the hbase temp directory exists and is empty.
* NOTE that this method is only executed once just after the master becomes the active one.
private void checkTempDir(final Path tmpdir, final Configuration c, final FileSystem fs)
void checkTempDir(final Path tmpdir, final Configuration c, final FileSystem fs)
throws IOException {
// If the temp directory exists, clear the content (left over, from the previous run)
if (fs.exists(tmpdir)) {
// Archive table in temp, maybe left over from failed deletion,
// if not the cleaner will take care of them.
for (Path tabledir: FSUtils.getTableDirs(fs, tmpdir)) {
for (Path regiondir: FSUtils.getRegionDirs(fs, tabledir)) {
HFileArchiver.archiveRegion(fs, this.rootdir, tabledir, regiondir);
for (Path tableDir: FSUtils.getTableDirs(fs, tmpdir)) {
HFileArchiver.archiveRegions(c, fs, this.rootdir, tableDir,
FSUtils.getRegionDirs(fs, tableDir));
if (!fs.delete(tmpdir, true)) {
throw new IOException("Unable to clean the temp directory: " + tmpdir);

View File

@ -20,7 +20,9 @@ package org.apache.hadoop.hbase.master.procedure;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@ -33,6 +35,7 @@ import org.apache.hadoop.hbase.backup.HFileArchiver;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
@ -43,7 +46,6 @@ import org.apache.hadoop.hbase.master.MasterFileSystem;
import org.apache.hadoop.hbase.mob.MobConstants;
import org.apache.hadoop.hbase.mob.MobUtils;
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
@ -293,35 +295,37 @@ public class DeleteTableProcedure
throw new IOException("HBase temp directory '" + tempdir + "' creation failure.");
if (fs.exists(tempTableDir)) {
// what's in this dir? something old? probably something manual from the user...
// let's get rid of this stuff...
FileStatus[] files = fs.listStatus(tempTableDir);
if (files != null && files.length > 0) {
List<Path> regionDirList = Arrays.stream(files)
HFileArchiver.archiveRegions(env.getMasterConfiguration(), fs, mfs.getRootDir(),
tempTableDir, regionDirList);
fs.delete(tempTableDir, true);
// Move the table in /hbase/.tmp
if (!fs.rename(tableDir, tempTableDir)) {
if (fs.exists(tempTableDir)) {
// what's in this dir? something old? probably something manual from the user...
// let's get rid of this stuff...
FileStatus[] files = fs.listStatus(tempdir);
if (files != null && files.length > 0) {
for (int i = 0; i < files.length; ++i) {
if (!files[i].isDirectory()) {
HFileArchiver.archiveRegion(fs, mfs.getRootDir(), tempTableDir, files[i].getPath());
fs.delete(tempdir, true);
throw new IOException("Unable to move '" + tableDir + "' to temp '" + tempTableDir + "'");
// Archive regions from FS (temp directory)
if (archive) {
for (RegionInfo hri : regions) {
LOG.debug("Archiving region " + hri.getRegionNameAsString() + " from FS");
HFileArchiver.archiveRegion(fs, mfs.getRootDir(),
tempTableDir, HRegion.getRegionDir(tempTableDir, hri.getEncodedName()));
LOG.debug("Table '" + tableName + "' archived!");
List<Path> regionDirList = regions.stream()
.map(region -> FSUtils.getRegionDir(tempTableDir, region))
HFileArchiver.archiveRegions(env.getMasterConfiguration(), fs, mfs.getRootDir(),
tempTableDir, regionDirList);
LOG.debug("Table '{}' archived!", tableName);
// Archive mob data

View File

@ -23,12 +23,15 @@ import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@ -41,6 +44,7 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
import org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy;
@ -53,6 +57,7 @@ import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.HFileArchiveTestingUtil;
import org.apache.hadoop.hbase.util.HFileArchiveUtil;
import org.apache.hadoop.hbase.util.StoppableImplementation;
import org.apache.hadoop.security.UserGroupInformation;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
@ -128,7 +133,7 @@ public class TestHFileArchiving {
public void testRemovesRegionDirOnArchive() throws Exception {
public void testRemoveRegionDirOnArchive() throws Exception {
final TableName tableName = TableName.valueOf(name.getMethodName());
UTIL.createTable(tableName, TEST_FAM);
@ -158,7 +163,6 @@ public class TestHFileArchiving {
Path archiveDir = HFileArchiveTestingUtil.getRegionArchiveDir(UTIL.getConfiguration(), region);
// check to make sure the store directory was copied
// check to make sure the store directory was copied
FileStatus[] stores = fs.listStatus(archiveDir, new PathFilter() {
@ -235,6 +239,100 @@ public class TestHFileArchiving {
private List<HRegion> initTableForArchivingRegions(TableName tableName) throws IOException {
final byte[][] splitKeys = new byte[][] {
Bytes.toBytes("b"), Bytes.toBytes("c"), Bytes.toBytes("d")
UTIL.createTable(tableName, TEST_FAM, splitKeys);
// get the current store files for the regions
List<HRegion> regions = UTIL.getHBaseCluster().getRegions(tableName);
// make sure we have 4 regions serving this table
assertEquals(4, regions.size());
// and load the table
try (Table table = UTIL.getConnection().getTable(tableName)) {
UTIL.loadTable(table, TEST_FAM);
// disable the table so that we can manipulate the files
return regions;
public void testArchiveRegions() throws Exception {
final TableName tableName = TableName.valueOf(name.getMethodName());
List<HRegion> regions = initTableForArchivingRegions(tableName);
FileSystem fs = UTIL.getTestFileSystem();
// now attempt to depose the regions
Path rootDir = FSUtils.getRootDir(UTIL.getConfiguration());
Path tableDir = FSUtils.getTableDir(rootDir, regions.get(0).getRegionInfo().getTable());
List<Path> regionDirList = regions.stream()
.map(region -> FSUtils.getRegionDir(tableDir, region.getRegionInfo()))
HFileArchiver.archiveRegions(UTIL.getConfiguration(), fs, rootDir, tableDir, regionDirList);
// check for the existence of the archive directory and some files in it
for (HRegion region : regions) {
Path archiveDir = HFileArchiveTestingUtil.getRegionArchiveDir(UTIL.getConfiguration(),
// check to make sure the store directory was copied
FileStatus[] stores = fs.listStatus(archiveDir,
p -> !p.getName().contains(HConstants.RECOVERED_EDITS_DIR));
assertTrue(stores.length == 1);
// make sure we archived the store files
FileStatus[] storeFiles = fs.listStatus(stores[0].getPath());
assertTrue(storeFiles.length > 0);
// then ensure the region's directories aren't present
for (Path regionDir: regionDirList) {
public void testArchiveRegionsWhenPermissionDenied() throws Exception {
final TableName tableName = TableName.valueOf(name.getMethodName());
List<HRegion> regions = initTableForArchivingRegions(tableName);
// now attempt to depose the regions
Path rootDir = FSUtils.getRootDir(UTIL.getConfiguration());
Path tableDir = FSUtils.getTableDir(rootDir, regions.get(0).getRegionInfo().getTable());
List<Path> regionDirList = regions.stream()
.map(region -> FSUtils.getRegionDir(tableDir, region.getRegionInfo()))
// To create a permission denied error, we do archive regions as a non-current user
ugi = UserGroupInformation.createUserForTesting("foo1234", new String[]{"group1"});
try {
ugi.doAs((PrivilegedExceptionAction<Void>) () -> {
FileSystem fs = UTIL.getTestFileSystem();
HFileArchiver.archiveRegions(UTIL.getConfiguration(), fs, rootDir, tableDir,
return null;
} catch (IOException e) {
assertTrue(e.getCause().getMessage().contains("Permission denied"));
throw e;
} finally {
public void testArchiveOnTableDelete() throws Exception {
final TableName tableName = TableName.valueOf(name.getMethodName());

View File

@ -18,22 +18,30 @@
package org.apache.hadoop.hbase.master;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.util.List;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.HFileArchiveTestingUtil;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -47,6 +55,9 @@ public class TestMasterFileSystem {
public static final HBaseClassTestRule CLASS_RULE =
public TestName name = new TestName();
private static final Logger LOG = LoggerFactory.getLogger(TestMasterFileSystem.class);
private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
@ -73,4 +84,56 @@ public class TestMasterFileSystem {
// make sure the set uri matches by forcing it.
assertEquals(masterRoot, rootDir);
public void testCheckTempDir() throws Exception {
final MasterFileSystem masterFileSystem =
final TableName tableName = TableName.valueOf(name.getMethodName());
final byte[] FAM = Bytes.toBytes("fam");
final byte[][] splitKeys = new byte[][] {
Bytes.toBytes("b"), Bytes.toBytes("c"), Bytes.toBytes("d")
UTIL.createTable(tableName, FAM, splitKeys);
// get the current store files for the regions
List<HRegion> regions = UTIL.getHBaseCluster().getRegions(tableName);
// make sure we have 4 regions serving this table
assertEquals(4, regions.size());
// load the table
try (Table table = UTIL.getConnection().getTable(tableName)) {
UTIL.loadTable(table, FAM);
// disable the table so that we can manipulate the files
final Path tableDir = FSUtils.getTableDir(masterFileSystem.getRootDir(), tableName);
final Path tempDir = masterFileSystem.getTempDir();
final Path tempTableDir = FSUtils.getTableDir(tempDir, tableName);
final FileSystem fs = masterFileSystem.getFileSystem();
// move the table to the temporary directory
if (!fs.rename(tableDir, tempTableDir)) {
masterFileSystem.checkTempDir(tempDir, UTIL.getConfiguration(), fs);
// check if the temporary directory exists and is empty
assertEquals(0, fs.listStatus(tempDir).length);
// check for the existence of the archive directory
for (HRegion region : regions) {
Path archiveDir = HFileArchiveTestingUtil.getRegionArchiveDir(UTIL.getConfiguration(),

View File

@ -17,19 +17,32 @@
package org.apache.hadoop.hbase.master.procedure;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.util.List;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotDisabledException;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.master.MasterFileSystem;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.HFileArchiveTestingUtil;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
@ -38,6 +51,7 @@ import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Category({MasterTests.class, MediumTests.class})
public class TestDeleteTableProcedure extends TestTableDDLProcedureBase {
@ -155,4 +169,59 @@ public class TestDeleteTableProcedure extends TestTableDDLProcedureBase {
MasterProcedureTestingUtility.validateTableDeletion(getMaster(), tableName);
public void testDeleteWhenTempDirIsNotEmpty() throws Exception {
final TableName tableName = TableName.valueOf(name.getMethodName());
final String FAM = "fam";
final byte[][] splitKeys = new byte[][] {
Bytes.toBytes("b"), Bytes.toBytes("c"), Bytes.toBytes("d")
// create the table
getMasterProcedureExecutor(), tableName, splitKeys, FAM);
// get the current store files for the regions
List<HRegion> regions = UTIL.getHBaseCluster().getRegions(tableName);
// make sure we have 4 regions serving this table
assertEquals(4, regions.size());
// load the table
try (Table table = UTIL.getConnection().getTable(tableName)) {
UTIL.loadTable(table, Bytes.toBytes(FAM));
// disable the table so that we can manipulate the files
final MasterFileSystem masterFileSystem =
final Path tableDir = FSUtils.getTableDir(masterFileSystem.getRootDir(), tableName);
final Path tempDir = masterFileSystem.getTempDir();
final Path tempTableDir = FSUtils.getTableDir(tempDir, tableName);
final FileSystem fs = masterFileSystem.getFileSystem();
// copy the table to the temporary directory to make sure the temp directory is not empty
if (!FileUtil.copy(fs, tableDir, fs, tempTableDir, false, UTIL.getConfiguration())) {
// delete the table
final ProcedureExecutor<MasterProcedureEnv> procExec = getMasterProcedureExecutor();
long procId = ProcedureTestingUtility.submitAndWait(procExec,
new DeleteTableProcedure(procExec.getEnvironment(), tableName));
ProcedureTestingUtility.assertProcNotFailed(procExec, procId);
MasterProcedureTestingUtility.validateTableDeletion(getMaster(), tableName);
// check if the temporary directory is deleted
// check for the existence of the archive directory
for (HRegion region : regions) {
Path archiveDir = HFileArchiveTestingUtil.getRegionArchiveDir(UTIL.getConfiguration(),