diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java index 140009b65f0..13f048e8766 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java @@ -671,7 +671,7 @@ public class CacheConfig { // Bucket cache logs its stats on creation internal to the constructor. bucketCache = new BucketCache(bucketCacheIOEngineName, bucketCacheSize, blockSize, bucketSizes, writerThreads, writerQueueLen, persistentPath, - ioErrorsTolerationDuration); + ioErrorsTolerationDuration, c); } catch (IOException ioex) { LOG.error("Can't instantiate bucket cache", ioex); throw new RuntimeException(ioex); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java index 10843997184..79b1f4d7cb6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java @@ -52,8 +52,11 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantReadWriteLock; +import com.google.common.base.Preconditions; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.io.HeapSize; import org.apache.hadoop.hbase.io.hfile.BlockCache; @@ -100,14 +103,23 @@ import org.apache.hadoop.hbase.shaded.com.google.common.util.concurrent.ThreadFa public class BucketCache implements BlockCache, HeapSize { private static final Log LOG = LogFactory.getLog(BucketCache.class); - /** Priority buckets */ - private static final float DEFAULT_SINGLE_FACTOR = 0.25f; - private static final float DEFAULT_MULTI_FACTOR = 0.50f; - private static final float DEFAULT_MEMORY_FACTOR = 0.25f; - private static final float DEFAULT_EXTRA_FREE_FACTOR = 0.10f; + /** Priority buckets config */ + static final String SINGLE_FACTOR_CONFIG_NAME = "hbase.bucketcache.single.factor"; + static final String MULTI_FACTOR_CONFIG_NAME = "hbase.bucketcache.multi.factor"; + static final String MEMORY_FACTOR_CONFIG_NAME = "hbase.bucketcache.memory.factor"; + static final String EXTRA_FREE_FACTOR_CONFIG_NAME = "hbase.bucketcache.extrafreefactor"; + static final String ACCEPT_FACTOR_CONFIG_NAME = "hbase.bucketcache.acceptfactor"; + static final String MIN_FACTOR_CONFIG_NAME = "hbase.bucketcache.minfactor"; + /** Priority buckets */ + @VisibleForTesting + static final float DEFAULT_SINGLE_FACTOR = 0.25f; + static final float DEFAULT_MULTI_FACTOR = 0.50f; + static final float DEFAULT_MEMORY_FACTOR = 0.25f; + static final float DEFAULT_MIN_FACTOR = 0.85f; + + private static final float DEFAULT_EXTRA_FREE_FACTOR = 0.10f; private static final float DEFAULT_ACCEPT_FACTOR = 0.95f; - private static final float DEFAULT_MIN_FACTOR = 0.85f; // Number of blocks to clear for each of the bucket size that is full private static final int DEFAULT_FREE_ENTIRE_BLOCK_FACTOR = 2; @@ -217,15 +229,34 @@ public class BucketCache implements BlockCache, HeapSize { // Allocate or free space for the block private BucketAllocator bucketAllocator; + /** Acceptable size of cache (no evictions if size < acceptable) */ + private float acceptableFactor; + + /** Minimum threshold of cache (when evicting, evict until size < min) */ + private float minFactor; + + /** Free this floating point factor of extra blocks when evicting. For example free the number of blocks requested * (1 + extraFreeFactor) */ + private float extraFreeFactor; + + /** Single access bucket size */ + private float singleFactor; + + /** Multiple access bucket size */ + private float multiFactor; + + /** In-memory bucket size */ + private float memoryFactor; + public BucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes, int writerThreadNum, int writerQLen, String persistencePath) throws FileNotFoundException, IOException { this(ioEngineName, capacity, blockSize, bucketSizes, writerThreadNum, writerQLen, - persistencePath, DEFAULT_ERROR_TOLERATION_DURATION); + persistencePath, DEFAULT_ERROR_TOLERATION_DURATION, HBaseConfiguration.create()); } public BucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes, - int writerThreadNum, int writerQLen, String persistencePath, int ioErrorsTolerationDuration) + int writerThreadNum, int writerQLen, String persistencePath, int ioErrorsTolerationDuration, + Configuration conf) throws FileNotFoundException, IOException { this.ioEngine = getIOEngineFromName(ioEngineName, capacity, persistencePath); this.writerThreads = new WriterThread[writerThreadNum]; @@ -235,6 +266,19 @@ public class BucketCache implements BlockCache, HeapSize { throw new IllegalArgumentException("Cache capacity is too large, only support 32TB now"); } + this.acceptableFactor = conf.getFloat(ACCEPT_FACTOR_CONFIG_NAME, DEFAULT_ACCEPT_FACTOR); + this.minFactor = conf.getFloat(MIN_FACTOR_CONFIG_NAME, DEFAULT_MIN_FACTOR); + this.extraFreeFactor = conf.getFloat(EXTRA_FREE_FACTOR_CONFIG_NAME, DEFAULT_EXTRA_FREE_FACTOR); + this.singleFactor = conf.getFloat(SINGLE_FACTOR_CONFIG_NAME, DEFAULT_SINGLE_FACTOR); + this.multiFactor = conf.getFloat(MULTI_FACTOR_CONFIG_NAME, DEFAULT_MULTI_FACTOR); + this.memoryFactor = conf.getFloat(MEMORY_FACTOR_CONFIG_NAME, DEFAULT_MEMORY_FACTOR); + + sanityCheckConfigs(); + + LOG.info("Instantiating BucketCache with acceptableFactor: " + acceptableFactor + ", minFactor: " + minFactor + + ", extraFreeFactor: " + extraFreeFactor + ", singleFactor: " + singleFactor + ", multiFactor: " + multiFactor + + ", memoryFactor: " + memoryFactor); + this.cacheCapacity = capacity; this.persistencePath = persistencePath; this.blockSize = blockSize; @@ -281,6 +325,18 @@ public class BucketCache implements BlockCache, HeapSize { persistencePath + ", bucketAllocator=" + this.bucketAllocator.getClass().getName()); } + private void sanityCheckConfigs() { + Preconditions.checkArgument(acceptableFactor <= 1 && acceptableFactor >= 0, ACCEPT_FACTOR_CONFIG_NAME + " must be between 0.0 and 1.0"); + Preconditions.checkArgument(minFactor <= 1 && minFactor >= 0, MIN_FACTOR_CONFIG_NAME + " must be between 0.0 and 1.0"); + Preconditions.checkArgument(minFactor <= acceptableFactor, MIN_FACTOR_CONFIG_NAME + " must be <= " + ACCEPT_FACTOR_CONFIG_NAME); + Preconditions.checkArgument(extraFreeFactor >= 0, EXTRA_FREE_FACTOR_CONFIG_NAME + " must be greater than 0.0"); + Preconditions.checkArgument(singleFactor <= 1 && singleFactor >= 0, SINGLE_FACTOR_CONFIG_NAME + " must be between 0.0 and 1.0"); + Preconditions.checkArgument(multiFactor <= 1 && multiFactor >= 0, MULTI_FACTOR_CONFIG_NAME + " must be between 0.0 and 1.0"); + Preconditions.checkArgument(memoryFactor <= 1 && memoryFactor >= 0, MEMORY_FACTOR_CONFIG_NAME + " must be between 0.0 and 1.0"); + Preconditions.checkArgument((singleFactor + multiFactor + memoryFactor) == 1, SINGLE_FACTOR_CONFIG_NAME + ", " + + MULTI_FACTOR_CONFIG_NAME + ", and " + MEMORY_FACTOR_CONFIG_NAME + " segments must add up to 1.0"); + } + /** * Called by the constructor to start the writer threads. Used by tests that need to override * starting the threads. @@ -623,26 +679,16 @@ public class BucketCache implements BlockCache, HeapSize { } private long acceptableSize() { - return (long) Math.floor(bucketAllocator.getTotalSize() * DEFAULT_ACCEPT_FACTOR); + return (long) Math.floor(bucketAllocator.getTotalSize() * acceptableFactor); } - private long singleSize() { - return (long) Math.floor(bucketAllocator.getTotalSize() - * DEFAULT_SINGLE_FACTOR * DEFAULT_MIN_FACTOR); - } - - private long multiSize() { - return (long) Math.floor(bucketAllocator.getTotalSize() * DEFAULT_MULTI_FACTOR - * DEFAULT_MIN_FACTOR); - } - - private long memorySize() { - return (long) Math.floor(bucketAllocator.getTotalSize() * DEFAULT_MEMORY_FACTOR - * DEFAULT_MIN_FACTOR); + @VisibleForTesting + long getPartitionSize(float partitionFactor) { + return (long) Math.floor(bucketAllocator.getTotalSize() * partitionFactor * minFactor); } /** - * Return the count of bucketSizeinfos still needf ree space + * Return the count of bucketSizeinfos still need free space */ private int bucketSizesAboveThresholdCount(float minFactor) { BucketAllocator.IndexStatistics[] stats = bucketAllocator.getIndexStatistics(); @@ -708,7 +754,7 @@ public class BucketCache implements BlockCache, HeapSize { long[] bytesToFreeForBucket = new long[stats.length]; for (int i = 0; i < stats.length; i++) { bytesToFreeForBucket[i] = 0; - long freeGoal = (long) Math.floor(stats[i].totalCount() * (1 - DEFAULT_MIN_FACTOR)); + long freeGoal = (long) Math.floor(stats[i].totalCount() * (1 - minFactor)); freeGoal = Math.max(freeGoal, 1); if (stats[i].freeCount() < freeGoal) { bytesToFreeForBucket[i] = stats[i].itemSize() * (freeGoal - stats[i].freeCount()); @@ -735,15 +781,15 @@ public class BucketCache implements BlockCache, HeapSize { } long bytesToFreeWithExtra = (long) Math.floor(bytesToFreeWithoutExtra - * (1 + DEFAULT_EXTRA_FREE_FACTOR)); + * (1 + extraFreeFactor)); // Instantiate priority buckets BucketEntryGroup bucketSingle = new BucketEntryGroup(bytesToFreeWithExtra, - blockSize, singleSize()); + blockSize, getPartitionSize(singleFactor)); BucketEntryGroup bucketMulti = new BucketEntryGroup(bytesToFreeWithExtra, - blockSize, multiSize()); + blockSize, getPartitionSize(multiFactor)); BucketEntryGroup bucketMemory = new BucketEntryGroup(bytesToFreeWithExtra, - blockSize, memorySize()); + blockSize, getPartitionSize(memoryFactor)); // Scan entire map putting bucket entry into appropriate bucket entry // group @@ -785,7 +831,7 @@ public class BucketCache implements BlockCache, HeapSize { } // Check and free if there are buckets that still need freeing of space - if (bucketSizesAboveThresholdCount(DEFAULT_MIN_FACTOR) > 0) { + if (bucketSizesAboveThresholdCount(minFactor) > 0) { bucketQueue.clear(); remainingBuckets = 3; @@ -1532,4 +1578,28 @@ public class BucketCache implements BlockCache, HeapSize { } return 0; } + + float getAcceptableFactor() { + return acceptableFactor; + } + + float getMinFactor() { + return minFactor; + } + + float getExtraFreeFactor() { + return extraFreeFactor; + } + + float getSingleFactor() { + return singleFactor; + } + + float getMultiFactor() { + return multiFactor; + } + + float getMemoryFactor() { + return memoryFactor; + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java index b0db13a49db..8cd665e3bf3 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java @@ -19,17 +19,25 @@ package org.apache.hadoop.hbase.io.hfile.bucket; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import java.io.FileNotFoundException; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.List; +import java.util.Map; import java.util.Random; +import java.util.Set; import java.util.concurrent.locks.ReentrantReadWriteLock; +import com.google.common.collect.ImmutableMap; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.io.hfile.BlockCacheKey; import org.apache.hadoop.hbase.io.hfile.CacheTestUtils; @@ -272,8 +280,112 @@ public class TestBucketCache { @Test public void testBucketAllocatorLargeBuckets() throws BucketAllocatorException { long availableSpace = 20 * 1024L * 1024 * 1024; - int[] bucketSizes = new int[] { 1024, 1024 * 1024, 1024 * 1024 * 1024 }; + int[] bucketSizes = new int[]{1024, 1024 * 1024, 1024 * 1024 * 1024}; BucketAllocator allocator = new BucketAllocator(availableSpace, bucketSizes); assertTrue(allocator.getBuckets().length > 0); } + + @Test + public void testGetPartitionSize() throws IOException { + //Test default values + validateGetPartitionSize(cache, BucketCache.DEFAULT_SINGLE_FACTOR, BucketCache.DEFAULT_MIN_FACTOR); + + Configuration conf = HBaseConfiguration.create(); + conf.setFloat(BucketCache.MIN_FACTOR_CONFIG_NAME, 0.5f); + conf.setFloat(BucketCache.SINGLE_FACTOR_CONFIG_NAME, 0.1f); + conf.setFloat(BucketCache.MULTI_FACTOR_CONFIG_NAME, 0.7f); + conf.setFloat(BucketCache.MEMORY_FACTOR_CONFIG_NAME, 0.2f); + + BucketCache cache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, + constructedBlockSizes, writeThreads, writerQLen, persistencePath, 100, conf); + + validateGetPartitionSize(cache, 0.1f, 0.5f); + validateGetPartitionSize(cache, 0.7f, 0.5f); + validateGetPartitionSize(cache, 0.2f, 0.5f); + } + + @Test + public void testValidBucketCacheConfigs() throws IOException { + Configuration conf = HBaseConfiguration.create(); + conf.setFloat(BucketCache.ACCEPT_FACTOR_CONFIG_NAME, 0.9f); + conf.setFloat(BucketCache.MIN_FACTOR_CONFIG_NAME, 0.5f); + conf.setFloat(BucketCache.EXTRA_FREE_FACTOR_CONFIG_NAME, 0.5f); + conf.setFloat(BucketCache.SINGLE_FACTOR_CONFIG_NAME, 0.1f); + conf.setFloat(BucketCache.MULTI_FACTOR_CONFIG_NAME, 0.7f); + conf.setFloat(BucketCache.MEMORY_FACTOR_CONFIG_NAME, 0.2f); + + BucketCache cache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, + constructedBlockSizes, writeThreads, writerQLen, persistencePath, 100, conf); + + assertEquals(BucketCache.ACCEPT_FACTOR_CONFIG_NAME + " failed to propagate.", cache.getAcceptableFactor(), 0.9f, 0); + assertEquals(BucketCache.MIN_FACTOR_CONFIG_NAME + " failed to propagate.", cache.getMinFactor(), 0.5f, 0); + assertEquals(BucketCache.EXTRA_FREE_FACTOR_CONFIG_NAME + " failed to propagate.", cache.getExtraFreeFactor(), 0.5f, 0); + assertEquals(BucketCache.SINGLE_FACTOR_CONFIG_NAME + " failed to propagate.", cache.getSingleFactor(), 0.1f, 0); + assertEquals(BucketCache.MULTI_FACTOR_CONFIG_NAME + " failed to propagate.", cache.getMultiFactor(), 0.7f, 0); + assertEquals(BucketCache.MEMORY_FACTOR_CONFIG_NAME + " failed to propagate.", cache.getMemoryFactor(), 0.2f, 0); + } + + @Test + public void testInvalidAcceptFactorConfig() throws IOException { + float[] configValues = {-1f, 0.2f, 0.86f, 1.05f}; + boolean[] expectedOutcomes = {false, false, true, false}; + Map configMappings = ImmutableMap.of(BucketCache.ACCEPT_FACTOR_CONFIG_NAME, configValues); + Configuration conf = HBaseConfiguration.create(); + checkConfigValues(conf, configMappings, expectedOutcomes); + } + + @Test + public void testInvalidMinFactorConfig() throws IOException { + float[] configValues = {-1f, 0f, 0.96f, 1.05f}; + //throws due to <0, in expected range, minFactor > acceptableFactor, > 1.0 + boolean[] expectedOutcomes = {false, true, false, false}; + Map configMappings = ImmutableMap.of(BucketCache.MIN_FACTOR_CONFIG_NAME, configValues); + Configuration conf = HBaseConfiguration.create(); + checkConfigValues(conf, configMappings, expectedOutcomes); + } + + @Test + public void testInvalidExtraFreeFactorConfig() throws IOException { + float[] configValues = {-1f, 0f, 0.2f, 1.05f}; + //throws due to <0, in expected range, in expected range, config can be > 1.0 + boolean[] expectedOutcomes = {false, true, true, true}; + Map configMappings = ImmutableMap.of(BucketCache.EXTRA_FREE_FACTOR_CONFIG_NAME, configValues); + Configuration conf = HBaseConfiguration.create(); + checkConfigValues(conf, configMappings, expectedOutcomes); + } + + @Test + public void testInvalidCacheSplitFactorConfig() throws IOException { + float[] singleFactorConfigValues = {0.2f, 0f, -0.2f, 1f}; + float[] multiFactorConfigValues = {0.4f, 0f, 1f, .05f}; + float[] memoryFactorConfigValues = {0.4f, 0f, 0.2f, .5f}; + // All configs add up to 1.0 and are between 0 and 1.0, configs don't add to 1.0, configs can't be negative, configs don't add to 1.0 + boolean[] expectedOutcomes = {true, false, false, false}; + Map configMappings = ImmutableMap.of(BucketCache.SINGLE_FACTOR_CONFIG_NAME, + singleFactorConfigValues, BucketCache.MULTI_FACTOR_CONFIG_NAME, multiFactorConfigValues, + BucketCache.MEMORY_FACTOR_CONFIG_NAME, memoryFactorConfigValues); + Configuration conf = HBaseConfiguration.create(); + checkConfigValues(conf, configMappings, expectedOutcomes); + } + + private void checkConfigValues(Configuration conf, Map configMap, boolean[] expectSuccess) throws IOException { + Set configNames = configMap.keySet(); + for (int i = 0; i < expectSuccess.length; i++) { + try { + for (String configName : configNames) { + conf.setFloat(configName, configMap.get(configName)[i]); + } + BucketCache cache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, + constructedBlockSizes, writeThreads, writerQLen, persistencePath, 100, conf); + assertTrue("Created BucketCache and expected it to succeed: " + expectSuccess[i] + ", but it actually was: " + !expectSuccess[i], expectSuccess[i]); + } catch (IllegalArgumentException e) { + assertFalse("Created BucketCache and expected it to succeed: " + expectSuccess[i] + ", but it actually was: " + !expectSuccess[i], expectSuccess[i]); + } + } + } + + private void validateGetPartitionSize(BucketCache bucketCache, float partitionFactor, float minFactor) { + long expectedOutput = (long) Math.floor(bucketCache.getAllocator().getTotalSize() * partitionFactor * minFactor); + assertEquals(expectedOutput, bucketCache.getPartitionSize(partitionFactor)); + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketWriterThread.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketWriterThread.java index cfba69a768d..4f6ffd2411a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketWriterThread.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketWriterThread.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.io.hfile.bucket; import org.apache.hadoop.hbase.testclassification.IOTests; +import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.io.hfile.BlockCacheKey; import org.apache.hadoop.hbase.io.hfile.Cacheable; @@ -57,7 +58,7 @@ public class TestBucketWriterThread { int writerThreadNum, int writerQLen, String persistencePath, int ioErrorsTolerationDuration) throws FileNotFoundException, IOException { super(ioEngineName, capacity, blockSize, bucketSizes, writerThreadNum, writerQLen, - persistencePath, ioErrorsTolerationDuration); + persistencePath, ioErrorsTolerationDuration, HBaseConfiguration.create()); } @Override