diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheUtil.java index 46e8e249046..267299276e4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheUtil.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheUtil.java @@ -230,6 +230,9 @@ public class BlockCacheUtil { BlockCacheKey cacheKey, Cacheable newBlock) { // NOTICE: The getBlock has retained the existingBlock inside. Cacheable existingBlock = blockCache.getBlock(cacheKey, false, false, false); + if (existingBlock == null) { + return true; + } try { int comparison = BlockCacheUtil.validateBlockAddition(existingBlock, newBlock, cacheKey); if (comparison < 0) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java index 846460f300a..079907e3f25 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.hbase.io.hfile; +import static org.apache.hadoop.hbase.io.ByteBuffAllocator.HEAP; + import java.io.DataInputStream; import java.io.DataOutput; import java.io.DataOutputStream; @@ -677,18 +679,24 @@ public class HFileBlock implements Cacheable { HFileBlock unpacked = new HFileBlock(this); unpacked.allocateBuffer(); // allocates space for the decompressed block - - HFileBlockDecodingContext ctx = blockType == BlockType.ENCODED_DATA - ? reader.getBlockDecodingContext() : reader.getDefaultBlockDecodingContext(); - - ByteBuff dup = this.buf.duplicate(); - dup.position(this.headerSize()); - dup = dup.slice(); - - ctx.prepareDecoding(unpacked.getOnDiskSizeWithoutHeader(), - unpacked.getUncompressedSizeWithoutHeader(), unpacked.getBufferWithoutHeader(true), dup); - - return unpacked; + boolean succ = false; + try { + HFileBlockDecodingContext ctx = blockType == BlockType.ENCODED_DATA + ? reader.getBlockDecodingContext() : reader.getDefaultBlockDecodingContext(); + // Create a duplicated buffer without the header part. + ByteBuff dup = this.buf.duplicate(); + dup.position(this.headerSize()); + dup = dup.slice(); + // Decode the dup into unpacked#buf + ctx.prepareDecoding(unpacked.getOnDiskSizeWithoutHeader(), + unpacked.getUncompressedSizeWithoutHeader(), unpacked.getBufferWithoutHeader(true), dup); + succ = true; + return unpacked; + } finally { + if (!succ) { + unpacked.release(); + } + } } /** @@ -709,7 +717,7 @@ public class HFileBlock implements Cacheable { buf = newBuf; // set limit to exclude next block's header - buf.limit(headerSize + uncompressedSizeWithoutHeader + cksumBytes); + buf.limit(capacityNeeded); } /** @@ -1685,7 +1693,7 @@ public class HFileBlock implements Cacheable { } private ByteBuff allocate(int size, boolean intoHeap) { - return intoHeap ? ByteBuffAllocator.HEAP.allocate(size) : allocator.allocate(size); + return intoHeap ? HEAP.allocate(size) : allocator.allocate(size); } /** @@ -1735,7 +1743,7 @@ public class HFileBlock implements Cacheable { if (LOG.isTraceEnabled()) { LOG.trace("Extra see to get block size!", new RuntimeException()); } - headerBuf = new SingleByteBuff(ByteBuffer.allocate(hdrSize)); + headerBuf = HEAP.allocate(hdrSize); readAtOffset(is, headerBuf, hdrSize, false, offset, pread); headerBuf.rewind(); } @@ -1778,7 +1786,7 @@ public class HFileBlock implements Cacheable { // If nextBlockOnDiskSizeWithHeader is not zero, the onDiskBlock already // contains the header of next block, so no need to set next block's header in it. HFileBlock hFileBlock = new HFileBlock(curBlock, checksumSupport, MemoryType.EXCLUSIVE, - offset, nextBlockOnDiskSize, fileContext, allocator); + offset, nextBlockOnDiskSize, fileContext, intoHeap ? HEAP: allocator); // Run check on uncompressed sizings. if (!fileContext.isCompressedOrEncrypted()) { hFileBlock.sanityCheckUncompressed(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java index ad6183974d3..8396192fe77 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java @@ -313,10 +313,13 @@ public class HFileBlockIndex { int index = -1; HFileBlock block = null; - boolean dataBlock = false; KeyOnlyKeyValue tmpNextIndexKV = new KeyValue.KeyOnlyKeyValue(); while (true) { try { + // Must initialize it with null here, because if don't and once an exception happen in + // readBlock, then we'll release the previous assigned block twice in the finally block. + // (See HBASE-22422) + block = null; if (currentBlock != null && currentBlock.getOffset() == currentOffset) { // Avoid reading the same block again, even with caching turned off. // This is crucial for compaction-type workload which might have @@ -336,9 +339,8 @@ public class HFileBlockIndex { // this also accounts for ENCODED_DATA expectedBlockType = BlockType.DATA; } - block = - cachingBlockReader.readBlock(currentOffset, currentOnDiskSize, shouldCache, pread, - isCompaction, true, expectedBlockType, expectedDataBlockEncoding); + block = cachingBlockReader.readBlock(currentOffset, currentOnDiskSize, shouldCache, + pread, isCompaction, true, expectedBlockType, expectedDataBlockEncoding); } if (block == null) { @@ -348,7 +350,6 @@ public class HFileBlockIndex { // Found a data block, break the loop and check our level in the tree. if (block.getBlockType().isData()) { - dataBlock = true; break; } @@ -381,7 +382,7 @@ public class HFileBlockIndex { nextIndexedKey = tmpNextIndexKV; } } finally { - if (!dataBlock && block != null) { + if (block != null && !block.getBlockType().isData()) { // Release the block immediately if it is not the data block block.release(); } @@ -389,7 +390,7 @@ public class HFileBlockIndex { } if (lookupLevel != searchTreeLevel) { - assert dataBlock == true; + assert block.getBlockType().isData(); // Though we have retrieved a data block we have found an issue // in the retrieved data block. Hence returned the block so that // the ref count can be decremented @@ -401,8 +402,7 @@ public class HFileBlockIndex { } // set the next indexed key for the current block. - BlockWithScanInfo blockWithScanInfo = new BlockWithScanInfo(block, nextIndexedKey); - return blockWithScanInfo; + return new BlockWithScanInfo(block, nextIndexedKey); } @Override @@ -576,8 +576,7 @@ public class HFileBlockIndex { boolean pread, boolean isCompaction, DataBlockEncoding expectedDataBlockEncoding) throws IOException { BlockWithScanInfo blockWithScanInfo = loadDataBlockWithScanInfo(key, currentBlock, - cacheBlocks, - pread, isCompaction, expectedDataBlockEncoding); + cacheBlocks, pread, isCompaction, expectedDataBlockEncoding); if (blockWithScanInfo == null) { return null; } else { @@ -600,9 +599,8 @@ public class HFileBlockIndex { * @throws IOException */ public abstract BlockWithScanInfo loadDataBlockWithScanInfo(Cell key, HFileBlock currentBlock, - boolean cacheBlocks, - boolean pread, boolean isCompaction, DataBlockEncoding expectedDataBlockEncoding) - throws IOException; + boolean cacheBlocks, boolean pread, boolean isCompaction, + DataBlockEncoding expectedDataBlockEncoding) throws IOException; /** * An approximation to the {@link HFile}'s mid-key. Operates on block diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java index 02e56e96e7e..be8cabb6883 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java @@ -1134,15 +1134,13 @@ public class HFileReaderImpl implements HFile.Reader, Configurable { updateCurrentBlock(newBlock); } - protected int loadBlockAndSeekToKey(HFileBlock seekToBlock, Cell nextIndexedKey, - boolean rewind, Cell key, boolean seekBefore) throws IOException { - if (this.curBlock == null - || this.curBlock.getOffset() != seekToBlock.getOffset()) { + protected int loadBlockAndSeekToKey(HFileBlock seekToBlock, Cell nextIndexedKey, boolean rewind, + Cell key, boolean seekBefore) throws IOException { + if (this.curBlock == null || this.curBlock.getOffset() != seekToBlock.getOffset()) { updateCurrentBlock(seekToBlock); } else if (rewind) { blockBuffer.rewind(); } - // Update the nextIndexedKey this.nextIndexedKey = nextIndexedKey; return blockSeek(key, seekBefore); @@ -1480,10 +1478,11 @@ public class HFileReaderImpl implements HFile.Reader, Configurable { // Validate encoding type for data blocks. We include encoding // type in the cache key, and we expect it to match on a cache hit. if (cachedBlock.getDataBlockEncoding() != dataBlockEncoder.getDataBlockEncoding()) { + // Remember to release the block when in exceptional path. + cachedBlock.release(); throw new IOException("Cached block under key " + cacheKey + " " - + "has wrong encoding: " + cachedBlock.getDataBlockEncoding() + " (expected: " - + dataBlockEncoder.getDataBlockEncoding() + ")" - + ", path=" + path); + + "has wrong encoding: " + cachedBlock.getDataBlockEncoding() + " (expected: " + + dataBlockEncoder.getDataBlockEncoding() + "), path=" + path); } } // Cache-hit. Return! @@ -1507,15 +1506,14 @@ public class HFileReaderImpl implements HFile.Reader, Configurable { BlockType.BlockCategory category = hfileBlock.getBlockType().getCategory(); // Cache the block if necessary - AtomicBoolean cachedRaw = new AtomicBoolean(false); cacheConf.getBlockCache().ifPresent(cache -> { if (cacheBlock && cacheConf.shouldCacheBlockOnRead(category)) { - cachedRaw.set(cacheConf.shouldCacheCompressed(category)); - cache.cacheBlock(cacheKey, cachedRaw.get() ? hfileBlock : unpacked, + cache.cacheBlock(cacheKey, + cacheConf.shouldCacheCompressed(category) ? hfileBlock : unpacked, cacheConf.isInMemory()); } }); - if (unpacked != hfileBlock && !cachedRaw.get()) { + if (unpacked != hfileBlock) { // End of life here if hfileBlock is an independent block. hfileBlock.release(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java index 82e64e7bd60..70715ae148c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java @@ -512,7 +512,14 @@ public class LruBlockCache implements FirstLevelBlockCache { @Override public Cacheable getBlock(BlockCacheKey cacheKey, boolean caching, boolean repeat, boolean updateCacheMetrics) { - LruCachedBlock cb = map.get(cacheKey); + LruCachedBlock cb = map.computeIfPresent(cacheKey, (key, val) -> { + // It will be referenced by RPC path, so increase here. NOTICE: Must do the retain inside + // this block. because if retain outside the map#computeIfPresent, the evictBlock may remove + // the block and release, then we're retaining a block with refCnt=0 which is disallowed. + // see HBASE-22422. + val.getBuffer().retain(); + return val; + }); if (cb == null) { if (!repeat && updateCacheMetrics) { stats.miss(caching, cacheKey.isPrimary(), cacheKey.getBlockType()); @@ -540,10 +547,10 @@ public class LruBlockCache implements FirstLevelBlockCache { } return null; } - if (updateCacheMetrics) stats.hit(caching, cacheKey.isPrimary(), cacheKey.getBlockType()); + if (updateCacheMetrics) { + stats.hit(caching, cacheKey.isPrimary(), cacheKey.getBlockType()); + } cb.access(count.incrementAndGet()); - // It will be referenced by RPC path, so increase here. - cb.getBuffer().retain(); return cb.getBuffer(); } @@ -601,8 +608,6 @@ public class LruBlockCache implements FirstLevelBlockCache { if (previous == null) { return 0; } - // Decrease the block's reference count, and if refCount is 0, then it'll auto-deallocate. - previous.getBuffer().release(); updateSizeMetrics(block, true); long val = elements.decrementAndGet(); if (LOG.isTraceEnabled()) { @@ -610,7 +615,7 @@ public class LruBlockCache implements FirstLevelBlockCache { assertCounterSanity(size, val); } if (block.getBuffer().getBlockType().isData()) { - dataBlockElements.decrement(); + dataBlockElements.decrement(); } if (evictedByEvictionProcess) { // When the eviction of the block happened because of invalidation of HFiles, no need to @@ -620,6 +625,10 @@ public class LruBlockCache implements FirstLevelBlockCache { victimHandler.cacheBlock(block.getCacheKey(), block.getBuffer()); } } + // Decrease the block's reference count, and if refCount is 0, then it'll auto-deallocate. DO + // NOT move this up because if do that then the victimHandler may access the buffer with + // refCnt = 0 which is disallowed. + previous.getBuffer().release(); return block.heapSize(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java index bb0b79c0a56..83cd90b7ab1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java @@ -42,6 +42,7 @@ import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.LongAdder; import java.util.concurrent.locks.Lock; @@ -1532,21 +1533,28 @@ public class BucketCache implements BlockCache, HeapSize { } public RAMQueueEntry get(BlockCacheKey key) { - RAMQueueEntry re = delegate.get(key); - if (re != null) { - // It'll be referenced by RPC, so retain here. + return delegate.computeIfPresent(key, (k, re) -> { + // It'll be referenced by RPC, so retain atomically here. if the get and retain is not + // atomic, another thread may remove and release the block, when retaining in this thread we + // may retain a block with refCnt=0 which is disallowed. (see HBASE-22422) re.getData().retain(); - } - return re; + return re; + }); } + /** + * Return the previous associated value, or null if absent. It has the same meaning as + * {@link ConcurrentMap#putIfAbsent(Object, Object)} + */ public RAMQueueEntry putIfAbsent(BlockCacheKey key, RAMQueueEntry entry) { - RAMQueueEntry previous = delegate.putIfAbsent(key, entry); - if (previous == null) { + AtomicBoolean absent = new AtomicBoolean(false); + RAMQueueEntry re = delegate.computeIfAbsent(key, k -> { // The RAMCache reference to this entry, so reference count should be increment. entry.getData().retain(); - } - return previous; + absent.set(true); + return entry; + }); + return absent.get() ? null : re; } public boolean remove(BlockCacheKey key) { @@ -1575,8 +1583,9 @@ public class BucketCache implements BlockCache, HeapSize { public void clear() { Iterator> it = delegate.entrySet().iterator(); while (it.hasNext()) { - it.next().getValue().getData().release(); + RAMQueueEntry re = it.next().getValue(); it.remove(); + re.getData().release(); } } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCombinedBlockCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCombinedBlockCache.java index f4dc38aeae7..a086a3bda2e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCombinedBlockCache.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCombinedBlockCache.java @@ -17,11 +17,16 @@ */ package org.apache.hadoop.hbase.io.hfile; +import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_IOENGINE_KEY; +import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_SIZE_KEY; import static org.junit.Assert.assertEquals; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.io.hfile.CombinedBlockCache.CombinedCacheStats; import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.junit.Assert; import org.junit.ClassRule; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -33,6 +38,8 @@ public class TestCombinedBlockCache { public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestCombinedBlockCache.class); + private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); + @Test public void testCombinedCacheStats() { CacheStats lruCacheStats = new CacheStats("lruCacheStats", 2); @@ -102,4 +109,14 @@ public class TestCombinedBlockCache { assertEquals(0.75, stats.getHitRatioPastNPeriods(), delta); assertEquals(0.8, stats.getHitCachingRatioPastNPeriods(), delta); } + + @Test + public void testMultiThreadGetAndEvictBlock() throws Exception { + Configuration conf = UTIL.getConfiguration(); + conf.set(BUCKET_CACHE_IOENGINE_KEY, "offheap"); + conf.setInt(BUCKET_CACHE_SIZE_KEY, 32); + BlockCache blockCache = BlockCacheFactory.createBlockCache(conf); + Assert.assertTrue(blockCache instanceof CombinedBlockCache); + TestLruBlockCache.testMultiThreadGetAndEvictBlockInternal(blockCache); + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestLruBlockCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestLruBlockCache.java index 3317a4d9e7f..a355ab0ad00 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestLruBlockCache.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestLruBlockCache.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hbase.io.hfile; +import static org.apache.hadoop.hbase.io.ByteBuffAllocator.HEAP; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; @@ -27,6 +28,7 @@ import java.util.Random; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseClassTestRule; @@ -34,15 +36,17 @@ import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.Waiter; import org.apache.hadoop.hbase.Waiter.ExplainingPredicate; -import org.apache.hadoop.hbase.io.ByteBuffAllocator; import org.apache.hadoop.hbase.io.HeapSize; import org.apache.hadoop.hbase.io.hfile.LruBlockCache.EvictionThread; import org.apache.hadoop.hbase.testclassification.IOTests; import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.util.ClassSize; +import org.junit.Assert; import org.junit.ClassRule; import org.junit.Test; import org.junit.experimental.categories.Category; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Tests the concurrent LruBlockCache.

@@ -58,6 +62,8 @@ public class TestLruBlockCache { public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestLruBlockCache.class); + private static final Logger LOG = LoggerFactory.getLogger(TestLruBlockCache.class); + @Test public void testCacheEvictionThreadSafe() throws Exception { long maxSize = 100000; @@ -814,11 +820,10 @@ public class TestLruBlockCache { byte[] byteArr = new byte[length]; ByteBuffer buf = ByteBuffer.wrap(byteArr, 0, size); HFileContext meta = new HFileContextBuilder().build(); - ByteBuffAllocator alloc = ByteBuffAllocator.HEAP; HFileBlock blockWithNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1, buf, - HFileBlock.FILL_HEADER, -1, 52, -1, meta, alloc); + HFileBlock.FILL_HEADER, -1, 52, -1, meta, HEAP); HFileBlock blockWithoutNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1, buf, - HFileBlock.FILL_HEADER, -1, -1, -1, meta, alloc); + HFileBlock.FILL_HEADER, -1, -1, -1, meta, HEAP); LruBlockCache cache = new LruBlockCache(maxSize, blockSize, false, (int)Math.ceil(1.2*maxSize/blockSize), @@ -958,5 +963,75 @@ public class TestLruBlockCache { } + static void testMultiThreadGetAndEvictBlockInternal(BlockCache cache) throws Exception { + int size = 100; + int length = HConstants.HFILEBLOCK_HEADER_SIZE + size; + byte[] byteArr = new byte[length]; + HFileContext meta = new HFileContextBuilder().build(); + BlockCacheKey key = new BlockCacheKey("key1", 0); + HFileBlock blk = new HFileBlock(BlockType.DATA, size, size, -1, + ByteBuffer.wrap(byteArr, 0, size), HFileBlock.FILL_HEADER, -1, 52, -1, meta, HEAP); + AtomicBoolean err1 = new AtomicBoolean(false); + Thread t1 = new Thread(() -> { + for (int i = 0; i < 10000 && !err1.get(); i++) { + try { + cache.getBlock(key, false, false, true); + } catch (Exception e) { + err1.set(true); + LOG.info("Cache block or get block failure: ", e); + } + } + }); + + AtomicBoolean err2 = new AtomicBoolean(false); + Thread t2 = new Thread(() -> { + for (int i = 0; i < 10000 && !err2.get(); i++) { + try { + cache.evictBlock(key); + } catch (Exception e) { + err2.set(true); + LOG.info("Evict block failure: ", e); + } + } + }); + + AtomicBoolean err3 = new AtomicBoolean(false); + Thread t3 = new Thread(() -> { + for (int i = 0; i < 10000 && !err3.get(); i++) { + try { + cache.cacheBlock(key, blk); + } catch (Exception e) { + err3.set(true); + LOG.info("Cache block failure: ", e); + } + } + }); + t1.start(); + t2.start(); + t3.start(); + t1.join(); + t2.join(); + t3.join(); + Assert.assertFalse(err1.get()); + Assert.assertFalse(err2.get()); + Assert.assertFalse(err3.get()); + } + + @Test + public void testMultiThreadGetAndEvictBlock() throws Exception { + long maxSize = 100000; + long blockSize = calculateBlockSize(maxSize, 10); + LruBlockCache cache = + new LruBlockCache(maxSize, blockSize, false, (int) Math.ceil(1.2 * maxSize / blockSize), + LruBlockCache.DEFAULT_LOAD_FACTOR, LruBlockCache.DEFAULT_CONCURRENCY_LEVEL, + 0.66f, // min + 0.99f, // acceptable + 0.33f, // single + 0.33f, // multi + 0.34f, // memory + 1.2f, // limit + false, 1024); + testMultiThreadGetAndEvictBlockInternal(cache); + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestRAMCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestRAMCache.java new file mode 100644 index 00000000000..5c5dda65c84 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestRAMCache.java @@ -0,0 +1,126 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.io.hfile.bucket; + +import java.nio.ByteBuffer; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.io.ByteBuffAllocator; +import org.apache.hadoop.hbase.io.hfile.BlockCacheKey; +import org.apache.hadoop.hbase.io.hfile.BlockType; +import org.apache.hadoop.hbase.io.hfile.HFileBlock; +import org.apache.hadoop.hbase.io.hfile.HFileContext; +import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; +import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.RAMCache; +import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.RAMQueueEntry; +import org.apache.hadoop.hbase.testclassification.IOTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.junit.Assert; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@Category({ IOTests.class, MediumTests.class }) +public class TestRAMCache { + private static final Logger LOG = LoggerFactory.getLogger(TestRAMCache.class); + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestRAMCache.class); + + // Define a mock HFileBlock. + private static class MockHFileBlock extends HFileBlock { + + private volatile CountDownLatch latch; + + MockHFileBlock(BlockType blockType, int onDiskSizeWithoutHeader, + int uncompressedSizeWithoutHeader, long prevBlockOffset, ByteBuffer b, boolean fillHeader, + long offset, int nextBlockOnDiskSize, int onDiskDataSizeWithHeader, + HFileContext fileContext, ByteBuffAllocator allocator) { + super(blockType, onDiskSizeWithoutHeader, uncompressedSizeWithoutHeader, prevBlockOffset, b, + fillHeader, offset, nextBlockOnDiskSize, onDiskDataSizeWithHeader, fileContext, + allocator); + } + + public void setLatch(CountDownLatch latch) { + this.latch = latch; + } + + public MockHFileBlock retain() { + try { + if (latch != null) { + latch.await(); + } + } catch (InterruptedException e) { + LOG.info("Interrupted exception error: ", e); + } + super.retain(); + return this; + } + } + + @Test + public void testAtomicRAMCache() throws Exception { + int size = 100; + int length = HConstants.HFILEBLOCK_HEADER_SIZE + size; + byte[] byteArr = new byte[length]; + + RAMCache cache = new RAMCache(); + BlockCacheKey key = new BlockCacheKey("file-1", 1); + MockHFileBlock blk = new MockHFileBlock(BlockType.DATA, size, size, -1, + ByteBuffer.wrap(byteArr, 0, size), HFileBlock.FILL_HEADER, -1, 52, -1, + new HFileContextBuilder().build(), ByteBuffAllocator.HEAP); + RAMQueueEntry re = new RAMQueueEntry(key, blk, 1, false, ByteBuffAllocator.NONE); + + Assert.assertNull(cache.putIfAbsent(key, re)); + Assert.assertEquals(cache.putIfAbsent(key, re), re); + + CountDownLatch latch = new CountDownLatch(1); + blk.setLatch(latch); + + AtomicBoolean error = new AtomicBoolean(false); + Thread t1 = new Thread(() -> { + try { + cache.get(key); + } catch (Exception e) { + error.set(true); + } + }); + t1.start(); + Thread.sleep(200); + + AtomicBoolean removed = new AtomicBoolean(false); + Thread t2 = new Thread(() -> { + cache.remove(key); + removed.set(true); + }); + t2.start(); + Thread.sleep(200); + Assert.assertFalse(removed.get()); + + latch.countDown(); + Thread.sleep(200); + Assert.assertTrue(removed.get()); + Assert.assertFalse(error.get()); + } +}