HBASE-20095 Redesign single instance pool in CleanerChore

Reid Chan 2018-03-26 11:39:30 +08:00 committed by Mike Drob
parent 6a5c14b227
commit 83fa0ad9ed
7 changed files with 118 additions and 41 deletions

HMaster.java

@@ -107,6 +107,7 @@ import org.apache.hadoop.hbase.master.balancer.BalancerChore;
 import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer;
 import org.apache.hadoop.hbase.master.balancer.ClusterStatusChore;
 import org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory;
+import org.apache.hadoop.hbase.master.cleaner.CleanerChore;
 import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
 import org.apache.hadoop.hbase.master.cleaner.LogCleaner;
 import org.apache.hadoop.hbase.master.cleaner.ReplicationBarrierCleaner;
@@ -1146,6 +1147,8 @@ public class HMaster extends HRegionServer implements MasterServices {
     this.executorService.startExecutorService(ExecutorType.MASTER_TABLE_OPERATIONS, 1);
     startProcedureExecutor();
 
+    // Initial cleaner chore
+    CleanerChore.initChorePool(conf);
     // Start log cleaner thread
     int cleanerInterval = conf.getInt("hbase.master.cleaner.interval", 600 * 1000);
     this.logCleaner =

CleanerChore.java

@@ -25,6 +25,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.ForkJoinTask;
 import java.util.concurrent.RecursiveTask;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.hadoop.conf.Configuration;
@@ -41,6 +42,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
 import org.apache.hbase.thirdparty.com.google.common.base.Predicate;
 import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSet;
 import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
@@ -51,7 +53,7 @@ import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
  * @param <T> Cleaner delegate class that is dynamically loaded from configuration
  */
 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="ST_WRITE_TO_STATIC_FROM_INSTANCE_METHOD",
-    justification="TODO: Fix. It is wonky have static pool initialized from instance")
+    justification="Static pool will be only updated once.")
 @InterfaceAudience.Private
 public abstract class CleanerChore<T extends FileCleanerDelegate> extends ScheduledChore
     implements ConfigurationObserver {
@@ -68,19 +70,93 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends ScheduledChore
   public static final String CHORE_POOL_SIZE = "hbase.cleaner.scan.dir.concurrent.size";
   private static final String DEFAULT_CHORE_POOL_SIZE = "0.25";
 
+  private static class DirScanPool {
+    int size;
+    ForkJoinPool pool;
+    int cleanerLatch;
+    AtomicBoolean reconfigNotification;
+
+    DirScanPool(Configuration conf) {
+      String poolSize = conf.get(CHORE_POOL_SIZE, DEFAULT_CHORE_POOL_SIZE);
+      size = calculatePoolSize(poolSize);
+      // poolSize may be 0 or 0.0 from a careless configuration,
+      // double check to make sure.
+      size = size == 0 ? calculatePoolSize(DEFAULT_CHORE_POOL_SIZE) : size;
+      pool = new ForkJoinPool(size);
+      LOG.info("Cleaner pool size is {}", size);
+      reconfigNotification = new AtomicBoolean(false);
+      cleanerLatch = 0;
+    }
+
+    /**
+     * Checks if pool can be updated immediately.
+     * @param conf configuration
+     * @return true if pool can be updated immediately, false otherwise
+     */
+    synchronized boolean canUpdateImmediately(Configuration conf) {
+      int newSize = calculatePoolSize(conf.get(CHORE_POOL_SIZE, DEFAULT_CHORE_POOL_SIZE));
+      if (newSize == size) {
+        LOG.trace("Size from configuration is same as previous={}, no need to update.", newSize);
+        return false;
+      }
+      size = newSize;
+      if (pool.getPoolSize() == 0) {
+        // chore has no working thread.
+        return true;
+      }
+      // Chore is working, update it later.
+      reconfigNotification.set(true);
+      return false;
+    }
+
+    /**
+     * Update pool with new size.
+     */
+    synchronized void updatePool(long timeout) {
+      while (cleanerLatch != 0) {
+        try {
+          wait(timeout);
+        } catch (InterruptedException ie) {
+          // It's ok to ignore
+        }
+        break;
+      }
+      pool.shutdownNow();
+      LOG.info("Update chore's pool size from {} to {}", pool.getParallelism(), size);
+      pool = new ForkJoinPool(size);
+    }
+
+    synchronized void latchCountUp() {
+      cleanerLatch++;
+    }
+
+    synchronized void latchCountDown() {
+      cleanerLatch--;
+      notifyAll();
+    }
+
+    @SuppressWarnings("FutureReturnValueIgnored")
+    synchronized void submit(ForkJoinTask task) {
+      pool.submit(task);
+    }
+  }
+
   // It may be waste resources for each cleaner chore own its pool,
   // so let's make pool for all cleaner chores.
-  private static volatile ForkJoinPool CHOREPOOL;
-  private static volatile int CHOREPOOLSIZE;
+  private static volatile DirScanPool POOL;
 
   protected final FileSystem fs;
   private final Path oldFileDir;
   private final Configuration conf;
   protected final Map<String, Object> params;
   private final AtomicBoolean enabled = new AtomicBoolean(true);
-  private final AtomicBoolean reconfig = new AtomicBoolean(false);
   protected List<T> cleanersChain;
 
+  public static void initChorePool(Configuration conf) {
+    if (POOL == null) {
+      POOL = new DirScanPool(conf);
+    }
+  }
+
   public CleanerChore(String name, final int sleepPeriod, final Stoppable s, Configuration conf,
       FileSystem fs, Path oldFileDir, String confKey) {
     this(name, sleepPeriod, s, conf, fs, oldFileDir, confKey, null);
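
For context, calculatePoolSize is an existing helper in this class that DirScanPool reuses; it is not part of this diff. Roughly, an integer setting is an absolute thread count and a decimal such as the default "0.25" is a fraction of the machine's processors. A hedged sketch of that assumed contract (illustrative only, not the actual HBase implementation):

    // Sketch of the assumed calculatePoolSize contract:
    //   "4"    -> 4 threads, capped at available processors
    //   "0.25" -> 25% of available processors
    static int calculatePoolSizeSketch(String poolSize) {
      int procs = Runtime.getRuntime().availableProcessors();
      if (poolSize.matches("[1-9][0-9]*")) {
        // Absolute count; never more threads than processors.
        return Math.min(Integer.parseInt(poolSize), procs);
      }
      // Fractional form; a careless "0" or "0.0" yields 0 here,
      // which is why the DirScanPool constructor re-checks for 0 above.
      return (int) (procs * Double.parseDouble(poolSize));
    }
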
@@ -99,21 +175,14 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends ScheduledChore
   public CleanerChore(String name, final int sleepPeriod, final Stoppable s, Configuration conf,
       FileSystem fs, Path oldFileDir, String confKey, Map<String, Object> params) {
     super(name, s, sleepPeriod);
+    Preconditions.checkNotNull(POOL, "Chore's pool isn't initialized, please call"
+        + "CleanerChore.initChorePool(Configuration) before new a cleaner chore.");
     this.fs = fs;
     this.oldFileDir = oldFileDir;
     this.conf = conf;
     this.params = params;
     initCleanerChain(confKey);
-
-    if (CHOREPOOL == null) {
-      String poolSize = conf.get(CHORE_POOL_SIZE, DEFAULT_CHORE_POOL_SIZE);
-      CHOREPOOLSIZE = calculatePoolSize(poolSize);
-      // poolSize may be 0 or 0.0 from a careless configuration,
-      // double check to make sure.
-      CHOREPOOLSIZE = CHOREPOOLSIZE == 0? calculatePoolSize(DEFAULT_CHORE_POOL_SIZE): CHOREPOOLSIZE;
-      this.CHOREPOOL = new ForkJoinPool(CHOREPOOLSIZE);
-      LOG.info("Cleaner pool size is {}", CHOREPOOLSIZE);
-    }
   }
 
   /**
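
The Preconditions check makes pool initialization an explicit precondition of constructing any cleaner, replacing the old lazy, racy init. A minimal ordering sketch (server, fs and archiveDir are assumed fixtures; the HFileCleaner constructor call mirrors the one in TestHFileLinkCleaner below):

    Configuration conf = HBaseConfiguration.create();
    CleanerChore.initChorePool(conf);  // must happen before any cleaner is constructed
    // Without the line above, the constructor now fails fast with
    // "Chore's pool isn't initialized..." rather than lazily building a pool.
    HFileCleaner cleaner = new HFileCleaner(1000, server, conf, fs, archiveDir);
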
@@ -174,25 +243,10 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends ScheduledChore
   @Override
   public void onConfigurationChange(Configuration conf) {
-    int updatedSize = calculatePoolSize(conf.get(CHORE_POOL_SIZE, DEFAULT_CHORE_POOL_SIZE));
-    if (updatedSize == CHOREPOOLSIZE) {
-      LOG.trace("Size from configuration is same as previous={}, no need to update.", updatedSize);
-      return;
+    if (POOL.canUpdateImmediately(conf)) {
+      // Can immediately update, no need to wait.
+      POOL.updatePool(0);
     }
-    CHOREPOOLSIZE = updatedSize;
-    if (CHOREPOOL.getPoolSize() == 0) {
-      // Chore does not work now, update it directly.
-      updateChorePoolSize(updatedSize);
-      return;
-    }
-    // Chore is working, update it after chore finished.
-    reconfig.set(true);
-  }
-
-  private void updateChorePoolSize(int updatedSize) {
-    CHOREPOOL.shutdownNow();
-    LOG.info("Update chore's pool size from {} to {}", CHOREPOOL.getParallelism(), updatedSize);
-    CHOREPOOL = new ForkJoinPool(updatedSize);
   }
 
   /**
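
onConfigurationChange now delegates the whole resize decision to DirScanPool. A sketch of how a runtime resize plays out (setting the key directly here; in a running cluster the ConfigurationObserver callback delivers the updated Configuration for you):

    conf.set(CleanerChore.CHORE_POOL_SIZE, "4");
    cleaner.onConfigurationChange(conf);
    // Idle pool: canUpdateImmediately() returns true and updatePool(0) swaps it now.
    // Busy pool: reconfigNotification is set and the first chore to finish swaps it,
    // as shown in chore() below.
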
@@ -221,14 +275,22 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends ScheduledChore
   @Override
   protected void chore() {
     if (getEnabled()) {
-      if (runCleaner()) {
-        LOG.debug("Cleaned all WALs under {}", oldFileDir);
-      } else {
-        LOG.warn("WALs outstanding under {}", oldFileDir);
+      try {
+        POOL.latchCountUp();
+        if (runCleaner()) {
+          LOG.debug("Cleaned all WALs under {}", oldFileDir);
+        } else {
+          LOG.warn("WALs outstanding under {}", oldFileDir);
+        }
+      } finally {
+        POOL.latchCountDown();
       }
-      // After each clean chore, checks if receives reconfigure notification while cleaning
-      if (reconfig.compareAndSet(true, false)) {
-        updateChorePoolSize(CHOREPOOLSIZE);
+      // After each cleaner chore, checks if received reconfigure notification while cleaning.
+      // First in cleaner turns off notification, to avoid another cleaner updating pool again.
+      if (POOL.reconfigNotification.compareAndSet(true, false)) {
+        // This cleaner is waiting for other cleaners finishing their jobs.
+        // To avoid missing next chore, only wait 0.8 * period, then shutdown.
+        POOL.updatePool((long) (0.8 * getTimeUnit().toMillis(getPeriod())));
       }
     } else {
       LOG.debug("Cleaner chore disabled! Not cleaning.");
@@ -242,7 +304,7 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends ScheduledChore
   public Boolean runCleaner() {
     preRunCleaner();
     CleanerTask task = new CleanerTask(this.oldFileDir, true);
-    CHOREPOOL.submit(task);
+    POOL.submit(task);
     return task.join();
   }
@@ -374,7 +436,7 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends ScheduledChore
   @VisibleForTesting
   int getChorePoolSize() {
-    return CHOREPOOLSIZE;
+    return POOL.size;
   }
 
   /**

TestZooKeeperTableArchiveClient.java

@@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.master.cleaner.BaseHFileCleanerDelegate;
+import org.apache.hadoop.hbase.master.cleaner.CleanerChore;
 import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
 import org.apache.hadoop.hbase.regionserver.CompactedHFilesDischarger;
 import org.apache.hadoop.hbase.regionserver.HRegion;
@@ -175,6 +176,7 @@ public class TestZooKeeperTableArchiveClient {
     Configuration conf = UTIL.getConfiguration();
     // setup the delegate
     Stoppable stop = new StoppableImplementation();
+    CleanerChore.initChorePool(conf);
     HFileCleaner cleaner = setupAndCreateCleaner(conf, fs, archiveDir, stop);
     List<BaseHFileCleanerDelegate> cleaners = turnOnArchiving(STRING_TABLE_NAME, cleaner);
     final LongTermArchivingHFileCleaner delegate = (LongTermArchivingHFileCleaner) cleaners.get(0);
@@ -229,6 +231,7 @@ public class TestZooKeeperTableArchiveClient {
     // setup the delegate
     Stoppable stop = new StoppableImplementation();
     final ChoreService choreService = new ChoreService("TEST_SERVER_NAME");
+    CleanerChore.initChorePool(conf);
     HFileCleaner cleaner = setupAndCreateCleaner(conf, fs, archiveDir, stop);
     List<BaseHFileCleanerDelegate> cleaners = turnOnArchiving(STRING_TABLE_NAME, cleaner);
     final LongTermArchivingHFileCleaner delegate = (LongTermArchivingHFileCleaner) cleaners.get(0);

TestCleanerChore.java

@@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.StoppableImplementation;
 import org.junit.After;
+import org.junit.Before;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -55,6 +56,11 @@ public class TestCleanerChore {
   private static final Logger LOG = LoggerFactory.getLogger(TestCleanerChore.class);
   private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
 
+  @Before
+  public void setup() throws Exception {
+    CleanerChore.initChorePool(UTIL.getConfiguration());
+  }
+
   @After
   public void cleanup() throws Exception {
     // delete and recreate the test directory, ensuring a clean test dir between tests

TestHFileCleaner.java

@@ -68,6 +68,7 @@ public class TestHFileCleaner {
   public static void setupCluster() throws Exception {
     // have to use a minidfs cluster because the localfs doesn't modify file times correctly
     UTIL.startMiniDFSCluster(1);
+    CleanerChore.initChorePool(UTIL.getConfiguration());
   }
 
   @AfterClass

TestHFileLinkCleaner.java

@@ -108,6 +108,7 @@ public class TestHFileLinkCleaner {
     final long ttl = 1000;
     conf.setLong(TimeToLiveHFileCleaner.TTL_CONF_KEY, ttl);
     Server server = new DummyServer();
+    CleanerChore.initChorePool(conf);
     HFileCleaner cleaner = new HFileCleaner(1000, server, conf, fs, archiveDir);
 
     // Link backref cannot be removed

TestLogsCleaner.java

@@ -85,6 +85,7 @@ public class TestLogsCleaner {
   public static void setUpBeforeClass() throws Exception {
     TEST_UTIL.startMiniZKCluster();
     TEST_UTIL.startMiniDFSCluster(1);
+    CleanerChore.initChorePool(TEST_UTIL.getConfiguration());
   }
 
   @AfterClass