HBASE-26320 Implement a separate thread pool for the LogCleaner (#3712)
This avoids starvation when the archive directory is large and takes a long time to iterate through. Signed-off-by: Duo Zhang <zhangduo@apache.org> Signed-off-by: Anoop Sam John <anoopsamjohn@apache.org> Signed-off-by: Pankaj <pankajkumar@apache.org>
This commit is contained in:
parent
cadac1889a
commit
23e7aa5ccc
|
@ -367,7 +367,10 @@ public class HMaster extends HBaseServerBase<MasterRpcServices> implements Maste
|
|||
|
||||
private HbckChore hbckChore;
|
||||
CatalogJanitor catalogJanitorChore;
|
||||
private DirScanPool cleanerPool;
|
||||
// Threadpool for scanning the archive directory, used by the HFileCleaner
|
||||
private DirScanPool hfileCleanerPool;
|
||||
// Threadpool for scanning the Old logs directory, used by the LogCleaner
|
||||
private DirScanPool logCleanerPool;
|
||||
private LogCleaner logCleaner;
|
||||
private HFileCleaner hfileCleaner;
|
||||
private ReplicationBarrierCleaner replicationBarrierCleaner;
|
||||
|
@ -1131,7 +1134,8 @@ public class HMaster extends HBaseServerBase<MasterRpcServices> implements Maste
|
|||
(EnvironmentEdgeManager.currentTime() - masterActiveTime) / 1000.0f));
|
||||
this.masterFinishedInitializationTime = EnvironmentEdgeManager.currentTime();
|
||||
configurationManager.registerObserver(this.balancer);
|
||||
configurationManager.registerObserver(this.cleanerPool);
|
||||
configurationManager.registerObserver(this.hfileCleanerPool);
|
||||
configurationManager.registerObserver(this.logCleanerPool);
|
||||
configurationManager.registerObserver(this.hfileCleaner);
|
||||
configurationManager.registerObserver(this.logCleaner);
|
||||
configurationManager.registerObserver(this.regionsRecoveryConfigManager);
|
||||
|
@ -1493,21 +1497,24 @@ public class HMaster extends HBaseServerBase<MasterRpcServices> implements Maste
|
|||
ExecutorType.MASTER_TABLE_OPERATIONS).setCorePoolSize(1));
|
||||
startProcedureExecutor();
|
||||
|
||||
// Create cleaner thread pool
|
||||
cleanerPool = new DirScanPool(conf);
|
||||
// Create log cleaner thread pool
|
||||
logCleanerPool = DirScanPool.getLogCleanerScanPool(conf);
|
||||
Map<String, Object> params = new HashMap<>();
|
||||
params.put(MASTER, this);
|
||||
// Start log cleaner thread
|
||||
int cleanerInterval =
|
||||
conf.getInt(HBASE_MASTER_CLEANER_INTERVAL, DEFAULT_HBASE_MASTER_CLEANER_INTERVAL);
|
||||
this.logCleaner = new LogCleaner(cleanerInterval, this, conf,
|
||||
getMasterWalManager().getFileSystem(), getMasterWalManager().getOldLogDir(), cleanerPool, params);
|
||||
getMasterWalManager().getFileSystem(), getMasterWalManager().getOldLogDir(),
|
||||
logCleanerPool, params);
|
||||
getChoreService().scheduleChore(logCleaner);
|
||||
|
||||
// start the hfile archive cleaner thread
|
||||
Path archiveDir = HFileArchiveUtil.getArchivePath(conf);
|
||||
// Create archive cleaner thread pool
|
||||
hfileCleanerPool = DirScanPool.getHFileCleanerScanPool(conf);
|
||||
this.hfileCleaner = new HFileCleaner(cleanerInterval, this, conf,
|
||||
getMasterFileSystem().getFileSystem(), archiveDir, cleanerPool, params);
|
||||
getMasterFileSystem().getFileSystem(), archiveDir, hfileCleanerPool, params);
|
||||
getChoreService().scheduleChore(hfileCleaner);
|
||||
|
||||
// Regions Reopen based on very high storeFileRefCount is considered enabled
|
||||
|
@ -1557,9 +1564,13 @@ public class HMaster extends HBaseServerBase<MasterRpcServices> implements Maste
|
|||
}
|
||||
stopChoreService();
|
||||
stopExecutorService();
|
||||
if (cleanerPool != null) {
|
||||
cleanerPool.shutdownNow();
|
||||
cleanerPool = null;
|
||||
if (hfileCleanerPool != null) {
|
||||
hfileCleanerPool.shutdownNow();
|
||||
hfileCleanerPool = null;
|
||||
}
|
||||
if (logCleanerPool != null) {
|
||||
logCleanerPool.shutdownNow();
|
||||
logCleanerPool = null;
|
||||
}
|
||||
if (maintenanceRegionServer != null) {
|
||||
maintenanceRegionServer.getRegionServer().stop(HBASE_MASTER_CLEANER_INTERVAL);
|
||||
|
|
|
@ -57,6 +57,7 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends Schedu
|
|||
private static final int AVAIL_PROCESSORS = Runtime.getRuntime().availableProcessors();
|
||||
|
||||
/**
|
||||
* Configures the threadpool used for scanning the archive directory for the HFileCleaner.
|
||||
* If the value is an integer >= 1, it is used as the pool size;
|
||||
* if 0.0 < size <= 1.0, size would be available processors * size.
|
||||
* Pay attention that 1.0 is different from 1, former indicates it will use 100% of cores,
|
||||
|
@ -64,6 +65,12 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends Schedu
|
|||
*/
|
||||
public static final String CHORE_POOL_SIZE = "hbase.cleaner.scan.dir.concurrent.size";
|
||||
static final String DEFAULT_CHORE_POOL_SIZE = "0.25";
|
||||
/**
|
||||
* Configures the threadpool used for scanning the Old logs directory for the LogCleaner.
|
||||
* Follows the same configuration mechanism as CHORE_POOL_SIZE, but has a default of 1 thread.
|
||||
*/
|
||||
public static final String LOG_CLEANER_CHORE_SIZE = "hbase.log.cleaner.scan.dir.concurrent.size";
|
||||
static final String DEFAULT_LOG_CLEANER_CHORE_POOL_SIZE = "1";
|
||||
|
||||
private final DirScanPool pool;
|
||||
|
||||
|
|
|
@ -39,21 +39,41 @@ public class DirScanPool implements ConfigurationObserver {
|
|||
private final ThreadPoolExecutor pool;
|
||||
private int cleanerLatch;
|
||||
private boolean reconfigNotification;
|
||||
private Type dirScanPoolType;
|
||||
private final String name;
|
||||
|
||||
public DirScanPool(Configuration conf) {
|
||||
String poolSize = conf.get(CleanerChore.CHORE_POOL_SIZE, CleanerChore.DEFAULT_CHORE_POOL_SIZE);
|
||||
private enum Type {
|
||||
LOG_CLEANER(CleanerChore.LOG_CLEANER_CHORE_SIZE,
|
||||
CleanerChore.DEFAULT_LOG_CLEANER_CHORE_POOL_SIZE),
|
||||
HFILE_CLEANER(CleanerChore.CHORE_POOL_SIZE, CleanerChore.DEFAULT_CHORE_POOL_SIZE);
|
||||
|
||||
private final String cleanerPoolSizeConfigName;
|
||||
private final String cleanerPoolSizeConfigDefault;
|
||||
|
||||
private Type(String cleanerPoolSizeConfigName, String cleanerPoolSizeConfigDefault) {
|
||||
this.cleanerPoolSizeConfigName = cleanerPoolSizeConfigName;
|
||||
this.cleanerPoolSizeConfigDefault = cleanerPoolSizeConfigDefault;
|
||||
}
|
||||
}
|
||||
|
||||
private DirScanPool(Configuration conf, Type dirScanPoolType) {
|
||||
this.dirScanPoolType = dirScanPoolType;
|
||||
this.name = dirScanPoolType.name().toLowerCase();
|
||||
String poolSize = conf.get(dirScanPoolType.cleanerPoolSizeConfigName,
|
||||
dirScanPoolType.cleanerPoolSizeConfigDefault);
|
||||
size = CleanerChore.calculatePoolSize(poolSize);
|
||||
// poolSize may be 0 or 0.0 from a careless configuration,
|
||||
// double check to make sure.
|
||||
size = size == 0 ? CleanerChore.calculatePoolSize(CleanerChore.DEFAULT_CHORE_POOL_SIZE) : size;
|
||||
pool = initializePool(size);
|
||||
LOG.info("Cleaner pool size is {}", size);
|
||||
size = size == 0 ?
|
||||
CleanerChore.calculatePoolSize(dirScanPoolType.cleanerPoolSizeConfigDefault) : size;
|
||||
pool = initializePool(size, name);
|
||||
LOG.info("{} Cleaner pool size is {}", name, size);
|
||||
cleanerLatch = 0;
|
||||
}
|
||||
|
||||
private static ThreadPoolExecutor initializePool(int size) {
|
||||
private static ThreadPoolExecutor initializePool(int size, String name) {
|
||||
return Threads.getBoundedCachedThreadPool(size, 1, TimeUnit.MINUTES,
|
||||
new ThreadFactoryBuilder().setNameFormat("dir-scan-pool-%d").setDaemon(true)
|
||||
new ThreadFactoryBuilder().setNameFormat(name + "-dir-scan-pool-%d").setDaemon(true)
|
||||
.setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
|
||||
}
|
||||
|
||||
|
@ -64,9 +84,11 @@ public class DirScanPool implements ConfigurationObserver {
|
|||
@Override
|
||||
public synchronized void onConfigurationChange(Configuration conf) {
|
||||
int newSize = CleanerChore.calculatePoolSize(
|
||||
conf.get(CleanerChore.CHORE_POOL_SIZE, CleanerChore.DEFAULT_CHORE_POOL_SIZE));
|
||||
conf.get(dirScanPoolType.cleanerPoolSizeConfigName,
|
||||
dirScanPoolType.cleanerPoolSizeConfigDefault));
|
||||
if (newSize == size) {
|
||||
LOG.trace("Size from configuration is same as previous={}, no need to update.", newSize);
|
||||
LOG.trace("{} Cleaner Size from configuration is same as previous={}, no need to update.",
|
||||
name, newSize);
|
||||
return;
|
||||
}
|
||||
size = newSize;
|
||||
|
@ -109,11 +131,19 @@ public class DirScanPool implements ConfigurationObserver {
|
|||
break;
|
||||
}
|
||||
}
|
||||
LOG.info("Update chore's pool size from {} to {}", pool.getPoolSize(), size);
|
||||
LOG.info("Update {} chore's pool size from {} to {}", name, pool.getPoolSize(), size);
|
||||
pool.setCorePoolSize(size);
|
||||
}
|
||||
|
||||
public int getSize() {
|
||||
return size;
|
||||
}
|
||||
|
||||
public static DirScanPool getHFileCleanerScanPool(Configuration conf) {
|
||||
return new DirScanPool(conf, Type.HFILE_CLEANER);
|
||||
}
|
||||
|
||||
public static DirScanPool getLogCleanerScanPool(Configuration conf) {
|
||||
return new DirScanPool(conf, Type.LOG_CLEANER);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -111,7 +111,7 @@ public class TestHFileArchiving {
|
|||
// We don't want the cleaner to remove files. The tests do that.
|
||||
UTIL.getMiniHBaseCluster().getMaster().getHFileCleaner().cancel(true);
|
||||
|
||||
POOL = new DirScanPool(UTIL.getConfiguration());
|
||||
POOL = DirScanPool.getHFileCleanerScanPool(UTIL.getConfiguration());
|
||||
}
|
||||
|
||||
private static void setupConf(Configuration conf) {
|
||||
|
|
|
@ -119,7 +119,7 @@ public class TestZooKeeperTableArchiveClient {
|
|||
String archivingZNode = ZKTableArchiveClient.getArchiveZNode(UTIL.getConfiguration(), watcher);
|
||||
ZKUtil.createWithParents(watcher, archivingZNode);
|
||||
rss = mock(RegionServerServices.class);
|
||||
POOL = new DirScanPool(UTIL.getConfiguration());
|
||||
POOL= DirScanPool.getHFileCleanerScanPool(UTIL.getConfiguration());
|
||||
}
|
||||
|
||||
private static void setupConf(Configuration conf) {
|
||||
|
|
|
@ -61,7 +61,7 @@ public class TestCleanerChore {
|
|||
|
||||
@BeforeClass
|
||||
public static void setup() {
|
||||
POOL = new DirScanPool(UTIL.getConfiguration());
|
||||
POOL = DirScanPool.getHFileCleanerScanPool(UTIL.getConfiguration());
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
|
@ -469,6 +469,57 @@ public class TestCleanerChore {
|
|||
t.join();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testOnConfigurationChangeLogCleaner() throws Exception {
|
||||
int availableProcessorNum = Runtime.getRuntime().availableProcessors();
|
||||
if (availableProcessorNum == 1) { // no need to run this test
|
||||
return;
|
||||
}
|
||||
|
||||
DirScanPool pool = DirScanPool.getLogCleanerScanPool(UTIL.getConfiguration());
|
||||
|
||||
// have at least 2 available processors/cores
|
||||
int initPoolSize = availableProcessorNum / 2;
|
||||
int changedPoolSize = availableProcessorNum;
|
||||
|
||||
Stoppable stop = new StoppableImplementation();
|
||||
Configuration conf = UTIL.getConfiguration();
|
||||
Path testDir = UTIL.getDataTestDir();
|
||||
FileSystem fs = UTIL.getTestFileSystem();
|
||||
String confKey = "hbase.test.cleaner.delegates";
|
||||
conf.set(confKey, AlwaysDelete.class.getName());
|
||||
conf.set(CleanerChore.LOG_CLEANER_CHORE_SIZE, String.valueOf(initPoolSize));
|
||||
final AllValidPaths chore =
|
||||
new AllValidPaths("test-file-cleaner", stop, conf, fs, testDir, confKey, pool);
|
||||
chore.setEnabled(true);
|
||||
// Create subdirs under testDir
|
||||
int dirNums = 6;
|
||||
Path[] subdirs = new Path[dirNums];
|
||||
for (int i = 0; i < dirNums; i++) {
|
||||
subdirs[i] = new Path(testDir, "subdir-" + i);
|
||||
fs.mkdirs(subdirs[i]);
|
||||
}
|
||||
// Under each subdirs create 6 files
|
||||
for (Path subdir : subdirs) {
|
||||
createFiles(fs, subdir, 6);
|
||||
}
|
||||
// Start chore
|
||||
Thread t = new Thread(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
chore.chore();
|
||||
}
|
||||
});
|
||||
t.setDaemon(true);
|
||||
t.start();
|
||||
// Change size of chore's pool
|
||||
conf.set(CleanerChore.LOG_CLEANER_CHORE_SIZE, String.valueOf(changedPoolSize));
|
||||
pool.onConfigurationChange(conf);
|
||||
assertEquals(changedPoolSize, chore.getChorePoolSize());
|
||||
// Stop chore
|
||||
t.join();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMinimumNumberOfThreads() throws Exception {
|
||||
Configuration conf = UTIL.getConfiguration();
|
||||
|
|
|
@ -71,7 +71,7 @@ public class TestHFileCleaner {
|
|||
public static void setupCluster() throws Exception {
|
||||
// have to use a minidfs cluster because the localfs doesn't modify file times correctly
|
||||
UTIL.startMiniDFSCluster(1);
|
||||
POOL = new DirScanPool(UTIL.getConfiguration());
|
||||
POOL = DirScanPool.getHFileCleanerScanPool(UTIL.getConfiguration());
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
|
|
|
@ -67,7 +67,7 @@ public class TestHFileLinkCleaner {
|
|||
|
||||
@BeforeClass
|
||||
public static void setUp() {
|
||||
POOL = new DirScanPool(TEST_UTIL.getConfiguration());
|
||||
POOL = DirScanPool.getHFileCleanerScanPool(TEST_UTIL.getConfiguration());
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
|
|
|
@ -94,7 +94,7 @@ public class TestLogsCleaner {
|
|||
public static void setUpBeforeClass() throws Exception {
|
||||
TEST_UTIL.startMiniZKCluster();
|
||||
TEST_UTIL.startMiniDFSCluster(1);
|
||||
POOL = new DirScanPool(TEST_UTIL.getConfiguration());
|
||||
POOL = DirScanPool.getLogCleanerScanPool(TEST_UTIL.getConfiguration());
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
|
|
|
@ -48,7 +48,9 @@ public class MasterRegionTestBase {
|
|||
|
||||
protected ChoreService choreService;
|
||||
|
||||
protected DirScanPool cleanerPool;
|
||||
protected DirScanPool hfileCleanerPool;
|
||||
|
||||
protected DirScanPool logCleanerPool;
|
||||
|
||||
protected static byte[] CF1 = Bytes.toBytes("f1");
|
||||
|
||||
|
@ -80,7 +82,8 @@ public class MasterRegionTestBase {
|
|||
htu.getConfiguration().setBoolean(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE, false);
|
||||
configure(htu.getConfiguration());
|
||||
choreService = new ChoreService(getClass().getSimpleName());
|
||||
cleanerPool = new DirScanPool(htu.getConfiguration());
|
||||
hfileCleanerPool = DirScanPool.getHFileCleanerScanPool(htu.getConfiguration());
|
||||
logCleanerPool = DirScanPool.getLogCleanerScanPool(htu.getConfiguration());
|
||||
Server server = mock(Server.class);
|
||||
when(server.getConfiguration()).thenReturn(htu.getConfiguration());
|
||||
when(server.getServerName())
|
||||
|
@ -103,7 +106,8 @@ public class MasterRegionTestBase {
|
|||
@After
|
||||
public void tearDown() throws IOException {
|
||||
region.close(true);
|
||||
cleanerPool.shutdownNow();
|
||||
hfileCleanerPool.shutdownNow();
|
||||
logCleanerPool.shutdownNow();
|
||||
choreService.shutdown();
|
||||
htu.cleanupTestDir();
|
||||
}
|
||||
|
|
|
@ -77,7 +77,7 @@ public class TestMasterRegionCompaction extends MasterRegionTestBase {
|
|||
public boolean isStopped() {
|
||||
return stopped;
|
||||
}
|
||||
}, conf, fs, globalArchivePath, cleanerPool);
|
||||
}, conf, fs, globalArchivePath, hfileCleanerPool);
|
||||
choreService.scheduleChore(hfileCleaner);
|
||||
}
|
||||
|
||||
|
|
|
@ -72,7 +72,7 @@ public class TestMasterRegionWALCleaner extends MasterRegionTestBase {
|
|||
public boolean isStopped() {
|
||||
return stopped;
|
||||
}
|
||||
}, conf, fs, globalWALArchiveDir, cleanerPool, null);
|
||||
}, conf, fs, globalWALArchiveDir, logCleanerPool, null);
|
||||
choreService.scheduleChore(logCleaner);
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue