HBASE-26342 Support custom paths of independent configuration and pool for hfile cleaner (#4403) (#4461)
Signed-off-by: Andrew Purtell <apurtell@apache.org>
This commit is contained in:
parent
4be25a3a61
commit
e1843af1d5
|
@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.master;
|
||||||
import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK;
|
import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK;
|
||||||
import static org.apache.hadoop.hbase.HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS;
|
import static org.apache.hadoop.hbase.HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS;
|
||||||
import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK;
|
import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK;
|
||||||
|
import static org.apache.hadoop.hbase.master.cleaner.HFileCleaner.CUSTOM_POOL_SIZE;
|
||||||
import static org.apache.hadoop.hbase.util.DNS.MASTER_HOSTNAME_KEY;
|
import static org.apache.hadoop.hbase.util.DNS.MASTER_HOSTNAME_KEY;
|
||||||
|
|
||||||
import com.google.errorprone.annotations.RestrictedApi;
|
import com.google.errorprone.annotations.RestrictedApi;
|
||||||
|
@ -39,6 +40,7 @@ import java.util.Collections;
|
||||||
import java.util.Comparator;
|
import java.util.Comparator;
|
||||||
import java.util.EnumSet;
|
import java.util.EnumSet;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
|
import java.util.HashSet;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.LinkedList;
|
import java.util.LinkedList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
@ -77,6 +79,7 @@ import org.apache.hadoop.hbase.PleaseHoldException;
|
||||||
import org.apache.hadoop.hbase.PleaseRestartMasterException;
|
import org.apache.hadoop.hbase.PleaseRestartMasterException;
|
||||||
import org.apache.hadoop.hbase.RegionMetrics;
|
import org.apache.hadoop.hbase.RegionMetrics;
|
||||||
import org.apache.hadoop.hbase.ReplicationPeerNotFoundException;
|
import org.apache.hadoop.hbase.ReplicationPeerNotFoundException;
|
||||||
|
import org.apache.hadoop.hbase.ScheduledChore;
|
||||||
import org.apache.hadoop.hbase.ServerMetrics;
|
import org.apache.hadoop.hbase.ServerMetrics;
|
||||||
import org.apache.hadoop.hbase.ServerName;
|
import org.apache.hadoop.hbase.ServerName;
|
||||||
import org.apache.hadoop.hbase.ServerTask;
|
import org.apache.hadoop.hbase.ServerTask;
|
||||||
|
@ -357,12 +360,18 @@ public class HMaster extends HRegionServer implements MasterServices {
|
||||||
|
|
||||||
private HbckChore hbckChore;
|
private HbckChore hbckChore;
|
||||||
CatalogJanitor catalogJanitorChore;
|
CatalogJanitor catalogJanitorChore;
|
||||||
// Threadpool for scanning the archive directory, used by the HFileCleaner
|
|
||||||
private DirScanPool hfileCleanerPool;
|
|
||||||
// Threadpool for scanning the Old logs directory, used by the LogCleaner
|
// Threadpool for scanning the Old logs directory, used by the LogCleaner
|
||||||
private DirScanPool logCleanerPool;
|
private DirScanPool logCleanerPool;
|
||||||
private LogCleaner logCleaner;
|
private LogCleaner logCleaner;
|
||||||
private HFileCleaner hfileCleaner;
|
// HFile cleaners for the custom hfile archive paths and the default archive path
|
||||||
|
// The archive path cleaner is the first element
|
||||||
|
private List<HFileCleaner> hfileCleaners = new ArrayList<>();
|
||||||
|
// The hfile cleaner paths, including custom paths and the default archive path
|
||||||
|
private List<Path> hfileCleanerPaths = new ArrayList<>();
|
||||||
|
// The shared hfile cleaner pool for the custom archive paths
|
||||||
|
private DirScanPool sharedHFileCleanerPool;
|
||||||
|
// The exclusive hfile cleaner pool for scanning the archive directory
|
||||||
|
private DirScanPool exclusiveHFileCleanerPool;
|
||||||
private ReplicationBarrierCleaner replicationBarrierCleaner;
|
private ReplicationBarrierCleaner replicationBarrierCleaner;
|
||||||
private ExpiredMobFileCleanerChore expiredMobFileCleanerChore;
|
private ExpiredMobFileCleanerChore expiredMobFileCleanerChore;
|
||||||
private MobCompactionChore mobCompactChore;
|
private MobCompactionChore mobCompactChore;
|
||||||
|
@ -1154,11 +1163,18 @@ public class HMaster extends HRegionServer implements MasterServices {
|
||||||
(EnvironmentEdgeManager.currentTime() - masterActiveTime) / 1000.0f));
|
(EnvironmentEdgeManager.currentTime() - masterActiveTime) / 1000.0f));
|
||||||
this.masterFinishedInitializationTime = EnvironmentEdgeManager.currentTime();
|
this.masterFinishedInitializationTime = EnvironmentEdgeManager.currentTime();
|
||||||
configurationManager.registerObserver(this.balancer);
|
configurationManager.registerObserver(this.balancer);
|
||||||
configurationManager.registerObserver(this.hfileCleanerPool);
|
|
||||||
configurationManager.registerObserver(this.logCleanerPool);
|
configurationManager.registerObserver(this.logCleanerPool);
|
||||||
configurationManager.registerObserver(this.hfileCleaner);
|
|
||||||
configurationManager.registerObserver(this.logCleaner);
|
configurationManager.registerObserver(this.logCleaner);
|
||||||
configurationManager.registerObserver(this.regionsRecoveryConfigManager);
|
configurationManager.registerObserver(this.regionsRecoveryConfigManager);
|
||||||
|
configurationManager.registerObserver(this.exclusiveHFileCleanerPool);
|
||||||
|
if (this.sharedHFileCleanerPool != null) {
|
||||||
|
configurationManager.registerObserver(this.sharedHFileCleanerPool);
|
||||||
|
}
|
||||||
|
if (this.hfileCleaners != null) {
|
||||||
|
for (HFileCleaner cleaner : hfileCleaners) {
|
||||||
|
configurationManager.registerObserver(cleaner);
|
||||||
|
}
|
||||||
|
}
|
||||||
// Set master as 'initialized'.
|
// Set master as 'initialized'.
|
||||||
setInitialized(true);
|
setInitialized(true);
|
||||||
|
|
||||||
|
@ -1431,8 +1447,8 @@ public class HMaster extends HRegionServer implements MasterServices {
|
||||||
boolean isCleanerChoreEnabled() {
|
boolean isCleanerChoreEnabled() {
|
||||||
boolean hfileCleanerFlag = true, logCleanerFlag = true;
|
boolean hfileCleanerFlag = true, logCleanerFlag = true;
|
||||||
|
|
||||||
if (hfileCleaner != null) {
|
if (getHFileCleaner() != null) {
|
||||||
hfileCleanerFlag = hfileCleaner.getEnabled();
|
hfileCleanerFlag = getHFileCleaner().getEnabled();
|
||||||
}
|
}
|
||||||
|
|
||||||
if (logCleaner != null) {
|
if (logCleaner != null) {
|
||||||
|
@ -1531,13 +1547,47 @@ public class HMaster extends HRegionServer implements MasterServices {
|
||||||
getMasterWalManager().getOldLogDir(), logCleanerPool, params);
|
getMasterWalManager().getOldLogDir(), logCleanerPool, params);
|
||||||
getChoreService().scheduleChore(logCleaner);
|
getChoreService().scheduleChore(logCleaner);
|
||||||
|
|
||||||
// start the hfile archive cleaner thread
|
|
||||||
Path archiveDir = HFileArchiveUtil.getArchivePath(conf);
|
Path archiveDir = HFileArchiveUtil.getArchivePath(conf);
|
||||||
// Create archive cleaner thread pool
|
|
||||||
hfileCleanerPool = DirScanPool.getHFileCleanerScanPool(conf);
|
// Create custom archive hfile cleaners
|
||||||
this.hfileCleaner = new HFileCleaner(cleanerInterval, this, conf,
|
String[] paths = conf.getStrings(HFileCleaner.HFILE_CLEANER_CUSTOM_PATHS);
|
||||||
getMasterFileSystem().getFileSystem(), archiveDir, hfileCleanerPool, params);
|
// todo: handle the overlap issues for the custom paths
|
||||||
getChoreService().scheduleChore(hfileCleaner);
|
|
||||||
|
if (paths != null && paths.length > 0) {
|
||||||
|
if (conf.getStrings(HFileCleaner.HFILE_CLEANER_CUSTOM_PATHS_PLUGINS) == null) {
|
||||||
|
Set<String> cleanerClasses = new HashSet<>();
|
||||||
|
String[] cleaners = conf.getStrings(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS);
|
||||||
|
if (cleaners != null) {
|
||||||
|
Collections.addAll(cleanerClasses, cleaners);
|
||||||
|
}
|
||||||
|
conf.setStrings(HFileCleaner.HFILE_CLEANER_CUSTOM_PATHS_PLUGINS,
|
||||||
|
cleanerClasses.toArray(new String[cleanerClasses.size()]));
|
||||||
|
LOG.info("Archive custom cleaner paths: {}, plugins: {}", Arrays.asList(paths),
|
||||||
|
cleanerClasses);
|
||||||
|
}
|
||||||
|
// share the hfile cleaner pool in custom paths
|
||||||
|
sharedHFileCleanerPool = DirScanPool.getHFileCleanerScanPool(conf.get(CUSTOM_POOL_SIZE, "6"));
|
||||||
|
for (int i = 0; i < paths.length; i++) {
|
||||||
|
Path path = new Path(paths[i].trim());
|
||||||
|
HFileCleaner cleaner =
|
||||||
|
new HFileCleaner("ArchiveCustomHFileCleaner-" + path.getName(), cleanerInterval, this,
|
||||||
|
conf, getMasterFileSystem().getFileSystem(), new Path(archiveDir, path),
|
||||||
|
HFileCleaner.HFILE_CLEANER_CUSTOM_PATHS_PLUGINS, sharedHFileCleanerPool, params, null);
|
||||||
|
hfileCleaners.add(cleaner);
|
||||||
|
hfileCleanerPaths.add(path);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create the whole archive dir cleaner thread pool
|
||||||
|
exclusiveHFileCleanerPool = DirScanPool.getHFileCleanerScanPool(conf);
|
||||||
|
hfileCleaners.add(0,
|
||||||
|
new HFileCleaner(cleanerInterval, this, conf, getMasterFileSystem().getFileSystem(),
|
||||||
|
archiveDir, exclusiveHFileCleanerPool, params, hfileCleanerPaths));
|
||||||
|
hfileCleanerPaths.add(0, archiveDir);
|
||||||
|
// Schedule all the hfile cleaners
|
||||||
|
for (HFileCleaner hFileCleaner : hfileCleaners) {
|
||||||
|
getChoreService().scheduleChore(hFileCleaner);
|
||||||
|
}
|
||||||
|
|
||||||
// Regions Reopen based on very high storeFileRefCount is considered enabled
|
// Regions Reopen based on very high storeFileRefCount is considered enabled
|
||||||
// only if hbase.regions.recovery.store.file.ref.count has value > 0
|
// only if hbase.regions.recovery.store.file.ref.count has value > 0
|
||||||
|
@ -1589,14 +1639,18 @@ public class HMaster extends HRegionServer implements MasterServices {
|
||||||
this.mobCompactThread.close();
|
this.mobCompactThread.close();
|
||||||
}
|
}
|
||||||
super.stopServiceThreads();
|
super.stopServiceThreads();
|
||||||
if (hfileCleanerPool != null) {
|
if (exclusiveHFileCleanerPool != null) {
|
||||||
hfileCleanerPool.shutdownNow();
|
exclusiveHFileCleanerPool.shutdownNow();
|
||||||
hfileCleanerPool = null;
|
exclusiveHFileCleanerPool = null;
|
||||||
}
|
}
|
||||||
if (logCleanerPool != null) {
|
if (logCleanerPool != null) {
|
||||||
logCleanerPool.shutdownNow();
|
logCleanerPool.shutdownNow();
|
||||||
logCleanerPool = null;
|
logCleanerPool = null;
|
||||||
}
|
}
|
||||||
|
if (sharedHFileCleanerPool != null) {
|
||||||
|
sharedHFileCleanerPool.shutdownNow();
|
||||||
|
sharedHFileCleanerPool = null;
|
||||||
|
}
|
||||||
|
|
||||||
LOG.debug("Stopping service threads");
|
LOG.debug("Stopping service threads");
|
||||||
|
|
||||||
|
@ -1730,7 +1784,12 @@ public class HMaster extends HRegionServer implements MasterServices {
|
||||||
shutdownChore(clusterStatusPublisherChore);
|
shutdownChore(clusterStatusPublisherChore);
|
||||||
shutdownChore(snapshotQuotaChore);
|
shutdownChore(snapshotQuotaChore);
|
||||||
shutdownChore(logCleaner);
|
shutdownChore(logCleaner);
|
||||||
shutdownChore(hfileCleaner);
|
if (hfileCleaners != null) {
|
||||||
|
for (ScheduledChore chore : hfileCleaners) {
|
||||||
|
chore.shutdown();
|
||||||
|
}
|
||||||
|
hfileCleaners = null;
|
||||||
|
}
|
||||||
shutdownChore(replicationBarrierCleaner);
|
shutdownChore(replicationBarrierCleaner);
|
||||||
shutdownChore(snapshotCleanerChore);
|
shutdownChore(snapshotCleanerChore);
|
||||||
shutdownChore(hbckChore);
|
shutdownChore(hbckChore);
|
||||||
|
@ -3220,7 +3279,11 @@ public class HMaster extends HRegionServer implements MasterServices {
|
||||||
}
|
}
|
||||||
|
|
||||||
public HFileCleaner getHFileCleaner() {
|
public HFileCleaner getHFileCleaner() {
|
||||||
return this.hfileCleaner;
|
return this.hfileCleaners.get(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
public List<HFileCleaner> getHFileCleaners() {
|
||||||
|
return this.hfileCleaners;
|
||||||
}
|
}
|
||||||
|
|
||||||
public LogCleaner getLogCleaner() {
|
public LogCleaner getLogCleaner() {
|
||||||
|
|
|
@ -74,6 +74,7 @@ import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
|
||||||
import org.apache.hadoop.hbase.ipc.ServerRpcController;
|
import org.apache.hadoop.hbase.ipc.ServerRpcController;
|
||||||
import org.apache.hadoop.hbase.master.assignment.RegionStateNode;
|
import org.apache.hadoop.hbase.master.assignment.RegionStateNode;
|
||||||
import org.apache.hadoop.hbase.master.assignment.RegionStates;
|
import org.apache.hadoop.hbase.master.assignment.RegionStates;
|
||||||
|
import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
|
||||||
import org.apache.hadoop.hbase.master.janitor.MetaFixer;
|
import org.apache.hadoop.hbase.master.janitor.MetaFixer;
|
||||||
import org.apache.hadoop.hbase.master.locking.LockProcedure;
|
import org.apache.hadoop.hbase.master.locking.LockProcedure;
|
||||||
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
|
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
|
||||||
|
@ -798,7 +799,9 @@ public class MasterRpcServices extends RSRpcServices
|
||||||
boolean prevValue =
|
boolean prevValue =
|
||||||
master.getLogCleaner().getEnabled() && master.getHFileCleaner().getEnabled();
|
master.getLogCleaner().getEnabled() && master.getHFileCleaner().getEnabled();
|
||||||
master.getLogCleaner().setEnabled(req.getOn());
|
master.getLogCleaner().setEnabled(req.getOn());
|
||||||
master.getHFileCleaner().setEnabled(req.getOn());
|
for (HFileCleaner hFileCleaner : master.getHFileCleaners()) {
|
||||||
|
hFileCleaner.setEnabled(req.getOn());
|
||||||
|
}
|
||||||
return SetCleanerChoreRunningResponse.newBuilder().setPrevValue(prevValue).build();
|
return SetCleanerChoreRunningResponse.newBuilder().setPrevValue(prevValue).build();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -17,6 +17,7 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hbase.master.cleaner;
|
package org.apache.hadoop.hbase.master.cleaner;
|
||||||
|
|
||||||
|
import java.io.FileNotFoundException;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
|
@ -79,10 +80,11 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends Schedu
|
||||||
protected final Map<String, Object> params;
|
protected final Map<String, Object> params;
|
||||||
private final AtomicBoolean enabled = new AtomicBoolean(true);
|
private final AtomicBoolean enabled = new AtomicBoolean(true);
|
||||||
protected List<T> cleanersChain;
|
protected List<T> cleanersChain;
|
||||||
|
protected List<String> excludeDirs;
|
||||||
|
|
||||||
public CleanerChore(String name, final int sleepPeriod, final Stoppable s, Configuration conf,
|
public CleanerChore(String name, final int sleepPeriod, final Stoppable s, Configuration conf,
|
||||||
FileSystem fs, Path oldFileDir, String confKey, DirScanPool pool) {
|
FileSystem fs, Path oldFileDir, String confKey, DirScanPool pool) {
|
||||||
this(name, sleepPeriod, s, conf, fs, oldFileDir, confKey, pool, null);
|
this(name, sleepPeriod, s, conf, fs, oldFileDir, confKey, pool, null, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -97,7 +99,8 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends Schedu
|
||||||
* @param params members could be used in cleaner
|
* @param params members could be used in cleaner
|
||||||
*/
|
*/
|
||||||
public CleanerChore(String name, final int sleepPeriod, final Stoppable s, Configuration conf,
|
public CleanerChore(String name, final int sleepPeriod, final Stoppable s, Configuration conf,
|
||||||
FileSystem fs, Path oldFileDir, String confKey, DirScanPool pool, Map<String, Object> params) {
|
FileSystem fs, Path oldFileDir, String confKey, DirScanPool pool, Map<String, Object> params,
|
||||||
|
List<Path> excludePaths) {
|
||||||
super(name, s, sleepPeriod);
|
super(name, s, sleepPeriod);
|
||||||
|
|
||||||
Preconditions.checkNotNull(pool, "Chore's pool can not be null");
|
Preconditions.checkNotNull(pool, "Chore's pool can not be null");
|
||||||
|
@ -106,6 +109,19 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends Schedu
|
||||||
this.oldFileDir = oldFileDir;
|
this.oldFileDir = oldFileDir;
|
||||||
this.conf = conf;
|
this.conf = conf;
|
||||||
this.params = params;
|
this.params = params;
|
||||||
|
if (excludePaths != null && !excludePaths.isEmpty()) {
|
||||||
|
excludeDirs = new ArrayList<>(excludePaths.size());
|
||||||
|
for (Path path : excludePaths) {
|
||||||
|
StringBuilder dirPart = new StringBuilder(path.toString());
|
||||||
|
if (!path.toString().endsWith("/")) {
|
||||||
|
dirPart.append("/");
|
||||||
|
}
|
||||||
|
excludeDirs.add(dirPart.toString());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (excludeDirs != null) {
|
||||||
|
LOG.info("Cleaner {} excludes sub dirs: {}", name, excludeDirs);
|
||||||
|
}
|
||||||
initCleanerChain(confKey);
|
initCleanerChain(confKey);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -419,9 +435,11 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends Schedu
|
||||||
sortByConsumedSpace(subDirs);
|
sortByConsumedSpace(subDirs);
|
||||||
// Submit the request of sub-directory deletion.
|
// Submit the request of sub-directory deletion.
|
||||||
subDirs.forEach(subDir -> {
|
subDirs.forEach(subDir -> {
|
||||||
CompletableFuture<Boolean> subFuture = new CompletableFuture<>();
|
if (!shouldExclude(subDir)) {
|
||||||
pool.execute(() -> traverseAndDelete(subDir.getPath(), false, subFuture));
|
CompletableFuture<Boolean> subFuture = new CompletableFuture<>();
|
||||||
futures.add(subFuture);
|
pool.execute(() -> traverseAndDelete(subDir.getPath(), false, subFuture));
|
||||||
|
futures.add(subFuture);
|
||||||
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -451,11 +469,34 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends Schedu
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
LOG.debug("Failed to traverse and delete the path: {}", dir, e);
|
if (e instanceof FileNotFoundException) {
|
||||||
|
LOG.debug("Dir dose not exist, {}", dir);
|
||||||
|
} else {
|
||||||
|
LOG.error("Failed to traverse and delete the path: {}", dir, e);
|
||||||
|
}
|
||||||
result.completeExceptionally(e);
|
result.completeExceptionally(e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Check if a path should not perform clear
|
||||||
|
*/
|
||||||
|
private boolean shouldExclude(FileStatus f) {
|
||||||
|
if (!f.isDirectory()) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
if (excludeDirs != null && !excludeDirs.isEmpty()) {
|
||||||
|
for (String dirPart : excludeDirs) {
|
||||||
|
// since we make excludeDirs end with '/',
|
||||||
|
// if a path contains() the dirPart, the path should be excluded
|
||||||
|
if (f.getPath().toString().contains(dirPart)) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Perform a delete on a specified type.
|
* Perform a delete on a specified type.
|
||||||
* @param deletion a delete
|
* @param deletion a delete
|
||||||
|
|
|
@ -57,10 +57,13 @@ public class DirScanPool implements ConfigurationObserver {
|
||||||
}
|
}
|
||||||
|
|
||||||
private DirScanPool(Configuration conf, Type dirScanPoolType) {
|
private DirScanPool(Configuration conf, Type dirScanPoolType) {
|
||||||
|
this(dirScanPoolType, conf.get(dirScanPoolType.cleanerPoolSizeConfigName,
|
||||||
|
dirScanPoolType.cleanerPoolSizeConfigDefault));
|
||||||
|
}
|
||||||
|
|
||||||
|
private DirScanPool(Type dirScanPoolType, String poolSize) {
|
||||||
this.dirScanPoolType = dirScanPoolType;
|
this.dirScanPoolType = dirScanPoolType;
|
||||||
this.name = dirScanPoolType.name().toLowerCase();
|
this.name = dirScanPoolType.name().toLowerCase();
|
||||||
String poolSize = conf.get(dirScanPoolType.cleanerPoolSizeConfigName,
|
|
||||||
dirScanPoolType.cleanerPoolSizeConfigDefault);
|
|
||||||
size = CleanerChore.calculatePoolSize(poolSize);
|
size = CleanerChore.calculatePoolSize(poolSize);
|
||||||
// poolSize may be 0 or 0.0 from a careless configuration,
|
// poolSize may be 0 or 0.0 from a careless configuration,
|
||||||
// double check to make sure.
|
// double check to make sure.
|
||||||
|
@ -143,6 +146,10 @@ public class DirScanPool implements ConfigurationObserver {
|
||||||
return new DirScanPool(conf, Type.HFILE_CLEANER);
|
return new DirScanPool(conf, Type.HFILE_CLEANER);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static DirScanPool getHFileCleanerScanPool(String poolSize) {
|
||||||
|
return new DirScanPool(Type.HFILE_CLEANER, poolSize);
|
||||||
|
}
|
||||||
|
|
||||||
public static DirScanPool getLogCleanerScanPool(Configuration conf) {
|
public static DirScanPool getLogCleanerScanPool(Configuration conf) {
|
||||||
return new DirScanPool(conf, Type.LOG_CLEANER);
|
return new DirScanPool(conf, Type.LOG_CLEANER);
|
||||||
}
|
}
|
||||||
|
|
|
@ -88,6 +88,17 @@ public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate>
|
||||||
"hbase.regionserver.hfilecleaner.thread.check.interval.msec";
|
"hbase.regionserver.hfilecleaner.thread.check.interval.msec";
|
||||||
static final long DEFAULT_HFILE_DELETE_THREAD_CHECK_INTERVAL_MSEC = 1000L;
|
static final long DEFAULT_HFILE_DELETE_THREAD_CHECK_INTERVAL_MSEC = 1000L;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The custom paths for hfile cleaner, subdirectories of archive, e.g.
|
||||||
|
* data/default/testTable1,data/default/testTable2
|
||||||
|
*/
|
||||||
|
public static final String HFILE_CLEANER_CUSTOM_PATHS = "hbase.master.hfile.cleaner.custom.paths";
|
||||||
|
|
||||||
|
/** Configure hfile cleaner classes for the custom paths */
|
||||||
|
public static final String HFILE_CLEANER_CUSTOM_PATHS_PLUGINS =
|
||||||
|
"hbase.master.hfilecleaner.custom.paths.plugins";
|
||||||
|
public static final String CUSTOM_POOL_SIZE = "hbase.cleaner.custom.hfiles.pool.size";
|
||||||
|
|
||||||
private static final Logger LOG = LoggerFactory.getLogger(HFileCleaner.class);
|
private static final Logger LOG = LoggerFactory.getLogger(HFileCleaner.class);
|
||||||
|
|
||||||
StealJobQueue<HFileDeleteTask> largeFileQueue;
|
StealJobQueue<HFileDeleteTask> largeFileQueue;
|
||||||
|
@ -117,8 +128,13 @@ public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate>
|
||||||
public HFileCleaner(final int period, final Stoppable stopper, Configuration conf, FileSystem fs,
|
public HFileCleaner(final int period, final Stoppable stopper, Configuration conf, FileSystem fs,
|
||||||
Path directory, DirScanPool pool, Map<String, Object> params) {
|
Path directory, DirScanPool pool, Map<String, Object> params) {
|
||||||
this("HFileCleaner", period, stopper, conf, fs, directory, MASTER_HFILE_CLEANER_PLUGINS, pool,
|
this("HFileCleaner", period, stopper, conf, fs, directory, MASTER_HFILE_CLEANER_PLUGINS, pool,
|
||||||
params);
|
params, null);
|
||||||
|
}
|
||||||
|
|
||||||
|
public HFileCleaner(final int period, final Stoppable stopper, Configuration conf, FileSystem fs,
|
||||||
|
Path directory, DirScanPool pool, Map<String, Object> params, List<Path> excludePaths) {
|
||||||
|
this("HFileCleaner", period, stopper, conf, fs, directory, MASTER_HFILE_CLEANER_PLUGINS, pool,
|
||||||
|
params, excludePaths);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -134,8 +150,9 @@ public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate>
|
||||||
* @param params params could be used in subclass of BaseHFileCleanerDelegate
|
* @param params params could be used in subclass of BaseHFileCleanerDelegate
|
||||||
*/
|
*/
|
||||||
public HFileCleaner(String name, int period, Stoppable stopper, Configuration conf, FileSystem fs,
|
public HFileCleaner(String name, int period, Stoppable stopper, Configuration conf, FileSystem fs,
|
||||||
Path directory, String confKey, DirScanPool pool, Map<String, Object> params) {
|
Path directory, String confKey, DirScanPool pool, Map<String, Object> params,
|
||||||
super(name, period, stopper, conf, fs, directory, confKey, pool, params);
|
List<Path> excludePaths) {
|
||||||
|
super(name, period, stopper, conf, fs, directory, confKey, pool, params, excludePaths);
|
||||||
throttlePoint =
|
throttlePoint =
|
||||||
conf.getInt(HFILE_DELETE_THROTTLE_THRESHOLD, DEFAULT_HFILE_DELETE_THROTTLE_THRESHOLD);
|
conf.getInt(HFILE_DELETE_THROTTLE_THRESHOLD, DEFAULT_HFILE_DELETE_THROTTLE_THRESHOLD);
|
||||||
largeQueueInitSize =
|
largeQueueInitSize =
|
||||||
|
|
|
@ -75,7 +75,7 @@ public class LogCleaner extends CleanerChore<BaseLogCleanerDelegate>
|
||||||
public LogCleaner(final int period, final Stoppable stopper, Configuration conf, FileSystem fs,
|
public LogCleaner(final int period, final Stoppable stopper, Configuration conf, FileSystem fs,
|
||||||
Path oldLogDir, DirScanPool pool, Map<String, Object> params) {
|
Path oldLogDir, DirScanPool pool, Map<String, Object> params) {
|
||||||
super("LogsCleaner", period, stopper, conf, fs, oldLogDir, HBASE_MASTER_LOGCLEANER_PLUGINS,
|
super("LogsCleaner", period, stopper, conf, fs, oldLogDir, HBASE_MASTER_LOGCLEANER_PLUGINS,
|
||||||
pool, params);
|
pool, params, null);
|
||||||
this.pendingDelete = new LinkedBlockingQueue<>();
|
this.pendingDelete = new LinkedBlockingQueue<>();
|
||||||
int size = conf.getInt(OLD_WALS_CLEANER_THREAD_SIZE, DEFAULT_OLD_WALS_CLEANER_THREAD_SIZE);
|
int size = conf.getInt(OLD_WALS_CLEANER_THREAD_SIZE, DEFAULT_OLD_WALS_CLEANER_THREAD_SIZE);
|
||||||
this.oldWALsCleaner = createOldWalsCleaner(size);
|
this.oldWALsCleaner = createOldWalsCleaner(size);
|
||||||
|
|
|
@ -18,6 +18,7 @@
|
||||||
package org.apache.hadoop.hbase.master;
|
package org.apache.hadoop.hbase.master;
|
||||||
|
|
||||||
import java.lang.reflect.Field;
|
import java.lang.reflect.Field;
|
||||||
|
import java.util.ArrayList;
|
||||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||||
import org.apache.hadoop.hbase.ScheduledChore;
|
import org.apache.hadoop.hbase.ScheduledChore;
|
||||||
|
@ -63,7 +64,7 @@ public class TestMasterChoreScheduled {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testDefaultScheduledChores() {
|
public void testDefaultScheduledChores() throws Exception {
|
||||||
// test if logCleaner chore is scheduled by default in HMaster init
|
// test if logCleaner chore is scheduled by default in HMaster init
|
||||||
TestChoreField<LogCleaner> logCleanerTestChoreField = new TestChoreField<>();
|
TestChoreField<LogCleaner> logCleanerTestChoreField = new TestChoreField<>();
|
||||||
LogCleaner logCleaner = logCleanerTestChoreField.getChoreObj("logCleaner");
|
LogCleaner logCleaner = logCleanerTestChoreField.getChoreObj("logCleaner");
|
||||||
|
@ -71,7 +72,9 @@ public class TestMasterChoreScheduled {
|
||||||
|
|
||||||
// test if hfileCleaner chore is scheduled by default in HMaster init
|
// test if hfileCleaner chore is scheduled by default in HMaster init
|
||||||
TestChoreField<HFileCleaner> hFileCleanerTestChoreField = new TestChoreField<>();
|
TestChoreField<HFileCleaner> hFileCleanerTestChoreField = new TestChoreField<>();
|
||||||
HFileCleaner hFileCleaner = hFileCleanerTestChoreField.getChoreObj("hfileCleaner");
|
Field masterField = HMaster.class.getDeclaredField("hfileCleaners");
|
||||||
|
masterField.setAccessible(true);
|
||||||
|
HFileCleaner hFileCleaner = ((ArrayList<HFileCleaner>) masterField.get(hMaster)).get(0);
|
||||||
hFileCleanerTestChoreField.testIfChoreScheduled(hFileCleaner);
|
hFileCleanerTestChoreField.testIfChoreScheduled(hFileCleaner);
|
||||||
|
|
||||||
// test if replicationBarrierCleaner chore is scheduled by default in HMaster init
|
// test if replicationBarrierCleaner chore is scheduled by default in HMaster init
|
||||||
|
|
|
@ -0,0 +1,120 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.master.cleaner;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.hbase.master.HMaster.HBASE_MASTER_CLEANER_INTERVAL;
|
||||||
|
import static org.apache.hadoop.hbase.master.cleaner.HFileCleaner.HFILE_CLEANER_CUSTOM_PATHS_PLUGINS;
|
||||||
|
|
||||||
|
import java.io.FileNotFoundException;
|
||||||
|
import java.io.IOException;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||||
|
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||||
|
import org.apache.hadoop.hbase.TableName;
|
||||||
|
import org.apache.hadoop.hbase.client.Admin;
|
||||||
|
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
|
||||||
|
import org.apache.hadoop.hbase.client.Table;
|
||||||
|
import org.apache.hadoop.hbase.client.TableDescriptor;
|
||||||
|
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.LargeTests;
|
||||||
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
import org.apache.hadoop.hbase.util.HFileArchiveUtil;
|
||||||
|
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||||
|
import org.junit.AfterClass;
|
||||||
|
import org.junit.BeforeClass;
|
||||||
|
import org.junit.ClassRule;
|
||||||
|
import org.junit.Rule;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.experimental.categories.Category;
|
||||||
|
import org.junit.rules.TestName;
|
||||||
|
|
||||||
|
@Category(LargeTests.class)
|
||||||
|
public class TestCleanerClearHFiles {
|
||||||
|
|
||||||
|
@ClassRule
|
||||||
|
public static final HBaseClassTestRule CLASS_RULE =
|
||||||
|
HBaseClassTestRule.forClass(TestCleanerClearHFiles.class);
|
||||||
|
|
||||||
|
@Rule
|
||||||
|
public TestName name = new TestName();
|
||||||
|
|
||||||
|
private static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
|
||||||
|
private static Configuration conf = TEST_UTIL.getConfiguration();
|
||||||
|
private static Admin admin = null;
|
||||||
|
|
||||||
|
private static final byte[] COLUMN_FAMILY = Bytes.toBytes("CF");
|
||||||
|
|
||||||
|
private static final String TABLE1 = "table1";
|
||||||
|
private static final String TABLE2 = "table2";
|
||||||
|
private static final String DEFAULT_ARCHIVE_SUBDIRS_PREFIX = "data/default/";
|
||||||
|
|
||||||
|
@BeforeClass
|
||||||
|
public static void setupBeforeClass() throws Exception {
|
||||||
|
conf.setStrings(HFileCleaner.HFILE_CLEANER_CUSTOM_PATHS,
|
||||||
|
DEFAULT_ARCHIVE_SUBDIRS_PREFIX + TABLE1);
|
||||||
|
conf.setStrings(HFILE_CLEANER_CUSTOM_PATHS_PLUGINS, HFileLinkCleaner.class.getName());
|
||||||
|
|
||||||
|
conf.setInt(TimeToLiveHFileCleaner.TTL_CONF_KEY, 10);
|
||||||
|
conf.setInt(HBASE_MASTER_CLEANER_INTERVAL, 20000);
|
||||||
|
|
||||||
|
TEST_UTIL.startMiniCluster();
|
||||||
|
admin = TEST_UTIL.getAdmin();
|
||||||
|
}
|
||||||
|
|
||||||
|
@AfterClass
|
||||||
|
public static void tearDownAfterClass() throws Exception {
|
||||||
|
TEST_UTIL.shutdownMiniCluster();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testClearArchive() throws Exception {
|
||||||
|
DistributedFileSystem fs = TEST_UTIL.getDFSCluster().getFileSystem();
|
||||||
|
Table table1 = createTable(TEST_UTIL, TableName.valueOf(TABLE1));
|
||||||
|
Table table2 = createTable(TEST_UTIL, TableName.valueOf(TABLE2));
|
||||||
|
|
||||||
|
admin.disableTable(table1.getName());
|
||||||
|
admin.deleteTable(table1.getName());
|
||||||
|
admin.disableTable(table2.getName());
|
||||||
|
admin.deleteTable(table2.getName());
|
||||||
|
|
||||||
|
Path archiveDir = HFileArchiveUtil.getArchivePath(conf);
|
||||||
|
Path archiveTable1Path = new Path(archiveDir, DEFAULT_ARCHIVE_SUBDIRS_PREFIX + TABLE1);
|
||||||
|
Path archiveTable2Path = new Path(archiveDir, DEFAULT_ARCHIVE_SUBDIRS_PREFIX + TABLE2);
|
||||||
|
|
||||||
|
TEST_UTIL.waitFor(10000, () -> !notExistOrEmptyDir(archiveTable1Path, fs)
|
||||||
|
&& !notExistOrEmptyDir(archiveTable2Path, fs));
|
||||||
|
|
||||||
|
TEST_UTIL.waitFor(30000,
|
||||||
|
() -> notExistOrEmptyDir(archiveTable1Path, fs) && notExistOrEmptyDir(archiveTable2Path, fs));
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean notExistOrEmptyDir(Path dir, DistributedFileSystem fs) {
|
||||||
|
try {
|
||||||
|
return fs.listStatus(dir).length == 0;
|
||||||
|
} catch (Exception e) {
|
||||||
|
return e instanceof FileNotFoundException;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private Table createTable(HBaseTestingUtility util, TableName tableName) throws IOException {
|
||||||
|
TableDescriptor td = TableDescriptorBuilder.newBuilder(tableName)
|
||||||
|
.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(COLUMN_FAMILY).build()).build();
|
||||||
|
return util.createTable(td, null);
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue