From b6ba13c37715422710a142f6f82ba4817129c3d6 Mon Sep 17 00:00:00 2001
From: tedyu
Date: Mon, 5 Sep 2016 06:50:50 -0700
Subject: [PATCH] HBASE-16460 Can't rebuild the BucketAllocator's data
 structures when BucketCache uses FileIOEngine (Guanghao Zhang)

---
 .../io/hfile/bucket/BucketAllocator.java      | 49 +++++++++++++------
 .../hbase/io/hfile/bucket/BucketCache.java    |  7 +--
 .../hadoop/hbase/io/hfile/CacheTestUtils.java | 17 +++++--
 .../io/hfile/bucket/TestBucketCache.java      | 48 ++++++++++++++++++
 4 files changed, 100 insertions(+), 21 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketAllocator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketAllocator.java
index b5cd0c35e4b..67a4f1f69b5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketAllocator.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketAllocator.java
@@ -20,7 +20,10 @@
 package org.apache.hadoop.hbase.io.hfile.bucket;
 
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -349,34 +352,41 @@ public final class BucketAllocator {
     // we've found. we can only reconfigure each bucket once; if more than once,
     // we know there's a bug, so we just log the info, throw, and start again...
     boolean[] reconfigured = new boolean[buckets.length];
-    for (Map.Entry<BlockCacheKey, BucketEntry> entry : map.entrySet()) {
+    int sizeNotMatchedCount = 0;
+    int insufficientCapacityCount = 0;
+    Iterator<Map.Entry<BlockCacheKey, BucketEntry>> iterator = map.entrySet().iterator();
+    while (iterator.hasNext()) {
+      Map.Entry<BlockCacheKey, BucketEntry> entry = iterator.next();
       long foundOffset = entry.getValue().offset();
       int foundLen = entry.getValue().getLength();
       int bucketSizeIndex = -1;
-      for (int i = 0; i < bucketSizes.length; ++i) {
-        if (foundLen <= bucketSizes[i]) {
+      for (int i = 0; i < this.bucketSizes.length; ++i) {
+        if (foundLen <= this.bucketSizes[i]) {
           bucketSizeIndex = i;
           break;
         }
       }
       if (bucketSizeIndex == -1) {
-        throw new BucketAllocatorException(
-            "Can't match bucket size for the block with size " + foundLen);
+        sizeNotMatchedCount++;
+        iterator.remove();
+        continue;
       }
       int bucketNo = (int) (foundOffset / bucketCapacity);
-      if (bucketNo < 0 || bucketNo >= buckets.length)
-        throw new BucketAllocatorException("Can't find bucket " + bucketNo
-            + ", total buckets=" + buckets.length
-            + "; did you shrink the cache?");
+      if (bucketNo < 0 || bucketNo >= buckets.length) {
+        insufficientCapacityCount++;
+        iterator.remove();
+        continue;
+      }
       Bucket b = buckets[bucketNo];
       if (reconfigured[bucketNo]) {
-        if (b.sizeIndex() != bucketSizeIndex)
-          throw new BucketAllocatorException(
-              "Inconsistent allocation in bucket map;");
+        if (b.sizeIndex() != bucketSizeIndex) {
+          throw new BucketAllocatorException("Inconsistent allocation in bucket map;");
+        }
       } else {
-        if (!b.isCompletelyFree())
-          throw new BucketAllocatorException("Reconfiguring bucket "
-              + bucketNo + " but it's already allocated; corrupt data");
+        if (!b.isCompletelyFree()) {
+          throw new BucketAllocatorException(
+              "Reconfiguring bucket " + bucketNo + " but it's already allocated; corrupt data");
+        }
         // Need to remove the bucket from whichever list it's currently in at
         // the moment...
         BucketSizeInfo bsi = bucketSizeInfos[bucketSizeIndex];
@@ -390,6 +400,15 @@ public final class BucketAllocator {
       usedSize += buckets[bucketNo].getItemAllocationSize();
       bucketSizeInfos[bucketSizeIndex].blockAllocated(b);
     }
+
+    if (sizeNotMatchedCount > 0) {
+      LOG.warn("There are " + sizeNotMatchedCount + " blocks which can't be rebuilt because "
+          + "there is no matching bucket size for these blocks");
+    }
+    if (insufficientCapacityCount > 0) {
+      LOG.warn("There are " + insufficientCapacityCount + " blocks which can't be rebuilt - "
+          + "did you shrink the cache?");
+    }
   }
 
   public String toString() {
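Note: the rebuild loop in the hunk above amounts to a skip-and-count pattern. An entry whose length no configured bucket size can hold, or whose offset falls outside the current cache, is dropped from the backing map via Iterator.remove() and tallied for a warning, instead of aborting the whole restore with a BucketAllocatorException; previously, one such entry in the persistence file aborted the restore and the file-backed cache came up cold. A minimal standalone sketch of that pattern, with simplified types (a Long-to-Integer map stands in for the BlockCacheKey/BucketEntry backing map; all names and numbers are illustrative, not the HBase classes):

    import java.util.HashMap;
    import java.util.Iterator;
    import java.util.Map;

    public class RebuildSketch {
      // First bucket size that can hold a block of this length, or -1.
      static int matchBucketSize(int len, int[] bucketSizes) {
        for (int i = 0; i < bucketSizes.length; ++i) {
          if (len <= bucketSizes[i]) return i;
        }
        return -1;
      }

      public static void main(String[] args) {
        int[] bucketSizes = { 5 * 1024, 9 * 1024 };
        long bucketCapacity = 16 * 1024;
        int bucketCount = 4;                       // hypothetical 64K cache

        Map<Long, Integer> map = new HashMap<>();  // offset -> block length
        map.put(0L, 8 * 1024);                     // fits the 9K size: kept
        map.put(16384L, 12 * 1024);                // no size fits: dropped
        map.put(96 * 1024L, 4 * 1024);             // offset past bucket 3: dropped

        int sizeNotMatched = 0, insufficientCapacity = 0;
        Iterator<Map.Entry<Long, Integer>> it = map.entrySet().iterator();
        while (it.hasNext()) {
          Map.Entry<Long, Integer> e = it.next();
          if (matchBucketSize(e.getValue(), bucketSizes) == -1) {
            sizeNotMatched++;
            it.remove();                           // forget the block, keep going
            continue;
          }
          long bucketNo = e.getKey() / bucketCapacity;
          if (bucketNo < 0 || bucketNo >= bucketCount) {
            insufficientCapacity++;
            it.remove();
            continue;
          }
        }
        // Prints: kept=1 sizeNotMatched=1 insufficientCapacity=1
        System.out.println("kept=" + map.size() + " sizeNotMatched="
            + sizeNotMatched + " insufficientCapacity=" + insufficientCapacity);
      }
    }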
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
index b7e5fac2c7f..ec7a71f26e9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
@@ -996,12 +996,13 @@ public class BucketCache implements BlockCache, HeapSize {
             + ", expected:" + backingMap.getClass().getName());
       UniqueIndexMap<Integer> deserMap = (UniqueIndexMap<Integer>) ois
           .readObject();
+      ConcurrentHashMap<BlockCacheKey, BucketEntry> backingMapFromFile =
+          (ConcurrentHashMap<BlockCacheKey, BucketEntry>) ois.readObject();
       BucketAllocator allocator = new BucketAllocator(cacheCapacity, bucketSizes,
-          backingMap, realCacheSize);
-      backingMap = (ConcurrentHashMap<BlockCacheKey, BucketEntry>) ois
-          .readObject();
+          backingMapFromFile, realCacheSize);
       bucketAllocator = allocator;
       deserialiserMap = deserMap;
+      backingMap = backingMapFromFile;
     } finally {
       if (ois != null) ois.close();
       if (fis != null) fis.close();
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java
index 040685d544f..bd3f4c7fea6 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java
@@ -42,6 +42,8 @@ import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
 import org.apache.hadoop.hbase.nio.ByteBuff;
 import org.apache.hadoop.hbase.util.ChecksumType;
 
+import com.google.common.annotations.VisibleForTesting;
+
 public class CacheTestUtils {
 
   private static final boolean includesMemstoreTS = true;
@@ -339,7 +341,7 @@ public class CacheTestUtils {
 
   }
 
-  private static HFileBlockPair[] generateHFileBlocks(int blockSize, int numBlocks) {
+  public static HFileBlockPair[] generateHFileBlocks(int blockSize, int numBlocks) {
     HFileBlockPair[] returnedBlocks = new HFileBlockPair[numBlocks];
     Random rand = new Random();
     HashSet<String> usedStrings = new HashSet<String>();
@@ -382,8 +384,17 @@ public class CacheTestUtils {
     return returnedBlocks;
   }
 
-  private static class HFileBlockPair {
+  @VisibleForTesting
+  public static class HFileBlockPair {
     BlockCacheKey blockName;
     HFileBlock block;
+
+    public BlockCacheKey getBlockName() {
+      return this.blockName;
+    }
+
+    public HFileBlock getBlock() {
+      return this.block;
+    }
   }
-}
\ No newline at end of file
+}
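Note: the BucketCache.retrieveFromFile() hunk above fixes an ordering bug. The old code constructed the BucketAllocator from the live backingMap field, which at that point is still the freshly created, empty map, and only afterwards deserialized the persisted map, so the allocator never saw the blocks it was supposed to rebuild. The on-disk layout (deserialiserMap, then backingMap) is unchanged; only the in-memory order of operations moves. A self-contained sketch of the pitfall, with a plain Map and a size count standing in for the backing map and the allocator (all names illustrative):

    import java.util.HashMap;
    import java.util.Map;

    public class RestoreOrderSketch {
      // Starts empty, like BucketCache.backingMap right after construction.
      static Map<String, Integer> liveMap = new HashMap<>();

      // Stand-in for new BucketAllocator(...): its rebuilt state can only
      // reflect whatever map it is handed.
      static int rebuildAllocator(Map<String, Integer> source) {
        return source.size();
      }

      public static void main(String[] args) {
        // Stand-in for the map serialized into the persistence file.
        Map<String, Integer> persisted = new HashMap<>();
        persisted.put("block-0", 8192);

        // Buggy order: rebuild from the live (still empty) field first,
        // then deserialize into it -- the allocator sees zero blocks.
        int buggy = rebuildAllocator(liveMap);
        liveMap = persisted;
        System.out.println("buggy allocator entries: " + buggy);  // 0

        // Fixed order: deserialize into a local first, rebuild from it,
        // then publish it to the field.
        liveMap = new HashMap<>();                                // reset for the demo
        Map<String, Integer> backingMapFromFile = persisted;
        int fixed = rebuildAllocator(backingMapFromFile);
        liveMap = backingMapFromFile;
        System.out.println("fixed allocator entries: " + fixed);  // 1
      }
    }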
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java
index 54dd8e58d29..6fe352da504 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java
@@ -29,9 +29,12 @@ import java.util.List;
 import java.util.Random;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
 import org.apache.hadoop.hbase.io.hfile.CacheTestUtils;
 import org.apache.hadoop.hbase.io.hfile.Cacheable;
+import org.apache.hadoop.hbase.io.hfile.CacheTestUtils.HFileBlockPair;
 import org.apache.hadoop.hbase.io.hfile.bucket.BucketAllocator.BucketSizeInfo;
 import org.apache.hadoop.hbase.io.hfile.bucket.BucketAllocator.IndexStatistics;
 import org.apache.hadoop.hbase.testclassification.IOTests;
@@ -220,4 +223,49 @@ public class TestBucketCache {
     assertTrue(cache.getCurrentSize() > 0L);
     assertTrue("We should have a block!", cache.iterator().hasNext());
   }
+
+  @Test
+  public void testRetrieveFromFile() throws Exception {
+    HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+    Path testDir = TEST_UTIL.getDataTestDir();
+    TEST_UTIL.getTestFileSystem().mkdirs(testDir);
+
+    BucketCache bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
+        constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, testDir
+            + "/bucket.persistence");
+    long usedSize = bucketCache.getAllocator().getUsedSize();
+    assertEquals(0, usedSize);
+
+    HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1);
+    // Add blocks
+    for (HFileBlockPair block : blocks) {
+      bucketCache.cacheBlock(block.getBlockName(), block.getBlock());
+    }
+    for (HFileBlockPair block : blocks) {
+      cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock());
+    }
+    usedSize = bucketCache.getAllocator().getUsedSize();
+    assertTrue(usedSize != 0);
+    // persist cache to file
+    bucketCache.shutdown();
+
+    // restore cache from file
+    bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
+        constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, testDir
+            + "/bucket.persistence");
+    assertEquals(usedSize, bucketCache.getAllocator().getUsedSize());
+    // persist cache to file
+    bucketCache.shutdown();
+
+    // Reconfigure the bucket sizes: the biggest bucket is now smaller than
+    // constructedBlockSize (8k or 16k), so the cache can't be restored from the file
+    int[] smallBucketSizes = new int[] { 2 * 1024 + 1024, 4 * 1024 + 1024 };
+    bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
+        constructedBlockSize, smallBucketSizes, writeThreads,
+        writerQLen, testDir + "/bucket.persistence");
+    assertEquals(0, bucketCache.getAllocator().getUsedSize());
+    assertEquals(0, bucketCache.backingMap.size());
+
+    TEST_UTIL.cleanupTestDir();
+  }
 }
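Note: the final stage of testRetrieveFromFile() exercises the tolerant-rebuild path added in the BucketAllocator hunk. The persisted block is roughly constructedBlockSize (8k or 16k) long, while the reconfigured bucket sizes top out at 5K, so the size-match scan finds nothing, the block is dropped with a warning, and both getUsedSize() and backingMap come back empty instead of the constructor throwing. A tiny sketch of that arithmetic with the test's numbers (assuming the 8k case; names illustrative):

    public class SizeMatchSketch {
      public static void main(String[] args) {
        int[] smallBucketSizes = { 2 * 1024 + 1024, 4 * 1024 + 1024 }; // 3K and 5K
        int blockLen = 8 * 1024;              // ~constructedBlockSize

        // Same scan as BucketAllocator: first bucket size that fits, or -1.
        int bucketSizeIndex = -1;
        for (int i = 0; i < smallBucketSizes.length; ++i) {
          if (blockLen <= smallBucketSizes[i]) {
            bucketSizeIndex = i;
            break;
          }
        }
        // 8192 > 3072 and 8192 > 5120, so no size matches:
        System.out.println("bucketSizeIndex = " + bucketSizeIndex); // -1
      }
    }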