From 200cbcc34f654c9880ad7612e46f6b2f07f55fb6 Mon Sep 17 00:00:00 2001 From: Xiaolin Ha Date: Tue, 24 May 2022 14:26:09 +0800 Subject: [PATCH] HBASE-26320 Implement a separate thread pool for the LogCleaner (#3712) (#4460) This avoids starvation when the archive directory is large and takes a long time to iterate through. Signed-off-by: Duo Zhang Signed-off-by: Anoop Sam John Signed-off-by: Pankaj --- .../apache/hadoop/hbase/master/HMaster.java | 28 ++++++---- .../hbase/master/cleaner/CleanerChore.java | 13 +++-- .../hbase/master/cleaner/DirScanPool.java | 52 ++++++++++++++---- .../hbase/backup/TestHFileArchiving.java | 2 +- .../TestZooKeeperTableArchiveClient.java | 2 +- .../master/cleaner/TestCleanerChore.java | 53 ++++++++++++++++++- .../master/cleaner/TestHFileCleaner.java | 2 +- .../master/cleaner/TestHFileLinkCleaner.java | 2 +- .../hbase/master/cleaner/TestLogsCleaner.java | 2 +- .../master/region/MasterRegionTestBase.java | 10 ++-- .../region/TestMasterRegionCompaction.java | 2 +- .../region/TestMasterRegionWALCleaner.java | 2 +- .../master/snapshot/TestSnapshotManager.java | 2 +- 13 files changed, 137 insertions(+), 35 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index 49c1369ee05..021625a29f3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -359,7 +359,10 @@ public class HMaster extends HRegionServer implements MasterServices { private HbckChore hbckChore; CatalogJanitor catalogJanitorChore; - private DirScanPool cleanerPool; + // Threadpool for scanning the archive directory, used by the HFileCleaner + private DirScanPool hfileCleanerPool; + // Threadpool for scanning the Old logs directory, used by the LogCleaner + private DirScanPool logCleanerPool; private LogCleaner logCleaner; private HFileCleaner hfileCleaner; private ReplicationBarrierCleaner replicationBarrierCleaner; @@ -1154,7 +1157,8 @@ public class HMaster extends HRegionServer implements MasterServices { (EnvironmentEdgeManager.currentTime() - masterActiveTime) / 1000.0f)); this.masterFinishedInitializationTime = EnvironmentEdgeManager.currentTime(); configurationManager.registerObserver(this.balancer); - configurationManager.registerObserver(this.cleanerPool); + configurationManager.registerObserver(this.hfileCleanerPool); + configurationManager.registerObserver(this.logCleanerPool); configurationManager.registerObserver(this.hfileCleaner); configurationManager.registerObserver(this.logCleaner); configurationManager.registerObserver(this.regionsRecoveryConfigManager); @@ -1518,8 +1522,8 @@ public class HMaster extends HRegionServer implements MasterServices { .setExecutorType(ExecutorType.MASTER_TABLE_OPERATIONS).setCorePoolSize(1)); startProcedureExecutor(); - // Create cleaner thread pool - cleanerPool = new DirScanPool(conf); + // Create log cleaner thread pool + logCleanerPool = DirScanPool.getLogCleanerScanPool(conf); Map params = new HashMap<>(); params.put(MASTER, this); // Start log cleaner thread @@ -1527,13 +1531,15 @@ public class HMaster extends HRegionServer implements MasterServices { conf.getInt(HBASE_MASTER_CLEANER_INTERVAL, DEFAULT_HBASE_MASTER_CLEANER_INTERVAL); this.logCleaner = new LogCleaner(cleanerInterval, this, conf, getMasterWalManager().getFileSystem(), - getMasterWalManager().getOldLogDir(), cleanerPool, params); + getMasterWalManager().getOldLogDir(), logCleanerPool, params); getChoreService().scheduleChore(logCleaner); // start the hfile archive cleaner thread Path archiveDir = HFileArchiveUtil.getArchivePath(conf); + // Create archive cleaner thread pool + hfileCleanerPool = DirScanPool.getHFileCleanerScanPool(conf); this.hfileCleaner = new HFileCleaner(cleanerInterval, this, conf, - getMasterFileSystem().getFileSystem(), archiveDir, cleanerPool, params); + getMasterFileSystem().getFileSystem(), archiveDir, hfileCleanerPool, params); getChoreService().scheduleChore(hfileCleaner); // Regions Reopen based on very high storeFileRefCount is considered enabled @@ -1586,9 +1592,13 @@ public class HMaster extends HRegionServer implements MasterServices { this.mobCompactThread.close(); } super.stopServiceThreads(); - if (cleanerPool != null) { - cleanerPool.shutdownNow(); - cleanerPool = null; + if (hfileCleanerPool != null) { + hfileCleanerPool.shutdownNow(); + hfileCleanerPool = null; + } + if (logCleanerPool != null) { + logCleanerPool.shutdownNow(); + logCleanerPool = null; } LOG.debug("Stopping service threads"); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java index f74f881d1f5..80908e1e050 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java @@ -57,12 +57,19 @@ public abstract class CleanerChore extends Schedu private static final int AVAIL_PROCESSORS = Runtime.getRuntime().availableProcessors(); /** - * If it is an integer and >= 1, it would be the size; if 0.0 < size <= 1.0, size would be - * available processors * size. Pay attention that 1.0 is different from 1, former indicates it - * will use 100% of cores, while latter will use only 1 thread for chore to scan dir. + * Configures the threadpool used for scanning the archive directory for the HFileCleaner If it is + * an integer and >= 1, it would be the size; if 0.0 < size <= 1.0, size would be available + * processors * size. Pay attention that 1.0 is different from 1, former indicates it will use + * 100% of cores, while latter will use only 1 thread for chore to scan dir. */ public static final String CHORE_POOL_SIZE = "hbase.cleaner.scan.dir.concurrent.size"; static final String DEFAULT_CHORE_POOL_SIZE = "0.25"; + /** + * Configures the threadpool used for scanning the Old logs directory for the LogCleaner Follows + * the same configuration mechanism as CHORE_POOL_SIZE, but has a default of 1 thread. + */ + public static final String LOG_CLEANER_CHORE_SIZE = "hbase.log.cleaner.scan.dir.concurrent.size"; + static final String DEFAULT_LOG_CLEANER_CHORE_POOL_SIZE = "1"; private final DirScanPool pool; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/DirScanPool.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/DirScanPool.java index 6989b288196..d5ef4e3fd36 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/DirScanPool.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/DirScanPool.java @@ -39,21 +39,42 @@ public class DirScanPool implements ConfigurationObserver { private final ThreadPoolExecutor pool; private int cleanerLatch; private boolean reconfigNotification; + private Type dirScanPoolType; + private final String name; - public DirScanPool(Configuration conf) { - String poolSize = conf.get(CleanerChore.CHORE_POOL_SIZE, CleanerChore.DEFAULT_CHORE_POOL_SIZE); + private enum Type { + LOG_CLEANER(CleanerChore.LOG_CLEANER_CHORE_SIZE, + CleanerChore.DEFAULT_LOG_CLEANER_CHORE_POOL_SIZE), + HFILE_CLEANER(CleanerChore.CHORE_POOL_SIZE, CleanerChore.DEFAULT_CHORE_POOL_SIZE); + + private final String cleanerPoolSizeConfigName; + private final String cleanerPoolSizeConfigDefault; + + private Type(String cleanerPoolSizeConfigName, String cleanerPoolSizeConfigDefault) { + this.cleanerPoolSizeConfigName = cleanerPoolSizeConfigName; + this.cleanerPoolSizeConfigDefault = cleanerPoolSizeConfigDefault; + } + } + + private DirScanPool(Configuration conf, Type dirScanPoolType) { + this.dirScanPoolType = dirScanPoolType; + this.name = dirScanPoolType.name().toLowerCase(); + String poolSize = conf.get(dirScanPoolType.cleanerPoolSizeConfigName, + dirScanPoolType.cleanerPoolSizeConfigDefault); size = CleanerChore.calculatePoolSize(poolSize); // poolSize may be 0 or 0.0 from a careless configuration, // double check to make sure. - size = size == 0 ? CleanerChore.calculatePoolSize(CleanerChore.DEFAULT_CHORE_POOL_SIZE) : size; - pool = initializePool(size); - LOG.info("Cleaner pool size is {}", size); + size = size == 0 + ? CleanerChore.calculatePoolSize(dirScanPoolType.cleanerPoolSizeConfigDefault) + : size; + pool = initializePool(size, name); + LOG.info("{} Cleaner pool size is {}", name, size); cleanerLatch = 0; } - private static ThreadPoolExecutor initializePool(int size) { + private static ThreadPoolExecutor initializePool(int size, String name) { return Threads.getBoundedCachedThreadPool(size, 1, TimeUnit.MINUTES, - new ThreadFactoryBuilder().setNameFormat("dir-scan-pool-%d").setDaemon(true) + new ThreadFactoryBuilder().setNameFormat(name + "-dir-scan-pool-%d").setDaemon(true) .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build()); } @@ -63,10 +84,11 @@ public class DirScanPool implements ConfigurationObserver { */ @Override public synchronized void onConfigurationChange(Configuration conf) { - int newSize = CleanerChore.calculatePoolSize( - conf.get(CleanerChore.CHORE_POOL_SIZE, CleanerChore.DEFAULT_CHORE_POOL_SIZE)); + int newSize = CleanerChore.calculatePoolSize(conf.get(dirScanPoolType.cleanerPoolSizeConfigName, + dirScanPoolType.cleanerPoolSizeConfigDefault)); if (newSize == size) { - LOG.trace("Size from configuration is same as previous={}, no need to update.", newSize); + LOG.trace("{} Cleaner Size from configuration is same as previous={}, no need to update.", + name, newSize); return; } size = newSize; @@ -109,11 +131,19 @@ public class DirScanPool implements ConfigurationObserver { break; } } - LOG.info("Update chore's pool size from {} to {}", pool.getPoolSize(), size); + LOG.info("Update {} chore's pool size from {} to {}", name, pool.getPoolSize(), size); pool.setCorePoolSize(size); } public int getSize() { return size; } + + public static DirScanPool getHFileCleanerScanPool(Configuration conf) { + return new DirScanPool(conf, Type.HFILE_CLEANER); + } + + public static DirScanPool getLogCleanerScanPool(Configuration conf) { + return new DirScanPool(conf, Type.LOG_CLEANER); + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestHFileArchiving.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestHFileArchiving.java index 937268d797d..782a9a03c05 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestHFileArchiving.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestHFileArchiving.java @@ -109,7 +109,7 @@ public class TestHFileArchiving { // We don't want the cleaner to remove files. The tests do that. UTIL.getMiniHBaseCluster().getMaster().getHFileCleaner().cancel(true); - POOL = new DirScanPool(UTIL.getConfiguration()); + POOL = DirScanPool.getHFileCleanerScanPool(UTIL.getConfiguration()); } private static void setupConf(Configuration conf) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/example/TestZooKeeperTableArchiveClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/example/TestZooKeeperTableArchiveClient.java index cd6f6f368eb..ab1c41c1f08 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/example/TestZooKeeperTableArchiveClient.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/example/TestZooKeeperTableArchiveClient.java @@ -104,7 +104,7 @@ public class TestZooKeeperTableArchiveClient { String archivingZNode = ZKTableArchiveClient.getArchiveZNode(UTIL.getConfiguration(), watcher); ZKUtil.createWithParents(watcher, archivingZNode); rss = mock(RegionServerServices.class); - POOL = new DirScanPool(UTIL.getConfiguration()); + POOL = DirScanPool.getHFileCleanerScanPool(UTIL.getConfiguration()); } private static void setupConf(Configuration conf) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestCleanerChore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestCleanerChore.java index 150bf66be4c..10c381615c5 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestCleanerChore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestCleanerChore.java @@ -62,7 +62,7 @@ public class TestCleanerChore { @BeforeClass public static void setup() { - POOL = new DirScanPool(UTIL.getConfiguration()); + POOL = DirScanPool.getHFileCleanerScanPool(UTIL.getConfiguration()); } @AfterClass @@ -470,6 +470,57 @@ public class TestCleanerChore { t.join(); } + @Test + public void testOnConfigurationChangeLogCleaner() throws Exception { + int availableProcessorNum = Runtime.getRuntime().availableProcessors(); + if (availableProcessorNum == 1) { // no need to run this test + return; + } + + DirScanPool pool = DirScanPool.getLogCleanerScanPool(UTIL.getConfiguration()); + + // have at least 2 available processors/cores + int initPoolSize = availableProcessorNum / 2; + int changedPoolSize = availableProcessorNum; + + Stoppable stop = new StoppableImplementation(); + Configuration conf = UTIL.getConfiguration(); + Path testDir = UTIL.getDataTestDir(); + FileSystem fs = UTIL.getTestFileSystem(); + String confKey = "hbase.test.cleaner.delegates"; + conf.set(confKey, AlwaysDelete.class.getName()); + conf.set(CleanerChore.LOG_CLEANER_CHORE_SIZE, String.valueOf(initPoolSize)); + final AllValidPaths chore = + new AllValidPaths("test-file-cleaner", stop, conf, fs, testDir, confKey, pool); + chore.setEnabled(true); + // Create subdirs under testDir + int dirNums = 6; + Path[] subdirs = new Path[dirNums]; + for (int i = 0; i < dirNums; i++) { + subdirs[i] = new Path(testDir, "subdir-" + i); + fs.mkdirs(subdirs[i]); + } + // Under each subdirs create 6 files + for (Path subdir : subdirs) { + createFiles(fs, subdir, 6); + } + // Start chore + Thread t = new Thread(new Runnable() { + @Override + public void run() { + chore.chore(); + } + }); + t.setDaemon(true); + t.start(); + // Change size of chore's pool + conf.set(CleanerChore.LOG_CLEANER_CHORE_SIZE, String.valueOf(changedPoolSize)); + pool.onConfigurationChange(conf); + assertEquals(changedPoolSize, chore.getChorePoolSize()); + // Stop chore + t.join(); + } + @Test public void testMinimumNumberOfThreads() throws Exception { Configuration conf = UTIL.getConfiguration(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileCleaner.java index a089bd5ede2..68ad6b1d0b9 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileCleaner.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileCleaner.java @@ -75,7 +75,7 @@ public class TestHFileCleaner { public static void setupCluster() throws Exception { // have to use a minidfs cluster because the localfs doesn't modify file times correctly UTIL.startMiniDFSCluster(1); - POOL = new DirScanPool(UTIL.getConfiguration()); + POOL = DirScanPool.getHFileCleanerScanPool(UTIL.getConfiguration()); } @AfterClass diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileLinkCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileLinkCleaner.java index b826b065d47..87d34dca868 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileLinkCleaner.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileLinkCleaner.java @@ -70,7 +70,7 @@ public class TestHFileLinkCleaner { @BeforeClass public static void setUp() { - POOL = new DirScanPool(TEST_UTIL.getConfiguration()); + POOL = DirScanPool.getHFileCleanerScanPool(TEST_UTIL.getConfiguration()); } @AfterClass diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java index a145a239f81..8df1b815415 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java @@ -91,7 +91,7 @@ public class TestLogsCleaner { public static void setUpBeforeClass() throws Exception { TEST_UTIL.startMiniZKCluster(); TEST_UTIL.startMiniDFSCluster(1); - POOL = new DirScanPool(TEST_UTIL.getConfiguration()); + POOL = DirScanPool.getLogCleanerScanPool(TEST_UTIL.getConfiguration()); } @AfterClass diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/region/MasterRegionTestBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/region/MasterRegionTestBase.java index 0ec0a27b9c5..11da1b03c11 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/region/MasterRegionTestBase.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/region/MasterRegionTestBase.java @@ -49,7 +49,9 @@ public class MasterRegionTestBase { protected ChoreService choreService; - protected DirScanPool cleanerPool; + protected DirScanPool hfileCleanerPool; + + protected DirScanPool logCleanerPool; protected static byte[] CF1 = Bytes.toBytes("f1"); @@ -90,7 +92,8 @@ public class MasterRegionTestBase { Configuration conf = htu.getConfiguration(); configure(conf); choreService = new ChoreService(getClass().getSimpleName()); - cleanerPool = new DirScanPool(htu.getConfiguration()); + hfileCleanerPool = DirScanPool.getHFileCleanerScanPool(htu.getConfiguration()); + logCleanerPool = DirScanPool.getLogCleanerScanPool(htu.getConfiguration()); Server server = mock(Server.class); when(server.getConfiguration()).thenReturn(conf); when(server.getServerName()) @@ -117,7 +120,8 @@ public class MasterRegionTestBase { @After public void tearDown() throws IOException { region.close(true); - cleanerPool.shutdownNow(); + hfileCleanerPool.shutdownNow(); + logCleanerPool.shutdownNow(); choreService.shutdown(); htu.cleanupTestDir(); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/region/TestMasterRegionCompaction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/region/TestMasterRegionCompaction.java index 18af54ffad7..4eefddff16b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/region/TestMasterRegionCompaction.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/region/TestMasterRegionCompaction.java @@ -77,7 +77,7 @@ public class TestMasterRegionCompaction extends MasterRegionTestBase { public boolean isStopped() { return stopped; } - }, conf, fs, globalArchivePath, cleanerPool); + }, conf, fs, globalArchivePath, hfileCleanerPool); choreService.scheduleChore(hfileCleaner); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/region/TestMasterRegionWALCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/region/TestMasterRegionWALCleaner.java index b19a9d626bf..6a596a6f138 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/region/TestMasterRegionWALCleaner.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/region/TestMasterRegionWALCleaner.java @@ -72,7 +72,7 @@ public class TestMasterRegionWALCleaner extends MasterRegionTestBase { public boolean isStopped() { return stopped; } - }, conf, fs, globalWALArchiveDir, cleanerPool, null); + }, conf, fs, globalWALArchiveDir, logCleanerPool, null); choreService.scheduleChore(logCleaner); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/snapshot/TestSnapshotManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/snapshot/TestSnapshotManager.java index af6a5b24f7c..feae6a87deb 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/snapshot/TestSnapshotManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/snapshot/TestSnapshotManager.java @@ -234,7 +234,7 @@ public class TestSnapshotManager { // Initialize cleaner HFileCleaner cleaner = new HFileCleaner(10000, Mockito.mock(Stoppable.class), conf, fs, - archiveDir, new DirScanPool(UTIL.getConfiguration())); + archiveDir, DirScanPool.getHFileCleanerScanPool(UTIL.getConfiguration())); // Link backref and HFile cannot be removed cleaner.choreForTesting(); assertTrue(fs.exists(linkBackRef));