HBASE-10641 Configurable Bucket Sizes in bucketCache

Nick Dimiduk 2014-06-03 15:47:09 -07:00
parent f24e68426c
commit d824f0b25f
4 changed files with 137 additions and 79 deletions

View File: CacheConfig.java

@@ -125,6 +125,12 @@ public class CacheConfig {
public static final String BUCKET_CACHE_WRITER_THREADS_KEY = "hbase.bucketcache.writer.threads";
public static final String BUCKET_CACHE_WRITER_QUEUE_KEY =
"hbase.bucketcache.writer.queuelength";
+ /**
+ * A comma-delimited array of values for use as bucket sizes.
+ */
+ public static final String BUCKET_CACHE_BUCKETS_KEY = "hbase.bucketcache.bucket.sizes";
/**
* Defaults for Bucket cache
*/
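The new key lets operators align bucket sizes with their cluster's block-size distribution instead of relying on the hard-coded defaults. A minimal sketch of setting it programmatically (the values are illustrative, not recommendations; the same comma-delimited string can equally go in hbase-site.xml):

    Configuration conf = HBaseConfiguration.create();
    // Each value is a bucket size in bytes, typically a block size plus ~1KB of overhead.
    conf.set(CacheConfig.BUCKET_CACHE_BUCKETS_KEY, "5120,9216,17408,33792");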
@@ -473,6 +479,14 @@ public class CacheConfig {
float combinedPercentage = conf.getFloat(
BUCKET_CACHE_COMBINED_PERCENTAGE_KEY,
DEFAULT_BUCKET_CACHE_COMBINED_PERCENTAGE);
+ String[] configuredBucketSizes = conf.getStrings(BUCKET_CACHE_BUCKETS_KEY);
+ int[] bucketSizes = null;
+ if (configuredBucketSizes != null) {
+ bucketSizes = new int[configuredBucketSizes.length];
+ for (int i = 0; i < configuredBucketSizes.length; i++) {
+ bucketSizes[i] = Integer.parseInt(configuredBucketSizes[i]);
+ }
+ }
if (combinedWithLru) {
lruCacheSize = (long) ((1 - combinedPercentage) * bucketCacheSize);
bucketCacheSize = (long) (combinedPercentage * bucketCacheSize);
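Two things are worth noting in this hunk: Integer.parseInt is strict, so a malformed entry in hbase.bucketcache.bucket.sizes aborts cache configuration with a NumberFormatException rather than being skipped, and the combined-mode split is plain proportional arithmetic. A worked example, assuming the default combined percentage of 0.9f and a 10 GB bucket cache allocation:

    long bucketCacheSize = 10L * 1024 * 1024 * 1024;                          // 10 GB configured
    float combinedPercentage = 0.9f;                                          // assumed default
    long lruCacheSize = (long) ((1 - combinedPercentage) * bucketCacheSize);  // ~1 GB stays on-heap
    bucketCacheSize = (long) (combinedPercentage * bucketCacheSize);          // ~9 GB goes to the bucket cache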
@@ -482,7 +496,7 @@
"hbase.bucketcache.ioengine.errors.tolerated.duration",
BucketCache.DEFAULT_ERROR_TOLERATION_DURATION);
bucketCache = new BucketCache(bucketCacheIOEngineName,
- bucketCacheSize, blockSize, writerThreads, writerQueueLen, persistentPath,
+ bucketCacheSize, blockSize, bucketSizes, writerThreads, writerQueueLen, persistentPath,
ioErrorsTolerationDuration);
} catch (IOException ioex) {
LOG.error("Can't instantiate bucket cache", ioex);

View File: BucketAllocator.java

@@ -25,6 +25,8 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
+ import com.google.common.base.Objects;
+ import com.google.common.base.Preconditions;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
@@ -55,11 +57,11 @@ public final class BucketAllocator {
sizeIndex = -1;
}
- void reconfigure(int sizeIndex) {
+ void reconfigure(int sizeIndex, int[] bucketSizes, long bucketCapacity) {
+ Preconditions.checkElementIndex(sizeIndex, bucketSizes.length);
this.sizeIndex = sizeIndex;
- assert sizeIndex < BUCKET_SIZES.length;
- itemAllocationSize = BUCKET_SIZES[sizeIndex];
- itemCount = (int) (((long) BUCKET_CAPACITY) / (long) itemAllocationSize);
+ itemAllocationSize = bucketSizes[sizeIndex];
+ itemCount = (int) (bucketCapacity / (long) itemAllocationSize);
freeCount = itemCount;
usedCount = 0;
freeList = new int[itemCount];
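The item count per bucket now follows directly from the configured sizes rather than from static constants. A worked example of the arithmetic under the defaults (FEWEST_ITEMS_IN_BUCKET = 4, largest default bucket = 512K + 1K):

    long bucketCapacity = 4L * ((512 * 1024) + 1024);             // 4 * 525,312 = 2,101,248 bytes
    int itemAllocationSize = 4 * 1024 + 1024;                     // smallest default bucket, 5,120 bytes
    int itemCount = (int) (bucketCapacity / itemAllocationSize);  // 410 items fit in one bucket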
@@ -176,7 +178,7 @@ public final class BucketAllocator {
public void instantiateBucket(Bucket b) {
assert b.isUninstantiated() || b.isCompletelyFree();
- b.reconfigure(sizeIndex);
+ b.reconfigure(sizeIndex, bucketSizes, bucketCapacity);
bucketList.add(b);
freeBuckets.add(b);
completelyFreeBuckets.add(b);
@@ -246,15 +248,22 @@ public final class BucketAllocator {
free += b.freeCount();
used += b.usedCount();
}
- return new IndexStatistics(free, used, BUCKET_SIZES[sizeIndex]);
+ return new IndexStatistics(free, used, bucketSizes[sizeIndex]);
}
+ @Override
+ public String toString() {
+ return Objects.toStringHelper(this.getClass())
+ .add("sizeIndex", sizeIndex)
+ .add("bucketSize", bucketSizes[sizeIndex])
+ .toString();
+ }
}
// Default block size is 64K, so we choose more sizes near 64K, you'd better
// reset it according to your cluster's block size distribution
- // TODO Make these sizes configurable
// TODO Support the view of block size distribution statistics
- private static final int BUCKET_SIZES[] = { 4 * 1024 + 1024, 8 * 1024 + 1024,
+ private static final int DEFAULT_BUCKET_SIZES[] = { 4 * 1024 + 1024, 8 * 1024 + 1024,
16 * 1024 + 1024, 32 * 1024 + 1024, 40 * 1024 + 1024, 48 * 1024 + 1024,
56 * 1024 + 1024, 64 * 1024 + 1024, 96 * 1024 + 1024, 128 * 1024 + 1024,
192 * 1024 + 1024, 256 * 1024 + 1024, 384 * 1024 + 1024,
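Each default size is a common block size (4K, 8K, ..., 512K) plus 1024 bytes; the extra kilobyte appears to leave headroom so that a block of exactly the nominal size, together with its per-block overhead, still fits in the bucket. Custom sizes supplied through the new configuration key would sensibly follow the same pattern.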
@@ -263,42 +272,49 @@
/**
* Round up the given block size to bucket size, and get the corresponding
* BucketSizeInfo
- * @param blockSize
- * @return BucketSizeInfo
*/
public BucketSizeInfo roundUpToBucketSizeInfo(int blockSize) {
- for (int i = 0; i < BUCKET_SIZES.length; ++i)
- if (blockSize <= BUCKET_SIZES[i])
+ for (int i = 0; i < bucketSizes.length; ++i)
+ if (blockSize <= bucketSizes[i])
return bucketSizeInfos[i];
return null;
}
- static final int BIG_ITEM_SIZE = (512 * 1024) + 1024; // 513K plus overhead
static public final int FEWEST_ITEMS_IN_BUCKET = 4;
- // The capacity size for each bucket
- static final long BUCKET_CAPACITY = FEWEST_ITEMS_IN_BUCKET * BIG_ITEM_SIZE;
+ private final int[] bucketSizes;
+ private final int bigItemSize;
+ // The capacity size for each bucket
+ private final long bucketCapacity;
private Bucket[] buckets;
private BucketSizeInfo[] bucketSizeInfos;
private final long totalSize;
private long usedSize = 0;
- BucketAllocator(long availableSpace) throws BucketAllocatorException {
- buckets = new Bucket[(int) (availableSpace / (long) BUCKET_CAPACITY)];
- if (buckets.length < BUCKET_SIZES.length)
+ BucketAllocator(long availableSpace, int[] bucketSizes)
+ throws BucketAllocatorException {
+ this.bucketSizes = bucketSizes == null ? DEFAULT_BUCKET_SIZES : bucketSizes;
+ int largestBucket = this.bucketSizes[0];
+ for (int i : this.bucketSizes) {
+ largestBucket = Math.max(largestBucket, i);
+ }
+ this.bigItemSize = largestBucket;
+ this.bucketCapacity = FEWEST_ITEMS_IN_BUCKET * bigItemSize;
+ buckets = new Bucket[(int) (availableSpace / bucketCapacity)];
+ if (buckets.length < this.bucketSizes.length)
throw new BucketAllocatorException(
"Bucket allocator size too small - must have room for at least "
- + BUCKET_SIZES.length + " buckets");
- bucketSizeInfos = new BucketSizeInfo[BUCKET_SIZES.length];
- for (int i = 0; i < BUCKET_SIZES.length; ++i) {
+ + this.bucketSizes.length + " buckets");
+ bucketSizeInfos = new BucketSizeInfo[this.bucketSizes.length];
+ for (int i = 0; i < this.bucketSizes.length; ++i) {
bucketSizeInfos[i] = new BucketSizeInfo(i);
}
for (int i = 0; i < buckets.length; ++i) {
- buckets[i] = new Bucket(BUCKET_CAPACITY * i);
- bucketSizeInfos[i < BUCKET_SIZES.length ? i : BUCKET_SIZES.length - 1]
+ buckets[i] = new Bucket(bucketCapacity * i);
+ bucketSizeInfos[i < this.bucketSizes.length ? i : this.bucketSizes.length - 1]
.instantiateBucket(buckets[i]);
}
- this.totalSize = ((long) buckets.length) * BUCKET_CAPACITY;
+ this.totalSize = ((long) buckets.length) * bucketCapacity;
}
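Because bucket capacity is now FEWEST_ITEMS_IN_BUCKET times the largest configured size, the smallest usable cache scales with the configuration. A sketch of the minimum-space arithmetic for a hypothetical three-size configuration:

    int[] sizes = { 5 * 1024, 9 * 1024, 17 * 1024 };    // hypothetical configuration
    long bucketCapacity = 4L * (17 * 1024);             // 4 * largest = 69,632 bytes per bucket
    long minimumSpace = sizes.length * bucketCapacity;  // needs >= 208,896 bytes, one bucket per size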
/**
@@ -309,9 +325,9 @@
* @param realCacheSize cached data size statistics for bucket cache
* @throws BucketAllocatorException
*/
- BucketAllocator(long availableSpace, Map<BlockCacheKey, BucketEntry> map,
+ BucketAllocator(long availableSpace, int[] bucketSizes, Map<BlockCacheKey, BucketEntry> map,
AtomicLong realCacheSize) throws BucketAllocatorException {
- this(availableSpace);
+ this(availableSpace, bucketSizes);
// each bucket has an offset, sizeindex. probably the buckets are too big
// in our default state. so what we do is reconfigure them according to what
@@ -322,8 +338,8 @@
long foundOffset = entry.getValue().offset();
int foundLen = entry.getValue().getLength();
int bucketSizeIndex = -1;
- for (int i = 0; i < BUCKET_SIZES.length; ++i) {
- if (foundLen <= BUCKET_SIZES[i]) {
+ for (int i = 0; i < bucketSizes.length; ++i) {
+ if (foundLen <= bucketSizes[i]) {
bucketSizeIndex = i;
break;
}
@@ -332,13 +348,13 @@
throw new BucketAllocatorException(
"Can't match bucket size for the block with size " + foundLen);
}
- int bucketNo = (int) (foundOffset / (long) BUCKET_CAPACITY);
+ int bucketNo = (int) (foundOffset / bucketCapacity);
if (bucketNo < 0 || bucketNo >= buckets.length)
throw new BucketAllocatorException("Can't find bucket " + bucketNo
+ ", total buckets=" + buckets.length
+ "; did you shrink the cache?");
Bucket b = buckets[bucketNo];
- if (reconfigured[bucketNo] == true) {
+ if (reconfigured[bucketNo]) {
if (b.sizeIndex() != bucketSizeIndex)
throw new BucketAllocatorException(
"Inconsistent allocation in bucket map;");
@@ -378,8 +394,7 @@
}
public long getFreeSize() {
- long freeSize = this.totalSize - getUsedSize();
- return freeSize;
+ return this.totalSize - getUsedSize();
}
public long getTotalSize() {
@@ -404,7 +419,7 @@
// Ask caller to free up space and try again!
if (offset < 0)
throw new CacheFullException(blockSize, bsi.sizeIndex());
- usedSize += BUCKET_SIZES[bsi.sizeIndex()];
+ usedSize += bucketSizes[bsi.sizeIndex()];
return offset;
}
@@ -422,7 +437,7 @@
* @return size freed
*/
public synchronized int freeBlock(long offset) {
- int bucketNo = (int) (offset / (long) BUCKET_CAPACITY);
+ int bucketNo = (int) (offset / bucketCapacity);
assert bucketNo >= 0 && bucketNo < buckets.length;
Bucket targetBucket = buckets[bucketNo];
bucketSizeInfos[targetBucket.sizeIndex()].freeBlock(targetBucket, offset);
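These lookups all rely on buckets being laid out contiguously at bucketCapacity-sized strides, so an offset maps back to its bucket with a single integer division. For example, under the default capacity:

    long bucketCapacity = 2101248L;                  // default: 4 * (512K + 1K)
    long offset = 2 * bucketCapacity + 5120;         // an allocation inside the third bucket
    int bucketNo = (int) (offset / bucketCapacity);  // == 2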
@@ -431,23 +446,19 @@
}
public int sizeIndexOfAllocation(long offset) {
- int bucketNo = (int) (offset / (long) BUCKET_CAPACITY);
+ int bucketNo = (int) (offset / bucketCapacity);
assert bucketNo >= 0 && bucketNo < buckets.length;
Bucket targetBucket = buckets[bucketNo];
return targetBucket.sizeIndex();
}
public int sizeOfAllocation(long offset) {
- int bucketNo = (int) (offset / (long) BUCKET_CAPACITY);
+ int bucketNo = (int) (offset / bucketCapacity);
assert bucketNo >= 0 && bucketNo < buckets.length;
Bucket targetBucket = buckets[bucketNo];
return targetBucket.itemAllocationSize();
}
- static public int getMaximumAllocationIndex() {
- return BUCKET_SIZES.length;
- }
static class IndexStatistics {
private long freeCount, usedCount, itemSize, totalCount;
@@ -533,7 +544,7 @@
}
public IndexStatistics[] getIndexStatistics() {
- IndexStatistics[] stats = new IndexStatistics[BUCKET_SIZES.length];
+ IndexStatistics[] stats = new IndexStatistics[bucketSizes.length];
for (int i = 0; i < stats.length; ++i)
stats[i] = bucketSizeInfos[i].statistics();
return stats;

View File: BucketCache.java

@@ -152,6 +152,7 @@ public class BucketCache implements BlockCache, HeapSize {
private long cacheCapacity;
/** Approximate block size */
private final long blockSize;
+ private final int[] bucketSizes;
/** Duration of IO errors tolerated before we disable cache, 1 min as default */
private final int ioErrorsTolerationDuration;
@@ -194,15 +195,15 @@ public class BucketCache implements BlockCache, HeapSize {
// Allocate or free space for the block
private BucketAllocator bucketAllocator;
- public BucketCache(String ioEngineName, long capacity, int blockSize, int writerThreadNum,
- int writerQLen, String persistencePath) throws FileNotFoundException,
+ public BucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes,
+ int writerThreadNum, int writerQLen, String persistencePath) throws FileNotFoundException,
IOException {
- this(ioEngineName, capacity, blockSize, writerThreadNum, writerQLen, persistencePath,
- DEFAULT_ERROR_TOLERATION_DURATION);
+ this(ioEngineName, capacity, blockSize, bucketSizes, writerThreadNum, writerQLen,
+ persistencePath, DEFAULT_ERROR_TOLERATION_DURATION);
}
- public BucketCache(String ioEngineName, long capacity, int blockSize, int writerThreadNum,
- int writerQLen, String persistencePath, int ioErrorsTolerationDuration)
+ public BucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes,
+ int writerThreadNum, int writerQLen, String persistencePath, int ioErrorsTolerationDuration)
throws FileNotFoundException, IOException {
this.ioEngine = getIOEngineFromName(ioEngineName, capacity);
this.writerThreads = new WriterThread[writerThreadNum];
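Callers constructing a BucketCache directly now pass the sizes array, or null to fall back to the defaults. A hedged usage sketch with illustrative values; the constructor throws IOException, and a null persistencePath simply disables persistence:

    int[] bucketSizes = { 5 * 1024, 9 * 1024, 17 * 1024, 33 * 1024 };  // illustrative sizes
    BucketCache bucketCache = new BucketCache("offheap", 128 * 1024 * 1024, 64 * 1024,
        bucketSizes, 3, 64, null);  // 128 MB cache, 64K block size, 3 writers, queue length 64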
@@ -216,9 +217,10 @@ public class BucketCache implements BlockCache, HeapSize {
this.cacheCapacity = capacity;
this.persistencePath = persistencePath;
this.blockSize = blockSize;
+ this.bucketSizes = bucketSizes;
this.ioErrorsTolerationDuration = ioErrorsTolerationDuration;
- bucketAllocator = new BucketAllocator(capacity);
+ bucketAllocator = new BucketAllocator(capacity, bucketSizes);
for (int i = 0; i < writerThreads.length; ++i) {
writerQueues.add(new ArrayBlockingQueue<RAMQueueEntry>(writerQLen));
this.cacheWaitSignals[i] = new Object();
@@ -814,8 +816,8 @@ public class BucketCache implements BlockCache, HeapSize {
+ ", expected:" + backingMap.getClass().getName());
UniqueIndexMap<Integer> deserMap = (UniqueIndexMap<Integer>) ois
.readObject();
- BucketAllocator allocator = new BucketAllocator(cacheCapacity,
- backingMap, this.realCacheSize);
+ BucketAllocator allocator = new BucketAllocator(cacheCapacity, bucketSizes,
+ backingMap, realCacheSize);
backingMap = (ConcurrentHashMap<BlockCacheKey, BucketEntry>) ois
.readObject();
bucketAllocator = allocator;
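Note the operational implication of this path: the persisted backing map is replayed against the currently configured bucket sizes, so shrinking the cache, or removing a size that live blocks were allocated from, surfaces at startup as a BucketAllocatorException ("Can't match bucket size ..." or "did you shrink the cache?") rather than as silent corruption.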

View File: TestBucketCache.java

@@ -18,14 +18,15 @@
*/
package org.apache.hadoop.hbase.io.hfile.bucket;
import static org.junit.Assert.assertTrue;
+ import static org.junit.Assert.assertEquals;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
+ import java.util.Arrays;
+ import java.util.List;
import java.util.Random;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.SmallTests;
import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
import org.apache.hadoop.hbase.io.hfile.CacheTestUtils;
@@ -36,6 +37,8 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+ import org.junit.runner.RunWith;
+ import org.junit.runners.Parameterized;
/**
* Basic test of BucketCache.Puts and gets.
@@ -43,9 +46,29 @@ import org.junit.experimental.categories.Category;
* Tests will ensure that blocks' data correctness under several threads
* concurrency
*/
+ @RunWith(Parameterized.class)
@Category(SmallTests.class)
public class TestBucketCache {
static final Log LOG = LogFactory.getLog(TestBucketCache.class);
+ private static final Random RAND = new Random();
+ @Parameterized.Parameters(name="{index}: blockSize={0}, bucketSizes={1}")
+ public static Iterable<Object[]> data() {
+ return Arrays.asList(new Object[][] {
+ { 8192, null }, // TODO: why is 8k the default blocksize for these tests?
+ { 16 * 1024, new int[] {
+ 2 * 1024 + 1024, 4 * 1024 + 1024, 8 * 1024 + 1024, 16 * 1024 + 1024,
+ 28 * 1024 + 1024, 32 * 1024 + 1024, 64 * 1024 + 1024, 96 * 1024 + 1024,
+ 128 * 1024 + 1024 } }
+ });
+ }
+ @Parameterized.Parameter(0)
+ public int constructedBlockSize;
+ @Parameterized.Parameter(1)
+ public int[] constructedBlockSizes;
BucketCache cache;
final int CACHE_SIZE = 1000000;
final int NUM_BLOCKS = 100;
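With the Parameterized runner, JUnit instantiates TestBucketCache once per Object[] returned by data() and injects the values into the public @Parameterized.Parameter fields, so setup() builds a fresh cache for each configuration and every test method runs against both the default bucket layout and the custom small-block layout.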
@@ -61,11 +84,11 @@ public class TestBucketCache {
private class MockedBucketCache extends BucketCache {
- public MockedBucketCache(String ioEngineName, long capacity,
- int writerThreads,
- int writerQLen, String persistencePath) throws FileNotFoundException,
- IOException {
- super(ioEngineName, capacity, 8192, writerThreads, writerQLen, persistencePath);
+ public MockedBucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes,
+ int writerThreads, int writerQLen, String persistencePath)
+ throws FileNotFoundException, IOException {
+ super(ioEngineName, capacity, blockSize, bucketSizes, writerThreads, writerQLen,
+ persistencePath);
super.wait_when_cache = true;
}
@@ -89,8 +112,8 @@ public class TestBucketCache {
@Before
public void setup() throws FileNotFoundException, IOException {
- cache = new MockedBucketCache(ioEngineName, capacitySize, writeThreads,
- writerQLen, persistencePath);
+ cache = new MockedBucketCache(ioEngineName, capacitySize, constructedBlockSize,
+ constructedBlockSizes, writeThreads, writerQLen, persistencePath);
}
@After
@@ -98,41 +121,49 @@ public class TestBucketCache {
cache.shutdown();
}
+ /**
+ * Return a random element from {@code a}.
+ */
+ private static <T> T randFrom(List<T> a) {
+ return a.get(RAND.nextInt(a.size()));
+ }
@Test
public void testBucketAllocator() throws BucketAllocatorException {
BucketAllocator mAllocator = cache.getAllocator();
/*
* Test the allocator first
*/
- int[] blockSizes = new int[2];
- blockSizes[0] = 4 * 1024;
- blockSizes[1] = 8 * 1024;
+ final List<Integer> BLOCKSIZES = Arrays.asList(4 * 1024, 8 * 1024, 64 * 1024, 96 * 1024);
boolean full = false;
- int i = 0;
ArrayList<Long> allocations = new ArrayList<Long>();
- // Fill the allocated extents
- while (!full) {
+ // Fill the allocated extents by choosing a random blocksize. Continues selecting blocks until
+ // the cache is completely filled.
+ List<Integer> tmp = new ArrayList<Integer>(BLOCKSIZES);
+ for (int i = 0; !full; i++) {
+ Integer blockSize = null;
try {
- allocations.add(new Long(mAllocator.allocateBlock(blockSizes[i
- % blockSizes.length])));
- ++i;
+ blockSize = randFrom(tmp);
+ allocations.add(mAllocator.allocateBlock(blockSize));
} catch (CacheFullException cfe) {
- full = true;
+ tmp.remove(blockSize);
+ if (tmp.isEmpty()) full = true;
}
}
- for (i = 0; i < blockSizes.length; i++) {
- BucketSizeInfo bucketSizeInfo = mAllocator
- .roundUpToBucketSizeInfo(blockSizes[0]);
+ for (Integer blockSize : BLOCKSIZES) {
+ BucketSizeInfo bucketSizeInfo = mAllocator.roundUpToBucketSizeInfo(blockSize);
IndexStatistics indexStatistics = bucketSizeInfo.statistics();
- assertTrue(indexStatistics.freeCount() == 0);
+ assertEquals(
+ "unexpected freeCount for " + bucketSizeInfo,
+ 0, indexStatistics.freeCount());
}
for (long offset : allocations) {
- assertTrue(mAllocator.sizeOfAllocation(offset) == mAllocator
- .freeBlock(offset));
+ assertEquals(mAllocator.sizeOfAllocation(offset), mAllocator.freeBlock(offset));
}
- assertTrue(mAllocator.getUsedSize() == 0);
+ assertEquals(0, mAllocator.getUsedSize());
}
@Test