HBASE-16460 Can't rebuild the BucketAllocator's data structures when BucketCache uses FileIOEngine (Guanghao Zhang)
commit b694b63ed7 (parent e1aab356b3)
BucketAllocator.java
@@ -22,6 +22,7 @@ package org.apache.hadoop.hbase.io.hfile.bucket;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Iterator;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicLong;
@@ -350,25 +351,31 @@ public final class BucketAllocator {
     // we've found. we can only reconfigure each bucket once; if more than once,
     // we know there's a bug, so we just log the info, throw, and start again...
     boolean[] reconfigured = new boolean[buckets.length];
-    for (Map.Entry<BlockCacheKey, BucketEntry> entry : map.entrySet()) {
+    int sizeNotMatchedCount = 0;
+    int insufficientCapacityCount = 0;
+    Iterator<Map.Entry<BlockCacheKey, BucketEntry>> iterator = map.entrySet().iterator();
+    while (iterator.hasNext()) {
+      Map.Entry<BlockCacheKey, BucketEntry> entry = iterator.next();
       long foundOffset = entry.getValue().offset();
       int foundLen = entry.getValue().getLength();
       int bucketSizeIndex = -1;
-      for (int i = 0; i < bucketSizes.length; ++i) {
-        if (foundLen <= bucketSizes[i]) {
+      for (int i = 0; i < this.bucketSizes.length; ++i) {
+        if (foundLen <= this.bucketSizes[i]) {
           bucketSizeIndex = i;
           break;
         }
       }
       if (bucketSizeIndex == -1) {
-        throw new BucketAllocatorException(
-            "Can't match bucket size for the block with size " + foundLen);
+        sizeNotMatchedCount++;
+        iterator.remove();
+        continue;
       }
       int bucketNo = (int) (foundOffset / bucketCapacity);
-      if (bucketNo < 0 || bucketNo >= buckets.length)
-        throw new BucketAllocatorException("Can't find bucket " + bucketNo
-            + ", total buckets=" + buckets.length
-            + "; did you shrink the cache?");
+      if (bucketNo < 0 || bucketNo >= buckets.length) {
+        insufficientCapacityCount++;
+        iterator.remove();
+        continue;
+      }
       Bucket b = buckets[bucketNo];
       if (reconfigured[bucketNo]) {
         if (b.sizeIndex() != bucketSizeIndex)
@@ -391,6 +398,15 @@ public final class BucketAllocator {
       usedSize += buckets[bucketNo].getItemAllocationSize();
       bucketSizeInfos[bucketSizeIndex].blockAllocated(b);
     }
+
+    if (sizeNotMatchedCount > 0) {
+      LOG.warn("There are " + sizeNotMatchedCount + " blocks which can't be rebuilt because "
+          + "there is no matching bucket size for these blocks");
+    }
+    if (insufficientCapacityCount > 0) {
+      LOG.warn("There are " + insufficientCapacityCount + " blocks which can't be rebuilt - "
+          + "did you shrink the cache?");
+    }
   }
 
   public String toString() {
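In case it helps to see the arithmetic of the rebuild loop above in isolation: every persisted entry is placed by two lookups, the bucket that owns its offset (integer division by bucketCapacity) and the smallest configured bucket size that can hold its length. Below is a minimal, self-contained sketch of that placement math; the class and method names are made up for illustration and are not HBase's API.

// Illustration only (made-up names, not HBase's API): the two lookups that the
// rebuild loop above performs for every persisted block.
public class BucketPlacementSketch {

  // Which bucket owns this offset? Same arithmetic as (int) (foundOffset / bucketCapacity).
  static int bucketIndexFor(long offset, long bucketCapacity) {
    return (int) (offset / bucketCapacity);
  }

  // Smallest configured bucket size that fits the block, or -1 if none does.
  static int sizeIndexFor(int blockLen, int[] bucketSizes) {
    for (int i = 0; i < bucketSizes.length; i++) {
      if (blockLen <= bucketSizes[i]) {
        return i;
      }
    }
    return -1;
  }

  public static void main(String[] args) {
    int[] bucketSizes = { 5 * 1024, 9 * 1024, 17 * 1024 }; // hypothetical configured sizes
    long bucketCapacity = 33 * 16 * 1024;                  // hypothetical bytes per bucket
    long offset = 2 * bucketCapacity + 9 * 1024;           // a block persisted inside bucket 2
    int blockLen = 8 * 1024 + 100;

    System.out.println(bucketIndexFor(offset, bucketCapacity)); // 2
    System.out.println(sizeIndexFor(blockLen, bucketSizes));    // 1 (the 9K slot)
    // With this patch, an entry that yields -1 here (or a bucket index out of range)
    // is removed from the backing map and counted, instead of aborting the rebuild.
    System.out.println(sizeIndexFor(20 * 1024, bucketSizes));   // -1
  }
}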
BucketCache.java
@@ -931,12 +931,13 @@ public class BucketCache implements BlockCache, HeapSize {
           + ", expected:" + backingMap.getClass().getName());
       UniqueIndexMap<Integer> deserMap = (UniqueIndexMap<Integer>) ois
           .readObject();
+      ConcurrentHashMap<BlockCacheKey, BucketEntry> backingMapFromFile =
+          (ConcurrentHashMap<BlockCacheKey, BucketEntry>) ois.readObject();
       BucketAllocator allocator = new BucketAllocator(cacheCapacity, bucketSizes,
-          backingMap, realCacheSize);
-      backingMap = (ConcurrentHashMap<BlockCacheKey, BucketEntry>) ois
-          .readObject();
+          backingMapFromFile, realCacheSize);
       bucketAllocator = allocator;
       deserialiserMap = deserMap;
+      backingMap = backingMapFromFile;
     } finally {
       if (ois != null) ois.close();
       if (fis != null) fis.close();
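The retrieveFromFile() change above is an ordering fix: previously the allocator was rebuilt from the backingMap field, which is still empty at that point, and only afterwards was the persisted map read from the stream, so a restored file-backed cache came up with no usable allocator state. The patch reads the persisted map into backingMapFromFile first, builds the BucketAllocator from it, and only then publishes it to the field. Below is a minimal, self-contained sketch of the principle the fix relies on (objects come off an ObjectInputStream in write order, and derived state should be rebuilt from what was just read); the names are stand-ins, not HBase code.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.HashMap;
import java.util.Map;

// Illustration only (not HBase code): readObject() hands back objects in the order
// they were written, so anything derived from the persisted map has to be rebuilt
// from the object just read, not from a field that is only assigned later.
public class RestoreOrderSketch {
  public static void main(String[] args) throws Exception {
    Map<String, Integer> persisted = new HashMap<>();
    persisted.put("blockA", 4096);
    persisted.put("blockB", 8192);

    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
      oos.writeObject("deser-map-v1"); // stands in for the map written first
      oos.writeObject(persisted);      // stands in for the backing map written second
    }

    try (ObjectInputStream ois =
        new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()))) {
      String deserMap = (String) ois.readObject();
      @SuppressWarnings("unchecked")
      Map<String, Integer> backingMapFromFile = (Map<String, Integer>) ois.readObject();

      // Rebuild derived accounting from the freshly read map -- the analogue of
      // constructing BucketAllocator from backingMapFromFile in the patch.
      long usedBytes = 0;
      for (int len : backingMapFromFile.values()) {
        usedBytes += len;
      }
      System.out.println(deserMap + ": restored " + backingMapFromFile.size()
          + " blocks, " + usedBytes + " bytes");
    }
  }
}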
CacheTestUtils.java
@@ -41,6 +41,8 @@ import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
 import org.apache.hadoop.hbase.util.ChecksumType;
 
+import com.google.common.annotations.VisibleForTesting;
+
 public class CacheTestUtils {
 
   private static final boolean includesMemstoreTS = true;
@@ -333,7 +335,7 @@ public class CacheTestUtils {
   }
 
 
-  private static HFileBlockPair[] generateHFileBlocks(int blockSize, int numBlocks) {
+  public static HFileBlockPair[] generateHFileBlocks(int blockSize, int numBlocks) {
     HFileBlockPair[] returnedBlocks = new HFileBlockPair[numBlocks];
     Random rand = new Random();
     HashSet<String> usedStrings = new HashSet<String>();
@@ -376,8 +378,17 @@ public class CacheTestUtils {
     return returnedBlocks;
   }
 
-  private static class HFileBlockPair {
+  @VisibleForTesting
+  public static class HFileBlockPair {
    BlockCacheKey blockName;
    HFileBlock block;
+
+    public BlockCacheKey getBlockName() {
+      return this.blockName;
+    }
+
+    public HFileBlock getBlock() {
+      return this.block;
+    }
   }
 }
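The CacheTestUtils changes above are plumbing for the new test: generateHFileBlocks and HFileBlockPair are widened from private to public (marked @VisibleForTesting) so that TestBucketCache, which lives in the org.apache.hadoop.hbase.io.hfile.bucket package, can generate blocks and read them back through the new getBlockName()/getBlock() accessors, as the test below does.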
TestBucketCache.java
@@ -29,8 +29,11 @@ import java.util.List;
 import java.util.Random;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
+import org.apache.hadoop.hbase.io.hfile.CacheTestUtils;
+import org.apache.hadoop.hbase.io.hfile.CacheTestUtils.HFileBlockPair;
 import org.apache.hadoop.hbase.io.hfile.Cacheable;
 import org.apache.hadoop.hbase.io.hfile.bucket.BucketAllocator.BucketSizeInfo;
 import org.apache.hadoop.hbase.io.hfile.bucket.BucketAllocator.IndexStatistics;
@@ -219,4 +222,49 @@ public class TestBucketCache {
     assertTrue(cache.getCurrentSize() > 0L);
     assertTrue("We should have a block!", cache.iterator().hasNext());
   }
+
+  @Test
+  public void testRetrieveFromFile() throws Exception {
+    HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+    Path testDir = TEST_UTIL.getDataTestDir();
+    TEST_UTIL.getTestFileSystem().mkdirs(testDir);
+
+    BucketCache bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
+        constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, testDir
+        + "/bucket.persistence");
+    long usedSize = bucketCache.getAllocator().getUsedSize();
+    assertTrue(usedSize == 0);
+
+    HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1);
+    // Add blocks
+    for (HFileBlockPair block : blocks) {
+      bucketCache.cacheBlock(block.getBlockName(), block.getBlock());
+    }
+    for (HFileBlockPair block : blocks) {
+      cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock());
+    }
+    usedSize = bucketCache.getAllocator().getUsedSize();
+    assertTrue(usedSize != 0);
+    // persist cache to file
+    bucketCache.shutdown();
+
+    // restore cache from file
+    bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
+        constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, testDir
+        + "/bucket.persistence");
+    assertEquals(usedSize, bucketCache.getAllocator().getUsedSize());
+    // persist cache to file
+    bucketCache.shutdown();
+
+    // reconfig buckets sizes, the biggest bucket is small than constructedBlockSize (8k or 16k)
+    // so it can't restore cache from file
+    int[] smallBucketSizes = new int[] { 2 * 1024 + 1024, 4 * 1024 + 1024 };
+    bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
+        constructedBlockSize, smallBucketSizes, writeThreads,
+        writerQLen, testDir + "/bucket.persistence");
+    assertEquals(0, bucketCache.getAllocator().getUsedSize());
+    assertEquals(0, bucketCache.backingMap.size());
+
+    TEST_UTIL.cleanupTestDir();
+  }
 }
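The new testRetrieveFromFile exercises both halves of the report: a file-backed BucketCache is persisted and restored with the same bucket sizes (the allocator's used size must survive the round trip), and then restored with bucket sizes too small for the cached blocks, where the patched rebuild drops the unplaceable blocks and comes up empty instead of failing. It can be run on its own with the usual Maven invocation, e.g. something like mvn test -Dtest=TestBucketCache from the hbase-server module.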