HBASE-16460 Can't rebuild the BucketAllocator's data structures when BucketCache uses FileIOEngine (Guanghao Zhang)
commit b6ba13c377
parent ab07f0087b
BucketAllocator.java (org.apache.hadoop.hbase.io.hfile.bucket)

@@ -20,7 +20,10 @@
 
 package org.apache.hadoop.hbase.io.hfile.bucket;
 
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -349,34 +352,41 @@ public final class BucketAllocator {
     // we've found. we can only reconfigure each bucket once; if more than once,
     // we know there's a bug, so we just log the info, throw, and start again...
     boolean[] reconfigured = new boolean[buckets.length];
-    for (Map.Entry<BlockCacheKey, BucketEntry> entry : map.entrySet()) {
+    int sizeNotMatchedCount = 0;
+    int insufficientCapacityCount = 0;
+    Iterator<Map.Entry<BlockCacheKey, BucketEntry>> iterator = map.entrySet().iterator();
+    while (iterator.hasNext()) {
+      Map.Entry<BlockCacheKey, BucketEntry> entry = iterator.next();
       long foundOffset = entry.getValue().offset();
       int foundLen = entry.getValue().getLength();
       int bucketSizeIndex = -1;
-      for (int i = 0; i < bucketSizes.length; ++i) {
-        if (foundLen <= bucketSizes[i]) {
+      for (int i = 0; i < this.bucketSizes.length; ++i) {
+        if (foundLen <= this.bucketSizes[i]) {
          bucketSizeIndex = i;
          break;
        }
      }
       if (bucketSizeIndex == -1) {
-        throw new BucketAllocatorException(
-            "Can't match bucket size for the block with size " + foundLen);
+        sizeNotMatchedCount++;
+        iterator.remove();
+        continue;
       }
       int bucketNo = (int) (foundOffset / bucketCapacity);
-      if (bucketNo < 0 || bucketNo >= buckets.length)
-        throw new BucketAllocatorException("Can't find bucket " + bucketNo
-            + ", total buckets=" + buckets.length
-            + "; did you shrink the cache?");
+      if (bucketNo < 0 || bucketNo >= buckets.length) {
+        insufficientCapacityCount++;
+        iterator.remove();
+        continue;
+      }
       Bucket b = buckets[bucketNo];
       if (reconfigured[bucketNo]) {
-        if (b.sizeIndex() != bucketSizeIndex)
-          throw new BucketAllocatorException(
-              "Inconsistent allocation in bucket map;");
+        if (b.sizeIndex() != bucketSizeIndex) {
+          throw new BucketAllocatorException("Inconsistent allocation in bucket map;");
+        }
       } else {
-        if (!b.isCompletelyFree())
-          throw new BucketAllocatorException("Reconfiguring bucket "
-              + bucketNo + " but it's already allocated; corrupt data");
+        if (!b.isCompletelyFree()) {
+          throw new BucketAllocatorException(
+              "Reconfiguring bucket " + bucketNo + " but it's already allocated; corrupt data");
+        }
         // Need to remove the bucket from whichever list it's currently in at
         // the moment...
         BucketSizeInfo bsi = bucketSizeInfos[bucketSizeIndex];
@@ -390,6 +400,15 @@ public final class BucketAllocator {
       usedSize += buckets[bucketNo].getItemAllocationSize();
       bucketSizeInfos[bucketSizeIndex].blockAllocated(b);
     }
+
+    if (sizeNotMatchedCount > 0) {
+      LOG.warn("There are " + sizeNotMatchedCount + " blocks which can't be rebuilt because " +
+          "there is no matching bucket size for these blocks");
+    }
+    if (insufficientCapacityCount > 0) {
+      LOG.warn("There are " + insufficientCapacityCount + " blocks which can't be rebuilt - "
+          + "did you shrink the cache?");
+    }
   }
 
   public String toString() {
BucketCache.java (org.apache.hadoop.hbase.io.hfile.bucket)

@@ -996,12 +996,13 @@ public class BucketCache implements BlockCache, HeapSize {
             + ", expected:" + backingMap.getClass().getName());
       UniqueIndexMap<Integer> deserMap = (UniqueIndexMap<Integer>) ois
           .readObject();
+      ConcurrentHashMap<BlockCacheKey, BucketEntry> backingMapFromFile =
+          (ConcurrentHashMap<BlockCacheKey, BucketEntry>) ois.readObject();
       BucketAllocator allocator = new BucketAllocator(cacheCapacity, bucketSizes,
-          backingMap, realCacheSize);
-      backingMap = (ConcurrentHashMap<BlockCacheKey, BucketEntry>) ois
-          .readObject();
+          backingMapFromFile, realCacheSize);
       bucketAllocator = allocator;
       deserialiserMap = deserMap;
+      backingMap = backingMapFromFile;
     } finally {
       if (ois != null) ois.close();
       if (fis != null) fis.close();
CacheTestUtils.java (org.apache.hadoop.hbase.io.hfile)

@@ -42,6 +42,8 @@ import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
 import org.apache.hadoop.hbase.nio.ByteBuff;
 import org.apache.hadoop.hbase.util.ChecksumType;
 
+import com.google.common.annotations.VisibleForTesting;
+
 public class CacheTestUtils {
 
   private static final boolean includesMemstoreTS = true;
@@ -339,7 +341,7 @@ public class CacheTestUtils {
   }
 
 
-  private static HFileBlockPair[] generateHFileBlocks(int blockSize, int numBlocks) {
+  public static HFileBlockPair[] generateHFileBlocks(int blockSize, int numBlocks) {
     HFileBlockPair[] returnedBlocks = new HFileBlockPair[numBlocks];
     Random rand = new Random();
     HashSet<String> usedStrings = new HashSet<String>();
@@ -382,8 +384,17 @@ public class CacheTestUtils {
     return returnedBlocks;
   }
 
-  private static class HFileBlockPair {
+  @VisibleForTesting
+  public static class HFileBlockPair {
     BlockCacheKey blockName;
     HFileBlock block;
+
+    public BlockCacheKey getBlockName() {
+      return this.blockName;
+    }
+
+    public HFileBlock getBlock() {
+      return this.block;
+    }
   }
 }
TestBucketCache.java (org.apache.hadoop.hbase.io.hfile.bucket)

@@ -29,9 +29,12 @@ import java.util.List;
 import java.util.Random;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
 import org.apache.hadoop.hbase.io.hfile.CacheTestUtils;
 import org.apache.hadoop.hbase.io.hfile.Cacheable;
+import org.apache.hadoop.hbase.io.hfile.CacheTestUtils.HFileBlockPair;
 import org.apache.hadoop.hbase.io.hfile.bucket.BucketAllocator.BucketSizeInfo;
 import org.apache.hadoop.hbase.io.hfile.bucket.BucketAllocator.IndexStatistics;
 import org.apache.hadoop.hbase.testclassification.IOTests;
@@ -220,4 +223,49 @@ public class TestBucketCache {
     assertTrue(cache.getCurrentSize() > 0L);
     assertTrue("We should have a block!", cache.iterator().hasNext());
   }
+
+  @Test
+  public void testRetrieveFromFile() throws Exception {
+    HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+    Path testDir = TEST_UTIL.getDataTestDir();
+    TEST_UTIL.getTestFileSystem().mkdirs(testDir);
+
+    BucketCache bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
+        constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, testDir
+        + "/bucket.persistence");
+    long usedSize = bucketCache.getAllocator().getUsedSize();
+    assertTrue(usedSize == 0);
+
+    HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1);
+    // Add blocks
+    for (HFileBlockPair block : blocks) {
+      bucketCache.cacheBlock(block.getBlockName(), block.getBlock());
+    }
+    for (HFileBlockPair block : blocks) {
+      cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock());
+    }
+    usedSize = bucketCache.getAllocator().getUsedSize();
+    assertTrue(usedSize != 0);
+    // persist cache to file
+    bucketCache.shutdown();
+
+    // restore cache from file
+    bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
+        constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, testDir
+        + "/bucket.persistence");
+    assertEquals(usedSize, bucketCache.getAllocator().getUsedSize());
+    // persist cache to file
+    bucketCache.shutdown();
+
+    // reconfig bucket sizes, the biggest bucket is smaller than constructedBlockSize (8k or 16k)
+    // so it can't restore cache from file
+    int[] smallBucketSizes = new int[] { 2 * 1024 + 1024, 4 * 1024 + 1024 };
+    bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
+        constructedBlockSize, smallBucketSizes, writeThreads,
+        writerQLen, testDir + "/bucket.persistence");
+    assertEquals(0, bucketCache.getAllocator().getUsedSize());
+    assertEquals(0, bucketCache.backingMap.size());
+
+    TEST_UTIL.cleanupTestDir();
+  }
 }
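For context, a minimal usage sketch of the scenario this commit fixes: a BucketCache backed by a file IOEngine plus a persistence path, shut down and then re-created so that the persisted backing map has to be used to rebuild the BucketAllocator. The constructor and accessors mirror the new test above; the class name, paths, and size values in the sketch are illustrative assumptions, not part of the patch.

import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;

public class FileBackedBucketCacheSketch {
  public static void main(String[] args) throws Exception {
    // Illustrative values; the bucket sizes must be able to hold the blocks being cached.
    long capacity = 32 * 1024 * 1024;
    int blockSize = 8 * 1024;
    int[] bucketSizes = new int[] { 8 * 1024 + 1024, 16 * 1024 + 1024 };
    int writerThreads = 1;
    int writerQLen = 16;

    // "file:" ioengine plus a persistence path: shutdown() serializes the backing map
    // so that a later instance can rebuild the BucketAllocator from it.
    BucketCache cache = new BucketCache("file:/tmp/bucket.cache", capacity, blockSize,
        bucketSizes, writerThreads, writerQLen, "/tmp/bucket.persistence");

    // ... cache blocks here via cache.cacheBlock(blockCacheKey, cacheable) ...

    cache.shutdown();  // persists the backing map to /tmp/bucket.persistence

    // Re-creating the cache with the same arguments reads the persisted map back and
    // hands the deserialized map to the BucketAllocator constructor (the code path
    // changed in this commit), instead of failing on a file-backed cache.
    BucketCache restored = new BucketCache("file:/tmp/bucket.cache", capacity, blockSize,
        bucketSizes, writerThreads, writerQLen, "/tmp/bucket.persistence");
    System.out.println("used size after restore: " + restored.getAllocator().getUsedSize());
    restored.shutdown();
  }
}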