HBASE-26342 Support custom paths of independent configuration and pool for hfile cleaner (#4403)

Signed-off-by: Andrew Purtell <apurtell@apache.org>
This commit is contained in:
Xiaolin Ha 2022-05-24 09:53:48 +08:00 committed by GitHub
parent 623f8affe2
commit bc21967946
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 287 additions and 33 deletions

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.master;
import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK; import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK;
import static org.apache.hadoop.hbase.HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS; import static org.apache.hadoop.hbase.HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS;
import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK; import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK;
import static org.apache.hadoop.hbase.master.cleaner.HFileCleaner.CUSTOM_POOL_SIZE;
import static org.apache.hadoop.hbase.util.DNS.MASTER_HOSTNAME_KEY; import static org.apache.hadoop.hbase.util.DNS.MASTER_HOSTNAME_KEY;
import com.google.errorprone.annotations.RestrictedApi; import com.google.errorprone.annotations.RestrictedApi;
@ -37,6 +38,7 @@ import java.util.Collections;
import java.util.Comparator; import java.util.Comparator;
import java.util.EnumSet; import java.util.EnumSet;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator; import java.util.Iterator;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
@ -78,6 +80,7 @@ import org.apache.hadoop.hbase.PleaseHoldException;
import org.apache.hadoop.hbase.PleaseRestartMasterException; import org.apache.hadoop.hbase.PleaseRestartMasterException;
import org.apache.hadoop.hbase.RegionMetrics; import org.apache.hadoop.hbase.RegionMetrics;
import org.apache.hadoop.hbase.ReplicationPeerNotFoundException; import org.apache.hadoop.hbase.ReplicationPeerNotFoundException;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.ServerMetrics; import org.apache.hadoop.hbase.ServerMetrics;
import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.ServerTask; import org.apache.hadoop.hbase.ServerTask;
@ -378,12 +381,18 @@ public class HMaster extends HBaseServerBase<MasterRpcServices> implements Maste
private HbckChore hbckChore; private HbckChore hbckChore;
CatalogJanitor catalogJanitorChore; CatalogJanitor catalogJanitorChore;
// Threadpool for scanning the archive directory, used by the HFileCleaner
private DirScanPool hfileCleanerPool;
// Threadpool for scanning the Old logs directory, used by the LogCleaner // Threadpool for scanning the Old logs directory, used by the LogCleaner
private DirScanPool logCleanerPool; private DirScanPool logCleanerPool;
private LogCleaner logCleaner; private LogCleaner logCleaner;
private HFileCleaner hfileCleaner; // HFile cleaners for the custom hfile archive paths and the default archive path
// The archive path cleaner is the first element
private List<HFileCleaner> hfileCleaners = new ArrayList<>();
// The hfile cleaner paths, including custom paths and the default archive path
private List<Path> hfileCleanerPaths = new ArrayList<>();
// The shared hfile cleaner pool for the custom archive paths
private DirScanPool sharedHFileCleanerPool;
// The exclusive hfile cleaner pool for scanning the archive directory
private DirScanPool exclusiveHFileCleanerPool;
private ReplicationBarrierCleaner replicationBarrierCleaner; private ReplicationBarrierCleaner replicationBarrierCleaner;
private MobFileCleanerChore mobFileCleanerChore; private MobFileCleanerChore mobFileCleanerChore;
private MobFileCompactionChore mobFileCompactionChore; private MobFileCompactionChore mobFileCompactionChore;
@ -1158,11 +1167,18 @@ public class HMaster extends HBaseServerBase<MasterRpcServices> implements Maste
(EnvironmentEdgeManager.currentTime() - masterActiveTime) / 1000.0f)); (EnvironmentEdgeManager.currentTime() - masterActiveTime) / 1000.0f));
this.masterFinishedInitializationTime = EnvironmentEdgeManager.currentTime(); this.masterFinishedInitializationTime = EnvironmentEdgeManager.currentTime();
configurationManager.registerObserver(this.balancer); configurationManager.registerObserver(this.balancer);
configurationManager.registerObserver(this.hfileCleanerPool);
configurationManager.registerObserver(this.logCleanerPool); configurationManager.registerObserver(this.logCleanerPool);
configurationManager.registerObserver(this.hfileCleaner);
configurationManager.registerObserver(this.logCleaner); configurationManager.registerObserver(this.logCleaner);
configurationManager.registerObserver(this.regionsRecoveryConfigManager); configurationManager.registerObserver(this.regionsRecoveryConfigManager);
configurationManager.registerObserver(this.exclusiveHFileCleanerPool);
if (this.sharedHFileCleanerPool != null) {
configurationManager.registerObserver(this.sharedHFileCleanerPool);
}
if (this.hfileCleaners != null) {
for (HFileCleaner cleaner : hfileCleaners) {
configurationManager.registerObserver(cleaner);
}
}
// Set master as 'initialized'. // Set master as 'initialized'.
setInitialized(true); setInitialized(true);
@ -1439,8 +1455,8 @@ public class HMaster extends HBaseServerBase<MasterRpcServices> implements Maste
boolean isCleanerChoreEnabled() { boolean isCleanerChoreEnabled() {
boolean hfileCleanerFlag = true, logCleanerFlag = true; boolean hfileCleanerFlag = true, logCleanerFlag = true;
if (hfileCleaner != null) { if (getHFileCleaner() != null) {
hfileCleanerFlag = hfileCleaner.getEnabled(); hfileCleanerFlag = getHFileCleaner().getEnabled();
} }
if (logCleaner != null) { if (logCleaner != null) {
@ -1539,13 +1555,47 @@ public class HMaster extends HBaseServerBase<MasterRpcServices> implements Maste
getMasterWalManager().getOldLogDir(), logCleanerPool, params); getMasterWalManager().getOldLogDir(), logCleanerPool, params);
getChoreService().scheduleChore(logCleaner); getChoreService().scheduleChore(logCleaner);
// start the hfile archive cleaner thread
Path archiveDir = HFileArchiveUtil.getArchivePath(conf); Path archiveDir = HFileArchiveUtil.getArchivePath(conf);
// Create archive cleaner thread pool
hfileCleanerPool = DirScanPool.getHFileCleanerScanPool(conf); // Create custom archive hfile cleaners
this.hfileCleaner = new HFileCleaner(cleanerInterval, this, conf, String[] paths = conf.getStrings(HFileCleaner.HFILE_CLEANER_CUSTOM_PATHS);
getMasterFileSystem().getFileSystem(), archiveDir, hfileCleanerPool, params); // todo: handle the overlap issues for the custom paths
getChoreService().scheduleChore(hfileCleaner);
if (paths != null && paths.length > 0) {
if (conf.getStrings(HFileCleaner.HFILE_CLEANER_CUSTOM_PATHS_PLUGINS) == null) {
Set<String> cleanerClasses = new HashSet<>();
String[] cleaners = conf.getStrings(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS);
if (cleaners != null) {
Collections.addAll(cleanerClasses, cleaners);
}
conf.setStrings(HFileCleaner.HFILE_CLEANER_CUSTOM_PATHS_PLUGINS,
cleanerClasses.toArray(new String[cleanerClasses.size()]));
LOG.info("Archive custom cleaner paths: {}, plugins: {}", Arrays.asList(paths),
cleanerClasses);
}
// share the hfile cleaner pool in custom paths
sharedHFileCleanerPool = DirScanPool.getHFileCleanerScanPool(conf.get(CUSTOM_POOL_SIZE, "6"));
for (int i = 0; i < paths.length; i++) {
Path path = new Path(paths[i].trim());
HFileCleaner cleaner =
new HFileCleaner("ArchiveCustomHFileCleaner-" + path.getName(), cleanerInterval, this,
conf, getMasterFileSystem().getFileSystem(), new Path(archiveDir, path),
HFileCleaner.HFILE_CLEANER_CUSTOM_PATHS_PLUGINS, sharedHFileCleanerPool, params, null);
hfileCleaners.add(cleaner);
hfileCleanerPaths.add(path);
}
}
// Create the whole archive dir cleaner thread pool
exclusiveHFileCleanerPool = DirScanPool.getHFileCleanerScanPool(conf);
hfileCleaners.add(0,
new HFileCleaner(cleanerInterval, this, conf, getMasterFileSystem().getFileSystem(),
archiveDir, exclusiveHFileCleanerPool, params, hfileCleanerPaths));
hfileCleanerPaths.add(0, archiveDir);
// Schedule all the hfile cleaners
for (HFileCleaner hFileCleaner : hfileCleaners) {
getChoreService().scheduleChore(hFileCleaner);
}
// Regions Reopen based on very high storeFileRefCount is considered enabled // Regions Reopen based on very high storeFileRefCount is considered enabled
// only if hbase.regions.recovery.store.file.ref.count has value > 0 // only if hbase.regions.recovery.store.file.ref.count has value > 0
@ -1593,14 +1643,18 @@ public class HMaster extends HBaseServerBase<MasterRpcServices> implements Maste
} }
stopChoreService(); stopChoreService();
stopExecutorService(); stopExecutorService();
if (hfileCleanerPool != null) { if (exclusiveHFileCleanerPool != null) {
hfileCleanerPool.shutdownNow(); exclusiveHFileCleanerPool.shutdownNow();
hfileCleanerPool = null; exclusiveHFileCleanerPool = null;
} }
if (logCleanerPool != null) { if (logCleanerPool != null) {
logCleanerPool.shutdownNow(); logCleanerPool.shutdownNow();
logCleanerPool = null; logCleanerPool = null;
} }
if (sharedHFileCleanerPool != null) {
sharedHFileCleanerPool.shutdownNow();
sharedHFileCleanerPool = null;
}
if (maintenanceRegionServer != null) { if (maintenanceRegionServer != null) {
maintenanceRegionServer.getRegionServer().stop(HBASE_MASTER_CLEANER_INTERVAL); maintenanceRegionServer.getRegionServer().stop(HBASE_MASTER_CLEANER_INTERVAL);
} }
@ -1735,7 +1789,12 @@ public class HMaster extends HBaseServerBase<MasterRpcServices> implements Maste
shutdownChore(clusterStatusPublisherChore); shutdownChore(clusterStatusPublisherChore);
shutdownChore(snapshotQuotaChore); shutdownChore(snapshotQuotaChore);
shutdownChore(logCleaner); shutdownChore(logCleaner);
shutdownChore(hfileCleaner); if (hfileCleaners != null) {
for (ScheduledChore chore : hfileCleaners) {
chore.shutdown();
}
hfileCleaners = null;
}
shutdownChore(replicationBarrierCleaner); shutdownChore(replicationBarrierCleaner);
shutdownChore(snapshotCleanerChore); shutdownChore(snapshotCleanerChore);
shutdownChore(hbckChore); shutdownChore(hbckChore);
@ -3210,7 +3269,11 @@ public class HMaster extends HBaseServerBase<MasterRpcServices> implements Maste
} }
public HFileCleaner getHFileCleaner() { public HFileCleaner getHFileCleaner() {
return this.hfileCleaner; return this.hfileCleaners.get(0);
}
public List<HFileCleaner> getHFileCleaners() {
return this.hfileCleaners;
} }
public LogCleaner getLogCleaner() { public LogCleaner getLogCleaner() {

View File

@ -66,6 +66,7 @@ import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
import org.apache.hadoop.hbase.ipc.ServerRpcController; import org.apache.hadoop.hbase.ipc.ServerRpcController;
import org.apache.hadoop.hbase.master.assignment.RegionStateNode; import org.apache.hadoop.hbase.master.assignment.RegionStateNode;
import org.apache.hadoop.hbase.master.assignment.RegionStates; import org.apache.hadoop.hbase.master.assignment.RegionStates;
import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
import org.apache.hadoop.hbase.master.janitor.MetaFixer; import org.apache.hadoop.hbase.master.janitor.MetaFixer;
import org.apache.hadoop.hbase.master.locking.LockProcedure; import org.apache.hadoop.hbase.master.locking.LockProcedure;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
@ -874,7 +875,9 @@ public class MasterRpcServices extends HBaseRpcServicesBase<HMaster>
boolean prevValue = boolean prevValue =
server.getLogCleaner().getEnabled() && server.getHFileCleaner().getEnabled(); server.getLogCleaner().getEnabled() && server.getHFileCleaner().getEnabled();
server.getLogCleaner().setEnabled(req.getOn()); server.getLogCleaner().setEnabled(req.getOn());
server.getHFileCleaner().setEnabled(req.getOn()); for (HFileCleaner hFileCleaner : server.getHFileCleaners()) {
hFileCleaner.setEnabled(req.getOn());
}
return SetCleanerChoreRunningResponse.newBuilder().setPrevValue(prevValue).build(); return SetCleanerChoreRunningResponse.newBuilder().setPrevValue(prevValue).build();
} }

View File

@ -17,6 +17,7 @@
*/ */
package org.apache.hadoop.hbase.master.cleaner; package org.apache.hadoop.hbase.master.cleaner;
import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
@ -79,10 +80,11 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends Schedu
protected final Map<String, Object> params; protected final Map<String, Object> params;
private final AtomicBoolean enabled = new AtomicBoolean(true); private final AtomicBoolean enabled = new AtomicBoolean(true);
protected List<T> cleanersChain; protected List<T> cleanersChain;
protected List<String> excludeDirs;
public CleanerChore(String name, final int sleepPeriod, final Stoppable s, Configuration conf, public CleanerChore(String name, final int sleepPeriod, final Stoppable s, Configuration conf,
FileSystem fs, Path oldFileDir, String confKey, DirScanPool pool) { FileSystem fs, Path oldFileDir, String confKey, DirScanPool pool) {
this(name, sleepPeriod, s, conf, fs, oldFileDir, confKey, pool, null); this(name, sleepPeriod, s, conf, fs, oldFileDir, confKey, pool, null, null);
} }
/** /**
@ -97,7 +99,8 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends Schedu
* @param params members could be used in cleaner * @param params members could be used in cleaner
*/ */
public CleanerChore(String name, final int sleepPeriod, final Stoppable s, Configuration conf, public CleanerChore(String name, final int sleepPeriod, final Stoppable s, Configuration conf,
FileSystem fs, Path oldFileDir, String confKey, DirScanPool pool, Map<String, Object> params) { FileSystem fs, Path oldFileDir, String confKey, DirScanPool pool, Map<String, Object> params,
List<Path> excludePaths) {
super(name, s, sleepPeriod); super(name, s, sleepPeriod);
Preconditions.checkNotNull(pool, "Chore's pool can not be null"); Preconditions.checkNotNull(pool, "Chore's pool can not be null");
@ -106,6 +109,19 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends Schedu
this.oldFileDir = oldFileDir; this.oldFileDir = oldFileDir;
this.conf = conf; this.conf = conf;
this.params = params; this.params = params;
if (excludePaths != null && !excludePaths.isEmpty()) {
excludeDirs = new ArrayList<>(excludePaths.size());
for (Path path : excludePaths) {
StringBuilder dirPart = new StringBuilder(path.toString());
if (!path.toString().endsWith("/")) {
dirPart.append("/");
}
excludeDirs.add(dirPart.toString());
}
}
if (excludeDirs != null) {
LOG.info("Cleaner {} excludes sub dirs: {}", name, excludeDirs);
}
initCleanerChain(confKey); initCleanerChain(confKey);
} }
@ -419,9 +435,11 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends Schedu
sortByConsumedSpace(subDirs); sortByConsumedSpace(subDirs);
// Submit the request of sub-directory deletion. // Submit the request of sub-directory deletion.
subDirs.forEach(subDir -> { subDirs.forEach(subDir -> {
if (!shouldExclude(subDir)) {
CompletableFuture<Boolean> subFuture = new CompletableFuture<>(); CompletableFuture<Boolean> subFuture = new CompletableFuture<>();
pool.execute(() -> traverseAndDelete(subDir.getPath(), false, subFuture)); pool.execute(() -> traverseAndDelete(subDir.getPath(), false, subFuture));
futures.add(subFuture); futures.add(subFuture);
}
}); });
} }
@ -451,11 +469,34 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends Schedu
} }
}); });
} catch (Exception e) { } catch (Exception e) {
LOG.debug("Failed to traverse and delete the path: {}", dir, e); if (e instanceof FileNotFoundException) {
LOG.debug("Dir dose not exist, {}", dir);
} else {
LOG.error("Failed to traverse and delete the path: {}", dir, e);
}
result.completeExceptionally(e); result.completeExceptionally(e);
} }
} }
/**
* Check if a path should not perform clear
*/
private boolean shouldExclude(FileStatus f) {
if (!f.isDirectory()) {
return false;
}
if (excludeDirs != null && !excludeDirs.isEmpty()) {
for (String dirPart : excludeDirs) {
// since we make excludeDirs end with '/',
// if a path contains() the dirPart, the path should be excluded
if (f.getPath().toString().contains(dirPart)) {
return true;
}
}
}
return false;
}
/** /**
* Perform a delete on a specified type. * Perform a delete on a specified type.
* @param deletion a delete * @param deletion a delete

View File

@ -57,10 +57,13 @@ public class DirScanPool implements ConfigurationObserver {
} }
private DirScanPool(Configuration conf, Type dirScanPoolType) { private DirScanPool(Configuration conf, Type dirScanPoolType) {
this(dirScanPoolType, conf.get(dirScanPoolType.cleanerPoolSizeConfigName,
dirScanPoolType.cleanerPoolSizeConfigDefault));
}
private DirScanPool(Type dirScanPoolType, String poolSize) {
this.dirScanPoolType = dirScanPoolType; this.dirScanPoolType = dirScanPoolType;
this.name = dirScanPoolType.name().toLowerCase(); this.name = dirScanPoolType.name().toLowerCase();
String poolSize = conf.get(dirScanPoolType.cleanerPoolSizeConfigName,
dirScanPoolType.cleanerPoolSizeConfigDefault);
size = CleanerChore.calculatePoolSize(poolSize); size = CleanerChore.calculatePoolSize(poolSize);
// poolSize may be 0 or 0.0 from a careless configuration, // poolSize may be 0 or 0.0 from a careless configuration,
// double check to make sure. // double check to make sure.
@ -143,6 +146,10 @@ public class DirScanPool implements ConfigurationObserver {
return new DirScanPool(conf, Type.HFILE_CLEANER); return new DirScanPool(conf, Type.HFILE_CLEANER);
} }
public static DirScanPool getHFileCleanerScanPool(String poolSize) {
return new DirScanPool(Type.HFILE_CLEANER, poolSize);
}
public static DirScanPool getLogCleanerScanPool(Configuration conf) { public static DirScanPool getLogCleanerScanPool(Configuration conf) {
return new DirScanPool(conf, Type.LOG_CLEANER); return new DirScanPool(conf, Type.LOG_CLEANER);
} }

View File

@ -88,6 +88,17 @@ public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate>
"hbase.regionserver.hfilecleaner.thread.check.interval.msec"; "hbase.regionserver.hfilecleaner.thread.check.interval.msec";
static final long DEFAULT_HFILE_DELETE_THREAD_CHECK_INTERVAL_MSEC = 1000L; static final long DEFAULT_HFILE_DELETE_THREAD_CHECK_INTERVAL_MSEC = 1000L;
/**
* The custom paths for hfile cleaner, subdirectories of archive, e.g.
* data/default/testTable1,data/default/testTable2
*/
public static final String HFILE_CLEANER_CUSTOM_PATHS = "hbase.master.hfile.cleaner.custom.paths";
/** Configure hfile cleaner classes for the custom paths */
public static final String HFILE_CLEANER_CUSTOM_PATHS_PLUGINS =
"hbase.master.hfilecleaner.custom.paths.plugins";
public static final String CUSTOM_POOL_SIZE = "hbase.cleaner.custom.hfiles.pool.size";
private static final Logger LOG = LoggerFactory.getLogger(HFileCleaner.class); private static final Logger LOG = LoggerFactory.getLogger(HFileCleaner.class);
StealJobQueue<HFileDeleteTask> largeFileQueue; StealJobQueue<HFileDeleteTask> largeFileQueue;
@ -117,8 +128,13 @@ public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate>
public HFileCleaner(final int period, final Stoppable stopper, Configuration conf, FileSystem fs, public HFileCleaner(final int period, final Stoppable stopper, Configuration conf, FileSystem fs,
Path directory, DirScanPool pool, Map<String, Object> params) { Path directory, DirScanPool pool, Map<String, Object> params) {
this("HFileCleaner", period, stopper, conf, fs, directory, MASTER_HFILE_CLEANER_PLUGINS, pool, this("HFileCleaner", period, stopper, conf, fs, directory, MASTER_HFILE_CLEANER_PLUGINS, pool,
params); params, null);
}
public HFileCleaner(final int period, final Stoppable stopper, Configuration conf, FileSystem fs,
Path directory, DirScanPool pool, Map<String, Object> params, List<Path> excludePaths) {
this("HFileCleaner", period, stopper, conf, fs, directory, MASTER_HFILE_CLEANER_PLUGINS, pool,
params, excludePaths);
} }
/** /**
@ -134,8 +150,9 @@ public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate>
* @param params params could be used in subclass of BaseHFileCleanerDelegate * @param params params could be used in subclass of BaseHFileCleanerDelegate
*/ */
public HFileCleaner(String name, int period, Stoppable stopper, Configuration conf, FileSystem fs, public HFileCleaner(String name, int period, Stoppable stopper, Configuration conf, FileSystem fs,
Path directory, String confKey, DirScanPool pool, Map<String, Object> params) { Path directory, String confKey, DirScanPool pool, Map<String, Object> params,
super(name, period, stopper, conf, fs, directory, confKey, pool, params); List<Path> excludePaths) {
super(name, period, stopper, conf, fs, directory, confKey, pool, params, excludePaths);
throttlePoint = throttlePoint =
conf.getInt(HFILE_DELETE_THROTTLE_THRESHOLD, DEFAULT_HFILE_DELETE_THROTTLE_THRESHOLD); conf.getInt(HFILE_DELETE_THROTTLE_THRESHOLD, DEFAULT_HFILE_DELETE_THROTTLE_THRESHOLD);
largeQueueInitSize = largeQueueInitSize =

View File

@ -75,7 +75,7 @@ public class LogCleaner extends CleanerChore<BaseLogCleanerDelegate>
public LogCleaner(final int period, final Stoppable stopper, Configuration conf, FileSystem fs, public LogCleaner(final int period, final Stoppable stopper, Configuration conf, FileSystem fs,
Path oldLogDir, DirScanPool pool, Map<String, Object> params) { Path oldLogDir, DirScanPool pool, Map<String, Object> params) {
super("LogsCleaner", period, stopper, conf, fs, oldLogDir, HBASE_MASTER_LOGCLEANER_PLUGINS, super("LogsCleaner", period, stopper, conf, fs, oldLogDir, HBASE_MASTER_LOGCLEANER_PLUGINS,
pool, params); pool, params, null);
this.pendingDelete = new LinkedBlockingQueue<>(); this.pendingDelete = new LinkedBlockingQueue<>();
int size = conf.getInt(OLD_WALS_CLEANER_THREAD_SIZE, DEFAULT_OLD_WALS_CLEANER_THREAD_SIZE); int size = conf.getInt(OLD_WALS_CLEANER_THREAD_SIZE, DEFAULT_OLD_WALS_CLEANER_THREAD_SIZE);
this.oldWALsCleaner = createOldWalsCleaner(size); this.oldWALsCleaner = createOldWalsCleaner(size);

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.master; package org.apache.hadoop.hbase.master;
import java.lang.reflect.Field; import java.lang.reflect.Field;
import java.util.ArrayList;
import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil; import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.ScheduledChore; import org.apache.hadoop.hbase.ScheduledChore;
@ -63,7 +64,7 @@ public class TestMasterChoreScheduled {
} }
@Test @Test
public void testDefaultScheduledChores() { public void testDefaultScheduledChores() throws Exception {
// test if logCleaner chore is scheduled by default in HMaster init // test if logCleaner chore is scheduled by default in HMaster init
TestChoreField<LogCleaner> logCleanerTestChoreField = new TestChoreField<>(); TestChoreField<LogCleaner> logCleanerTestChoreField = new TestChoreField<>();
LogCleaner logCleaner = logCleanerTestChoreField.getChoreObj("logCleaner"); LogCleaner logCleaner = logCleanerTestChoreField.getChoreObj("logCleaner");
@ -71,7 +72,9 @@ public class TestMasterChoreScheduled {
// test if hfileCleaner chore is scheduled by default in HMaster init // test if hfileCleaner chore is scheduled by default in HMaster init
TestChoreField<HFileCleaner> hFileCleanerTestChoreField = new TestChoreField<>(); TestChoreField<HFileCleaner> hFileCleanerTestChoreField = new TestChoreField<>();
HFileCleaner hFileCleaner = hFileCleanerTestChoreField.getChoreObj("hfileCleaner"); Field masterField = HMaster.class.getDeclaredField("hfileCleaners");
masterField.setAccessible(true);
HFileCleaner hFileCleaner = ((ArrayList<HFileCleaner>) masterField.get(hMaster)).get(0);
hFileCleanerTestChoreField.testIfChoreScheduled(hFileCleaner); hFileCleanerTestChoreField.testIfChoreScheduled(hFileCleaner);
// test if replicationBarrierCleaner chore is scheduled by default in HMaster init // test if replicationBarrierCleaner chore is scheduled by default in HMaster init

View File

@ -0,0 +1,120 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.cleaner;
import static org.apache.hadoop.hbase.master.HMaster.HBASE_MASTER_CLEANER_INTERVAL;
import static org.apache.hadoop.hbase.master.cleaner.HFileCleaner.HFILE_CLEANER_CUSTOM_PATHS_PLUGINS;
import java.io.FileNotFoundException;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.HFileArchiveUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
@Category(LargeTests.class)
public class TestCleanerClearHFiles {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestCleanerClearHFiles.class);
@Rule
public TestName name = new TestName();
private static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
private static Configuration conf = TEST_UTIL.getConfiguration();
private static Admin admin = null;
private static final byte[] COLUMN_FAMILY = Bytes.toBytes("CF");
private static final String TABLE1 = "table1";
private static final String TABLE2 = "table2";
private static final String DEFAULT_ARCHIVE_SUBDIRS_PREFIX = "data/default/";
@BeforeClass
public static void setupBeforeClass() throws Exception {
conf.setStrings(HFileCleaner.HFILE_CLEANER_CUSTOM_PATHS,
DEFAULT_ARCHIVE_SUBDIRS_PREFIX + TABLE1);
conf.setStrings(HFILE_CLEANER_CUSTOM_PATHS_PLUGINS, HFileLinkCleaner.class.getName());
conf.setInt(TimeToLiveHFileCleaner.TTL_CONF_KEY, 10);
conf.setInt(HBASE_MASTER_CLEANER_INTERVAL, 20000);
TEST_UTIL.startMiniCluster();
admin = TEST_UTIL.getAdmin();
}
@AfterClass
public static void tearDownAfterClass() throws Exception {
TEST_UTIL.shutdownMiniCluster();
}
@Test
public void testClearArchive() throws Exception {
DistributedFileSystem fs = TEST_UTIL.getDFSCluster().getFileSystem();
Table table1 = createTable(TEST_UTIL, TableName.valueOf(TABLE1));
Table table2 = createTable(TEST_UTIL, TableName.valueOf(TABLE2));
admin.disableTable(table1.getName());
admin.deleteTable(table1.getName());
admin.disableTable(table2.getName());
admin.deleteTable(table2.getName());
Path archiveDir = HFileArchiveUtil.getArchivePath(conf);
Path archiveTable1Path = new Path(archiveDir, DEFAULT_ARCHIVE_SUBDIRS_PREFIX + TABLE1);
Path archiveTable2Path = new Path(archiveDir, DEFAULT_ARCHIVE_SUBDIRS_PREFIX + TABLE2);
TEST_UTIL.waitFor(10000, () -> !notExistOrEmptyDir(archiveTable1Path, fs)
&& !notExistOrEmptyDir(archiveTable2Path, fs));
TEST_UTIL.waitFor(30000,
() -> notExistOrEmptyDir(archiveTable1Path, fs) && notExistOrEmptyDir(archiveTable2Path, fs));
}
private boolean notExistOrEmptyDir(Path dir, DistributedFileSystem fs) {
try {
return fs.listStatus(dir).length == 0;
} catch (Exception e) {
return e instanceof FileNotFoundException;
}
}
private Table createTable(HBaseTestingUtil util, TableName tableName) throws IOException {
TableDescriptor td = TableDescriptorBuilder.newBuilder(tableName)
.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(COLUMN_FAMILY).build()).build();
return util.createTable(td, null);
}
}