HBASE-26320 Implement a separate thread pool for the LogCleaner (#3712) (#4460)

This avoids starvation when the archive directory is large and takes a long time
to iterate through.

Signed-off-by: Duo Zhang <zhangduo@apache.org>
Signed-off-by: Anoop Sam John <anoopsamjohn@apache.org>
Signed-off-by: Pankaj <pankajkumar@apache.org>
This commit is contained in:
Xiaolin Ha 2022-05-24 14:26:09 +08:00 committed by GitHub
parent 4260e3afb7
commit 200cbcc34f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 137 additions and 35 deletions

View File

@ -359,7 +359,10 @@ public class HMaster extends HRegionServer implements MasterServices {
private HbckChore hbckChore; private HbckChore hbckChore;
CatalogJanitor catalogJanitorChore; CatalogJanitor catalogJanitorChore;
private DirScanPool cleanerPool; // Threadpool for scanning the archive directory, used by the HFileCleaner
private DirScanPool hfileCleanerPool;
// Threadpool for scanning the Old logs directory, used by the LogCleaner
private DirScanPool logCleanerPool;
private LogCleaner logCleaner; private LogCleaner logCleaner;
private HFileCleaner hfileCleaner; private HFileCleaner hfileCleaner;
private ReplicationBarrierCleaner replicationBarrierCleaner; private ReplicationBarrierCleaner replicationBarrierCleaner;
@ -1154,7 +1157,8 @@ public class HMaster extends HRegionServer implements MasterServices {
(EnvironmentEdgeManager.currentTime() - masterActiveTime) / 1000.0f)); (EnvironmentEdgeManager.currentTime() - masterActiveTime) / 1000.0f));
this.masterFinishedInitializationTime = EnvironmentEdgeManager.currentTime(); this.masterFinishedInitializationTime = EnvironmentEdgeManager.currentTime();
configurationManager.registerObserver(this.balancer); configurationManager.registerObserver(this.balancer);
configurationManager.registerObserver(this.cleanerPool); configurationManager.registerObserver(this.hfileCleanerPool);
configurationManager.registerObserver(this.logCleanerPool);
configurationManager.registerObserver(this.hfileCleaner); configurationManager.registerObserver(this.hfileCleaner);
configurationManager.registerObserver(this.logCleaner); configurationManager.registerObserver(this.logCleaner);
configurationManager.registerObserver(this.regionsRecoveryConfigManager); configurationManager.registerObserver(this.regionsRecoveryConfigManager);
@ -1518,8 +1522,8 @@ public class HMaster extends HRegionServer implements MasterServices {
.setExecutorType(ExecutorType.MASTER_TABLE_OPERATIONS).setCorePoolSize(1)); .setExecutorType(ExecutorType.MASTER_TABLE_OPERATIONS).setCorePoolSize(1));
startProcedureExecutor(); startProcedureExecutor();
// Create cleaner thread pool // Create log cleaner thread pool
cleanerPool = new DirScanPool(conf); logCleanerPool = DirScanPool.getLogCleanerScanPool(conf);
Map<String, Object> params = new HashMap<>(); Map<String, Object> params = new HashMap<>();
params.put(MASTER, this); params.put(MASTER, this);
// Start log cleaner thread // Start log cleaner thread
@ -1527,13 +1531,15 @@ public class HMaster extends HRegionServer implements MasterServices {
conf.getInt(HBASE_MASTER_CLEANER_INTERVAL, DEFAULT_HBASE_MASTER_CLEANER_INTERVAL); conf.getInt(HBASE_MASTER_CLEANER_INTERVAL, DEFAULT_HBASE_MASTER_CLEANER_INTERVAL);
this.logCleaner = this.logCleaner =
new LogCleaner(cleanerInterval, this, conf, getMasterWalManager().getFileSystem(), new LogCleaner(cleanerInterval, this, conf, getMasterWalManager().getFileSystem(),
getMasterWalManager().getOldLogDir(), cleanerPool, params); getMasterWalManager().getOldLogDir(), logCleanerPool, params);
getChoreService().scheduleChore(logCleaner); getChoreService().scheduleChore(logCleaner);
// start the hfile archive cleaner thread // start the hfile archive cleaner thread
Path archiveDir = HFileArchiveUtil.getArchivePath(conf); Path archiveDir = HFileArchiveUtil.getArchivePath(conf);
// Create archive cleaner thread pool
hfileCleanerPool = DirScanPool.getHFileCleanerScanPool(conf);
this.hfileCleaner = new HFileCleaner(cleanerInterval, this, conf, this.hfileCleaner = new HFileCleaner(cleanerInterval, this, conf,
getMasterFileSystem().getFileSystem(), archiveDir, cleanerPool, params); getMasterFileSystem().getFileSystem(), archiveDir, hfileCleanerPool, params);
getChoreService().scheduleChore(hfileCleaner); getChoreService().scheduleChore(hfileCleaner);
// Regions Reopen based on very high storeFileRefCount is considered enabled // Regions Reopen based on very high storeFileRefCount is considered enabled
@ -1586,9 +1592,13 @@ public class HMaster extends HRegionServer implements MasterServices {
this.mobCompactThread.close(); this.mobCompactThread.close();
} }
super.stopServiceThreads(); super.stopServiceThreads();
if (cleanerPool != null) { if (hfileCleanerPool != null) {
cleanerPool.shutdownNow(); hfileCleanerPool.shutdownNow();
cleanerPool = null; hfileCleanerPool = null;
}
if (logCleanerPool != null) {
logCleanerPool.shutdownNow();
logCleanerPool = null;
} }
LOG.debug("Stopping service threads"); LOG.debug("Stopping service threads");

View File

@ -57,12 +57,19 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends Schedu
private static final int AVAIL_PROCESSORS = Runtime.getRuntime().availableProcessors(); private static final int AVAIL_PROCESSORS = Runtime.getRuntime().availableProcessors();
/** /**
* If it is an integer and >= 1, it would be the size; if 0.0 < size <= 1.0, size would be * Configures the threadpool used for scanning the archive directory for the HFileCleaner If it is
* available processors * size. Pay attention that 1.0 is different from 1, former indicates it * an integer and >= 1, it would be the size; if 0.0 < size <= 1.0, size would be available
* will use 100% of cores, while latter will use only 1 thread for chore to scan dir. * processors * size. Pay attention that 1.0 is different from 1, former indicates it will use
* 100% of cores, while latter will use only 1 thread for chore to scan dir.
*/ */
public static final String CHORE_POOL_SIZE = "hbase.cleaner.scan.dir.concurrent.size"; public static final String CHORE_POOL_SIZE = "hbase.cleaner.scan.dir.concurrent.size";
static final String DEFAULT_CHORE_POOL_SIZE = "0.25"; static final String DEFAULT_CHORE_POOL_SIZE = "0.25";
/**
* Configures the threadpool used for scanning the Old logs directory for the LogCleaner Follows
* the same configuration mechanism as CHORE_POOL_SIZE, but has a default of 1 thread.
*/
public static final String LOG_CLEANER_CHORE_SIZE = "hbase.log.cleaner.scan.dir.concurrent.size";
static final String DEFAULT_LOG_CLEANER_CHORE_POOL_SIZE = "1";
private final DirScanPool pool; private final DirScanPool pool;

View File

@ -39,21 +39,42 @@ public class DirScanPool implements ConfigurationObserver {
private final ThreadPoolExecutor pool; private final ThreadPoolExecutor pool;
private int cleanerLatch; private int cleanerLatch;
private boolean reconfigNotification; private boolean reconfigNotification;
private Type dirScanPoolType;
private final String name;
public DirScanPool(Configuration conf) { private enum Type {
String poolSize = conf.get(CleanerChore.CHORE_POOL_SIZE, CleanerChore.DEFAULT_CHORE_POOL_SIZE); LOG_CLEANER(CleanerChore.LOG_CLEANER_CHORE_SIZE,
CleanerChore.DEFAULT_LOG_CLEANER_CHORE_POOL_SIZE),
HFILE_CLEANER(CleanerChore.CHORE_POOL_SIZE, CleanerChore.DEFAULT_CHORE_POOL_SIZE);
private final String cleanerPoolSizeConfigName;
private final String cleanerPoolSizeConfigDefault;
private Type(String cleanerPoolSizeConfigName, String cleanerPoolSizeConfigDefault) {
this.cleanerPoolSizeConfigName = cleanerPoolSizeConfigName;
this.cleanerPoolSizeConfigDefault = cleanerPoolSizeConfigDefault;
}
}
private DirScanPool(Configuration conf, Type dirScanPoolType) {
this.dirScanPoolType = dirScanPoolType;
this.name = dirScanPoolType.name().toLowerCase();
String poolSize = conf.get(dirScanPoolType.cleanerPoolSizeConfigName,
dirScanPoolType.cleanerPoolSizeConfigDefault);
size = CleanerChore.calculatePoolSize(poolSize); size = CleanerChore.calculatePoolSize(poolSize);
// poolSize may be 0 or 0.0 from a careless configuration, // poolSize may be 0 or 0.0 from a careless configuration,
// double check to make sure. // double check to make sure.
size = size == 0 ? CleanerChore.calculatePoolSize(CleanerChore.DEFAULT_CHORE_POOL_SIZE) : size; size = size == 0
pool = initializePool(size); ? CleanerChore.calculatePoolSize(dirScanPoolType.cleanerPoolSizeConfigDefault)
LOG.info("Cleaner pool size is {}", size); : size;
pool = initializePool(size, name);
LOG.info("{} Cleaner pool size is {}", name, size);
cleanerLatch = 0; cleanerLatch = 0;
} }
private static ThreadPoolExecutor initializePool(int size) { private static ThreadPoolExecutor initializePool(int size, String name) {
return Threads.getBoundedCachedThreadPool(size, 1, TimeUnit.MINUTES, return Threads.getBoundedCachedThreadPool(size, 1, TimeUnit.MINUTES,
new ThreadFactoryBuilder().setNameFormat("dir-scan-pool-%d").setDaemon(true) new ThreadFactoryBuilder().setNameFormat(name + "-dir-scan-pool-%d").setDaemon(true)
.setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build()); .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
} }
@ -63,10 +84,11 @@ public class DirScanPool implements ConfigurationObserver {
*/ */
@Override @Override
public synchronized void onConfigurationChange(Configuration conf) { public synchronized void onConfigurationChange(Configuration conf) {
int newSize = CleanerChore.calculatePoolSize( int newSize = CleanerChore.calculatePoolSize(conf.get(dirScanPoolType.cleanerPoolSizeConfigName,
conf.get(CleanerChore.CHORE_POOL_SIZE, CleanerChore.DEFAULT_CHORE_POOL_SIZE)); dirScanPoolType.cleanerPoolSizeConfigDefault));
if (newSize == size) { if (newSize == size) {
LOG.trace("Size from configuration is same as previous={}, no need to update.", newSize); LOG.trace("{} Cleaner Size from configuration is same as previous={}, no need to update.",
name, newSize);
return; return;
} }
size = newSize; size = newSize;
@ -109,11 +131,19 @@ public class DirScanPool implements ConfigurationObserver {
break; break;
} }
} }
LOG.info("Update chore's pool size from {} to {}", pool.getPoolSize(), size); LOG.info("Update {} chore's pool size from {} to {}", name, pool.getPoolSize(), size);
pool.setCorePoolSize(size); pool.setCorePoolSize(size);
} }
public int getSize() { public int getSize() {
return size; return size;
} }
public static DirScanPool getHFileCleanerScanPool(Configuration conf) {
return new DirScanPool(conf, Type.HFILE_CLEANER);
}
public static DirScanPool getLogCleanerScanPool(Configuration conf) {
return new DirScanPool(conf, Type.LOG_CLEANER);
}
} }

View File

@ -109,7 +109,7 @@ public class TestHFileArchiving {
// We don't want the cleaner to remove files. The tests do that. // We don't want the cleaner to remove files. The tests do that.
UTIL.getMiniHBaseCluster().getMaster().getHFileCleaner().cancel(true); UTIL.getMiniHBaseCluster().getMaster().getHFileCleaner().cancel(true);
POOL = new DirScanPool(UTIL.getConfiguration()); POOL = DirScanPool.getHFileCleanerScanPool(UTIL.getConfiguration());
} }
private static void setupConf(Configuration conf) { private static void setupConf(Configuration conf) {

View File

@ -104,7 +104,7 @@ public class TestZooKeeperTableArchiveClient {
String archivingZNode = ZKTableArchiveClient.getArchiveZNode(UTIL.getConfiguration(), watcher); String archivingZNode = ZKTableArchiveClient.getArchiveZNode(UTIL.getConfiguration(), watcher);
ZKUtil.createWithParents(watcher, archivingZNode); ZKUtil.createWithParents(watcher, archivingZNode);
rss = mock(RegionServerServices.class); rss = mock(RegionServerServices.class);
POOL = new DirScanPool(UTIL.getConfiguration()); POOL = DirScanPool.getHFileCleanerScanPool(UTIL.getConfiguration());
} }
private static void setupConf(Configuration conf) { private static void setupConf(Configuration conf) {

View File

@ -62,7 +62,7 @@ public class TestCleanerChore {
@BeforeClass @BeforeClass
public static void setup() { public static void setup() {
POOL = new DirScanPool(UTIL.getConfiguration()); POOL = DirScanPool.getHFileCleanerScanPool(UTIL.getConfiguration());
} }
@AfterClass @AfterClass
@ -470,6 +470,57 @@ public class TestCleanerChore {
t.join(); t.join();
} }
@Test
public void testOnConfigurationChangeLogCleaner() throws Exception {
int availableProcessorNum = Runtime.getRuntime().availableProcessors();
if (availableProcessorNum == 1) { // no need to run this test
return;
}
DirScanPool pool = DirScanPool.getLogCleanerScanPool(UTIL.getConfiguration());
// have at least 2 available processors/cores
int initPoolSize = availableProcessorNum / 2;
int changedPoolSize = availableProcessorNum;
Stoppable stop = new StoppableImplementation();
Configuration conf = UTIL.getConfiguration();
Path testDir = UTIL.getDataTestDir();
FileSystem fs = UTIL.getTestFileSystem();
String confKey = "hbase.test.cleaner.delegates";
conf.set(confKey, AlwaysDelete.class.getName());
conf.set(CleanerChore.LOG_CLEANER_CHORE_SIZE, String.valueOf(initPoolSize));
final AllValidPaths chore =
new AllValidPaths("test-file-cleaner", stop, conf, fs, testDir, confKey, pool);
chore.setEnabled(true);
// Create subdirs under testDir
int dirNums = 6;
Path[] subdirs = new Path[dirNums];
for (int i = 0; i < dirNums; i++) {
subdirs[i] = new Path(testDir, "subdir-" + i);
fs.mkdirs(subdirs[i]);
}
// Under each subdirs create 6 files
for (Path subdir : subdirs) {
createFiles(fs, subdir, 6);
}
// Start chore
Thread t = new Thread(new Runnable() {
@Override
public void run() {
chore.chore();
}
});
t.setDaemon(true);
t.start();
// Change size of chore's pool
conf.set(CleanerChore.LOG_CLEANER_CHORE_SIZE, String.valueOf(changedPoolSize));
pool.onConfigurationChange(conf);
assertEquals(changedPoolSize, chore.getChorePoolSize());
// Stop chore
t.join();
}
@Test @Test
public void testMinimumNumberOfThreads() throws Exception { public void testMinimumNumberOfThreads() throws Exception {
Configuration conf = UTIL.getConfiguration(); Configuration conf = UTIL.getConfiguration();

View File

@ -75,7 +75,7 @@ public class TestHFileCleaner {
public static void setupCluster() throws Exception { public static void setupCluster() throws Exception {
// have to use a minidfs cluster because the localfs doesn't modify file times correctly // have to use a minidfs cluster because the localfs doesn't modify file times correctly
UTIL.startMiniDFSCluster(1); UTIL.startMiniDFSCluster(1);
POOL = new DirScanPool(UTIL.getConfiguration()); POOL = DirScanPool.getHFileCleanerScanPool(UTIL.getConfiguration());
} }
@AfterClass @AfterClass

View File

@ -70,7 +70,7 @@ public class TestHFileLinkCleaner {
@BeforeClass @BeforeClass
public static void setUp() { public static void setUp() {
POOL = new DirScanPool(TEST_UTIL.getConfiguration()); POOL = DirScanPool.getHFileCleanerScanPool(TEST_UTIL.getConfiguration());
} }
@AfterClass @AfterClass

View File

@ -91,7 +91,7 @@ public class TestLogsCleaner {
public static void setUpBeforeClass() throws Exception { public static void setUpBeforeClass() throws Exception {
TEST_UTIL.startMiniZKCluster(); TEST_UTIL.startMiniZKCluster();
TEST_UTIL.startMiniDFSCluster(1); TEST_UTIL.startMiniDFSCluster(1);
POOL = new DirScanPool(TEST_UTIL.getConfiguration()); POOL = DirScanPool.getLogCleanerScanPool(TEST_UTIL.getConfiguration());
} }
@AfterClass @AfterClass

View File

@ -49,7 +49,9 @@ public class MasterRegionTestBase {
protected ChoreService choreService; protected ChoreService choreService;
protected DirScanPool cleanerPool; protected DirScanPool hfileCleanerPool;
protected DirScanPool logCleanerPool;
protected static byte[] CF1 = Bytes.toBytes("f1"); protected static byte[] CF1 = Bytes.toBytes("f1");
@ -90,7 +92,8 @@ public class MasterRegionTestBase {
Configuration conf = htu.getConfiguration(); Configuration conf = htu.getConfiguration();
configure(conf); configure(conf);
choreService = new ChoreService(getClass().getSimpleName()); choreService = new ChoreService(getClass().getSimpleName());
cleanerPool = new DirScanPool(htu.getConfiguration()); hfileCleanerPool = DirScanPool.getHFileCleanerScanPool(htu.getConfiguration());
logCleanerPool = DirScanPool.getLogCleanerScanPool(htu.getConfiguration());
Server server = mock(Server.class); Server server = mock(Server.class);
when(server.getConfiguration()).thenReturn(conf); when(server.getConfiguration()).thenReturn(conf);
when(server.getServerName()) when(server.getServerName())
@ -117,7 +120,8 @@ public class MasterRegionTestBase {
@After @After
public void tearDown() throws IOException { public void tearDown() throws IOException {
region.close(true); region.close(true);
cleanerPool.shutdownNow(); hfileCleanerPool.shutdownNow();
logCleanerPool.shutdownNow();
choreService.shutdown(); choreService.shutdown();
htu.cleanupTestDir(); htu.cleanupTestDir();
} }

View File

@ -77,7 +77,7 @@ public class TestMasterRegionCompaction extends MasterRegionTestBase {
public boolean isStopped() { public boolean isStopped() {
return stopped; return stopped;
} }
}, conf, fs, globalArchivePath, cleanerPool); }, conf, fs, globalArchivePath, hfileCleanerPool);
choreService.scheduleChore(hfileCleaner); choreService.scheduleChore(hfileCleaner);
} }

View File

@ -72,7 +72,7 @@ public class TestMasterRegionWALCleaner extends MasterRegionTestBase {
public boolean isStopped() { public boolean isStopped() {
return stopped; return stopped;
} }
}, conf, fs, globalWALArchiveDir, cleanerPool, null); }, conf, fs, globalWALArchiveDir, logCleanerPool, null);
choreService.scheduleChore(logCleaner); choreService.scheduleChore(logCleaner);
} }

View File

@ -234,7 +234,7 @@ public class TestSnapshotManager {
// Initialize cleaner // Initialize cleaner
HFileCleaner cleaner = new HFileCleaner(10000, Mockito.mock(Stoppable.class), conf, fs, HFileCleaner cleaner = new HFileCleaner(10000, Mockito.mock(Stoppable.class), conf, fs,
archiveDir, new DirScanPool(UTIL.getConfiguration())); archiveDir, DirScanPool.getHFileCleanerScanPool(UTIL.getConfiguration()));
// Link backref and HFile cannot be removed // Link backref and HFile cannot be removed
cleaner.choreForTesting(); cleaner.choreForTesting();
assertTrue(fs.exists(linkBackRef)); assertTrue(fs.exists(linkBackRef));