diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index 0dc5aa3200b..f5bd0de0841 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -107,6 +107,7 @@ import org.apache.hadoop.hbase.master.balancer.BalancerChore;
 import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer;
 import org.apache.hadoop.hbase.master.balancer.ClusterStatusChore;
 import org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory;
+import org.apache.hadoop.hbase.master.cleaner.CleanerChore;
 import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
 import org.apache.hadoop.hbase.master.cleaner.LogCleaner;
 import org.apache.hadoop.hbase.master.cleaner.ReplicationBarrierCleaner;
@@ -1146,6 +1147,8 @@ public class HMaster extends HRegionServer implements MasterServices {
     this.executorService.startExecutorService(ExecutorType.MASTER_TABLE_OPERATIONS, 1);
     startProcedureExecutor();
 
+    // Initialize the cleaner chore pool
+    CleanerChore.initChorePool(conf);
     // Start log cleaner thread
     int cleanerInterval = conf.getInt("hbase.master.cleaner.interval", 600 * 1000);
     this.logCleaner =
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java
index 46f6217a90c..312bcce7e3c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java
@@ -25,6 +25,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.ForkJoinTask;
 import java.util.concurrent.RecursiveTask;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.hadoop.conf.Configuration;
@@ -41,6 +42,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
 import org.apache.hbase.thirdparty.com.google.common.base.Predicate;
 import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSet;
 import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
@@ -51,7 +53,7 @@ import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
  * @param <T> Cleaner delegate class that is dynamically loaded from configuration
  */
 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="ST_WRITE_TO_STATIC_FROM_INSTANCE_METHOD",
-    justification="TODO: Fix. It is wonky have static pool initialized from instance")
+    justification="Static pool will only be updated once.")
 @InterfaceAudience.Private
 public abstract class CleanerChore<T extends FileCleanerDelegate> extends ScheduledChore
     implements ConfigurationObserver {
@@ -68,19 +70,93 @@ public abstract class CleanerChore extends Schedu
   public static final String CHORE_POOL_SIZE = "hbase.cleaner.scan.dir.concurrent.size";
   private static final String DEFAULT_CHORE_POOL_SIZE = "0.25";
 
+  private static class DirScanPool {
+    int size;
+    ForkJoinPool pool;
+    int cleanerLatch;
+    AtomicBoolean reconfigNotification;
+
+    DirScanPool(Configuration conf) {
+      String poolSize = conf.get(CHORE_POOL_SIZE, DEFAULT_CHORE_POOL_SIZE);
+      size = calculatePoolSize(poolSize);
+      // poolSize may be 0 or 0.0 from a careless configuration,
+      // double check to make sure.
+      size = size == 0 ? calculatePoolSize(DEFAULT_CHORE_POOL_SIZE) : size;
+      pool = new ForkJoinPool(size);
+      LOG.info("Cleaner pool size is {}", size);
+      reconfigNotification = new AtomicBoolean(false);
+      cleanerLatch = 0;
+    }
+
+    /**
+     * Checks if pool can be updated immediately.
+     * @param conf configuration
+     * @return true if pool can be updated immediately, false otherwise
+     */
+    synchronized boolean canUpdateImmediately(Configuration conf) {
+      int newSize = calculatePoolSize(conf.get(CHORE_POOL_SIZE, DEFAULT_CHORE_POOL_SIZE));
+      if (newSize == size) {
+        LOG.trace("Size from configuration is same as previous={}, no need to update.", newSize);
+        return false;
+      }
+      size = newSize;
+      if (pool.getPoolSize() == 0) {
+        // chore has no working thread.
+        return true;
+      }
+      // Chore is working, update it later.
+      reconfigNotification.set(true);
+      return false;
+    }
+
+    /**
+     * Update pool with new size.
+     */
+    synchronized void updatePool(long timeout) {
+      while (cleanerLatch != 0) {
+        try {
+          wait(timeout);
+        } catch (InterruptedException ie) {
+          // It's ok to ignore
+        }
+        break;
+      }
+      pool.shutdownNow();
+      LOG.info("Update chore's pool size from {} to {}", pool.getParallelism(), size);
+      pool = new ForkJoinPool(size);
+    }
+
+    synchronized void latchCountUp() {
+      cleanerLatch++;
+    }
+
+    synchronized void latchCountDown() {
+      cleanerLatch--;
+      notifyAll();
+    }
+
+    @SuppressWarnings("FutureReturnValueIgnored")
+    synchronized void submit(ForkJoinTask<?> task) {
+      pool.submit(task);
+    }
+  }
   // It may be waste resources for each cleaner chore own its pool,
   // so let's make pool for all cleaner chores.
-  private static volatile ForkJoinPool CHOREPOOL;
-  private static volatile int CHOREPOOLSIZE;
+  private static volatile DirScanPool POOL;
 
   protected final FileSystem fs;
   private final Path oldFileDir;
   private final Configuration conf;
   protected final Map<String, Object> params;
   private final AtomicBoolean enabled = new AtomicBoolean(true);
-  private final AtomicBoolean reconfig = new AtomicBoolean(false);
   protected List<T> cleanersChain;
 
+  public static void initChorePool(Configuration conf) {
+    if (POOL == null) {
+      POOL = new DirScanPool(conf);
+    }
+  }
+
   public CleanerChore(String name, final int sleepPeriod, final Stoppable s, Configuration conf,
       FileSystem fs, Path oldFileDir, String confKey) {
     this(name, sleepPeriod, s, conf, fs, oldFileDir, confKey, null);
@@ -99,21 +175,14 @@ public abstract class CleanerChore extends Schedu
   public CleanerChore(String name, final int sleepPeriod, final Stoppable s, Configuration conf,
       FileSystem fs, Path oldFileDir, String confKey, Map<String, Object> params) {
     super(name, s, sleepPeriod);
+
+    Preconditions.checkNotNull(POOL, "Chore's pool isn't initialized, please call "
+      + "CleanerChore.initChorePool(Configuration) before creating a cleaner chore.");
     this.fs = fs;
     this.oldFileDir = oldFileDir;
     this.conf = conf;
     this.params = params;
     initCleanerChain(confKey);
-
-    if (CHOREPOOL == null) {
-      String poolSize = conf.get(CHORE_POOL_SIZE, DEFAULT_CHORE_POOL_SIZE);
-      CHOREPOOLSIZE = calculatePoolSize(poolSize);
-      // poolSize may be 0 or 0.0 from a careless configuration,
-      // double check to make sure.
-      CHOREPOOLSIZE = CHOREPOOLSIZE == 0? calculatePoolSize(DEFAULT_CHORE_POOL_SIZE): CHOREPOOLSIZE;
-      this.CHOREPOOL = new ForkJoinPool(CHOREPOOLSIZE);
-      LOG.info("Cleaner pool size is {}", CHOREPOOLSIZE);
-    }
   }
 
   /**
@@ -174,25 +243,10 @@ public abstract class CleanerChore extends Schedu
 
   @Override
   public void onConfigurationChange(Configuration conf) {
-    int updatedSize = calculatePoolSize(conf.get(CHORE_POOL_SIZE, DEFAULT_CHORE_POOL_SIZE));
-    if (updatedSize == CHOREPOOLSIZE) {
-      LOG.trace("Size from configuration is same as previous={}, no need to update.", updatedSize);
-      return;
+    if (POOL.canUpdateImmediately(conf)) {
+      // Can immediately update, no need to wait.
+      POOL.updatePool(0);
     }
-    CHOREPOOLSIZE = updatedSize;
-    if (CHOREPOOL.getPoolSize() == 0) {
-      // Chore does not work now, update it directly.
-      updateChorePoolSize(updatedSize);
-      return;
-    }
-    // Chore is working, update it after chore finished.
-    reconfig.set(true);
-  }
-
-  private void updateChorePoolSize(int updatedSize) {
-    CHOREPOOL.shutdownNow();
-    LOG.info("Update chore's pool size from {} to {}", CHOREPOOL.getParallelism(), updatedSize);
-    CHOREPOOL = new ForkJoinPool(updatedSize);
   }
 
   /**
@@ -221,14 +275,22 @@ public abstract class CleanerChore extends Schedu
   @Override
   protected void chore() {
     if (getEnabled()) {
-      if (runCleaner()) {
-        LOG.debug("Cleaned all WALs under {}", oldFileDir);
-      } else {
-        LOG.warn("WALs outstanding under {}", oldFileDir);
+      try {
+        POOL.latchCountUp();
+        if (runCleaner()) {
+          LOG.debug("Cleaned all WALs under {}", oldFileDir);
+        } else {
+          LOG.warn("WALs outstanding under {}", oldFileDir);
+        }
+      } finally {
+        POOL.latchCountDown();
       }
-      // After each clean chore, checks if receives reconfigure notification while cleaning
-      if (reconfig.compareAndSet(true, false)) {
-        updateChorePoolSize(CHOREPOOLSIZE);
+      // After each cleaner chore, check if a reconfigure notification arrived while cleaning.
+      // The first cleaner clears the notification so no other cleaner updates the pool again.
+      if (POOL.reconfigNotification.compareAndSet(true, false)) {
+        // This cleaner waits for the other cleaners to finish their jobs.
+        // To avoid missing the next chore, only wait 0.8 * period, then shut down the pool.
+        POOL.updatePool((long) (0.8 * getTimeUnit().toMillis(getPeriod())));
       }
     } else {
       LOG.debug("Cleaner chore disabled! Not cleaning.");
@@ -242,7 +304,7 @@ public abstract class CleanerChore extends Schedu
   public Boolean runCleaner() {
     preRunCleaner();
     CleanerTask task = new CleanerTask(this.oldFileDir, true);
-    CHOREPOOL.submit(task);
+    POOL.submit(task);
     return task.join();
   }
 
@@ -374,7 +436,7 @@ public abstract class CleanerChore extends Schedu
 
   @VisibleForTesting
   int getChorePoolSize() {
-    return CHOREPOOLSIZE;
+    return POOL.size;
   }
 
   /**
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/example/TestZooKeeperTableArchiveClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/example/TestZooKeeperTableArchiveClient.java
index f3e193e1f48..16f3930e3d4 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/example/TestZooKeeperTableArchiveClient.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/example/TestZooKeeperTableArchiveClient.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.master.cleaner.BaseHFileCleanerDelegate;
+import org.apache.hadoop.hbase.master.cleaner.CleanerChore;
 import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
 import org.apache.hadoop.hbase.regionserver.CompactedHFilesDischarger;
 import org.apache.hadoop.hbase.regionserver.HRegion;
@@ -175,6 +176,7 @@ public class TestZooKeeperTableArchiveClient {
     Configuration conf = UTIL.getConfiguration();
     // setup the delegate
     Stoppable stop = new StoppableImplementation();
+    CleanerChore.initChorePool(conf);
     HFileCleaner cleaner = setupAndCreateCleaner(conf, fs, archiveDir, stop);
     List<BaseHFileCleanerDelegate> cleaners = turnOnArchiving(STRING_TABLE_NAME, cleaner);
     final LongTermArchivingHFileCleaner delegate = (LongTermArchivingHFileCleaner) cleaners.get(0);
@@ -229,6 +231,7 @@ public class TestZooKeeperTableArchiveClient {
     // setup the delegate
     Stoppable stop = new StoppableImplementation();
     final ChoreService choreService = new ChoreService("TEST_SERVER_NAME");
+    CleanerChore.initChorePool(conf);
     HFileCleaner cleaner = setupAndCreateCleaner(conf, fs, archiveDir, stop);
     List<BaseHFileCleanerDelegate> cleaners = turnOnArchiving(STRING_TABLE_NAME, cleaner);
     final LongTermArchivingHFileCleaner delegate = (LongTermArchivingHFileCleaner) cleaners.get(0);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestCleanerChore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestCleanerChore.java
index 22fa292a216..9438319187f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestCleanerChore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestCleanerChore.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.StoppableImplementation;
 import org.junit.After;
+import org.junit.Before;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -55,6 +56,11 @@ public class TestCleanerChore {
   private static final Logger LOG = LoggerFactory.getLogger(TestCleanerChore.class);
   private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
 
+  @Before
+  public void setup() throws Exception {
+    CleanerChore.initChorePool(UTIL.getConfiguration());
+  }
+
   @After
   public void cleanup() throws Exception {
     // delete and recreate the test directory, ensuring a clean test dir between tests
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileCleaner.java
index 32480ea1bb0..465e1932a95 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileCleaner.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileCleaner.java
@@ -68,6 +68,7 @@ public class TestHFileCleaner {
   public static void setupCluster() throws Exception {
     // have to use a minidfs cluster because the localfs doesn't modify file times correctly
     UTIL.startMiniDFSCluster(1);
+    CleanerChore.initChorePool(UTIL.getConfiguration());
   }
 
   @AfterClass
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileLinkCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileLinkCleaner.java
index 667a33e893b..c011ea8da70 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileLinkCleaner.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileLinkCleaner.java
@@ -108,6 +108,7 @@ public class TestHFileLinkCleaner {
     final long ttl = 1000;
     conf.setLong(TimeToLiveHFileCleaner.TTL_CONF_KEY, ttl);
     Server server = new DummyServer();
+    CleanerChore.initChorePool(conf);
     HFileCleaner cleaner = new HFileCleaner(1000, server, conf, fs, archiveDir);
 
     // Link backref cannot be removed
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java
index 7423d269337..0263085aecf 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java
@@ -85,6 +85,7 @@ public class TestLogsCleaner {
   public static void setUpBeforeClass() throws Exception {
     TEST_UTIL.startMiniZKCluster();
     TEST_UTIL.startMiniDFSCluster(1);
+    CleanerChore.initChorePool(TEST_UTIL.getConfiguration());
   }
 
   @AfterClass
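
Not part of the patch: a minimal Java sketch of the call order this change introduces. CleanerChore.initChorePool(Configuration) must run before any cleaner chore is constructed (as the HMaster and test hunks above do), otherwise the Preconditions check added to the constructor throws. The CleanerChoreInitExample class, its createHFileCleaner helper, and the helper's parameters are illustrative assumptions, not code from the patch.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.master.cleaner.CleanerChore;
import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;

public class CleanerChoreInitExample {
  /**
   * Illustrative helper (not in the patch): shows the ordering required when wiring up a cleaner.
   */
  static HFileCleaner createHFileCleaner(Configuration conf, Server server, FileSystem fs,
      Path archiveDir) {
    // The first call builds the shared static DirScanPool; later calls are no-ops because
    // initChorePool only creates the pool while POOL is still null.
    CleanerChore.initChorePool(conf);
    // Safe now: the constructor's Preconditions.checkNotNull(POOL, ...) passes and this
    // cleaner shares the single ForkJoinPool with every other cleaner chore.
    return new HFileCleaner(1000, server, conf, fs, archiveDir);
  }
}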