HBASE-22422 Retain an ByteBuff with refCnt=0 when getBlock from LRUCache (#242)

openinx 2019-05-28 10:10:34 +08:00 committed by huzheng
parent 9e5fc2b379
commit 1f5cf41345
9 changed files with 306 additions and 63 deletions


@@ -230,6 +230,9 @@ public class BlockCacheUtil {
       BlockCacheKey cacheKey, Cacheable newBlock) {
     // NOTICE: The getBlock has retained the existingBlock inside.
     Cacheable existingBlock = blockCache.getBlock(cacheKey, false, false, false);
+    if (existingBlock == null) {
+      return true;
+    }
     try {
       int comparison = BlockCacheUtil.validateBlockAddition(existingBlock, newBlock, cacheKey);
       if (comparison < 0) {
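The new null check is needed because getBlock() now retains the returned block internally: a null result is a plain cache miss with nothing to release, while a non-null result hands this method one reference that it must give back once the comparison is done. A minimal, hypothetical sketch of that caller-side contract (RefCounted and the method names below are illustrative stand-ins, not the HBase classes):

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

class CallerContractSketch {
  // Toy reference-counted value standing in for a cached block (illustrative only).
  static final class RefCounted {
    final AtomicInteger refCnt = new AtomicInteger(1);
    void retain() { refCnt.incrementAndGet(); }
    void release() { refCnt.decrementAndGet(); }
  }

  static final ConcurrentHashMap<String, RefCounted> cache = new ConcurrentHashMap<>();

  // getBlock-like lookup: the retain happens inside the cache, so the caller owns one reference.
  static RefCounted getBlock(String key) {
    return cache.computeIfPresent(key, (k, v) -> { v.retain(); return v; });
  }

  // Caller-side pattern mirrored by the hunk above: handle the null (miss) case first,
  // then release the retained block exactly once, on every exit path.
  static boolean shouldReplaceExisting(String key, int newBlockScore) {
    RefCounted existing = getBlock(key);
    if (existing == null) {
      return true; // nothing cached, nothing retained, nothing to release
    }
    try {
      return newBlockScore > 0; // placeholder for validateBlockAddition(existing, newBlock, key)
    } finally {
      existing.release(); // drop the reference taken by getBlock
    }
  }
}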


@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hbase.io.hfile;

+import static org.apache.hadoop.hbase.io.ByteBuffAllocator.HEAP;
+
 import java.io.DataInputStream;
 import java.io.DataOutput;
 import java.io.DataOutputStream;
@@ -678,18 +680,24 @@ public class HFileBlock implements Cacheable {
     HFileBlock unpacked = new HFileBlock(this);
     unpacked.allocateBuffer(); // allocates space for the decompressed block
-    HFileBlockDecodingContext ctx = blockType == BlockType.ENCODED_DATA
-        ? reader.getBlockDecodingContext() : reader.getDefaultBlockDecodingContext();
-    ByteBuff dup = this.buf.duplicate();
-    dup.position(this.headerSize());
-    dup = dup.slice();
-    ctx.prepareDecoding(unpacked.getOnDiskSizeWithoutHeader(),
-      unpacked.getUncompressedSizeWithoutHeader(), unpacked.getBufferWithoutHeader(true), dup);
-    return unpacked;
+    boolean succ = false;
+    try {
+      HFileBlockDecodingContext ctx = blockType == BlockType.ENCODED_DATA
+          ? reader.getBlockDecodingContext() : reader.getDefaultBlockDecodingContext();
+      // Create a duplicated buffer without the header part.
+      ByteBuff dup = this.buf.duplicate();
+      dup.position(this.headerSize());
+      dup = dup.slice();
+      // Decode the dup into unpacked#buf
+      ctx.prepareDecoding(unpacked.getOnDiskSizeWithoutHeader(),
+        unpacked.getUncompressedSizeWithoutHeader(), unpacked.getBufferWithoutHeader(true), dup);
+      succ = true;
+      return unpacked;
+    } finally {
+      if (!succ) {
+        unpacked.release();
+      }
+    }
   }

   /**
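The try/finally wrapper is the usual release-on-failure pattern for reference-counted buffers: allocateBuffer() has already acquired memory for the unpacked block, so if prepareDecoding() throws, that buffer must be released rather than leaked, and only on success does ownership pass to the caller. A small self-contained sketch of the pattern, using toy types rather than the HFileBlock API:

import java.util.concurrent.atomic.AtomicInteger;

class ReleaseOnFailureSketch {
  // Toy reference-counted resource; stands in for the freshly allocated block buffer.
  static final class Resource {
    final AtomicInteger refCnt = new AtomicInteger(1);
    void release() { refCnt.decrementAndGet(); }
  }

  // Allocate, run a step that may throw, and keep the resource only if every step succeeded;
  // on any failure the finally block returns the buffer instead of leaking it.
  static Resource allocateAndFill(Runnable decodeStep) {
    Resource unpacked = new Resource();   // like unpacked.allocateBuffer()
    boolean succ = false;
    try {
      decodeStep.run();                   // like ctx.prepareDecoding(...)
      succ = true;
      return unpacked;                    // ownership passes to the caller only on success
    } finally {
      if (!succ) {
        unpacked.release();               // failure path: release the buffer we just allocated
      }
    }
  }
}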
@@ -710,7 +718,7 @@ public class HFileBlock implements Cacheable {
     buf = newBuf;
     // set limit to exclude next block's header
-    buf.limit(headerSize + uncompressedSizeWithoutHeader + cksumBytes);
+    buf.limit(capacityNeeded);
   }

   /**
@@ -1686,7 +1694,7 @@ public class HFileBlock implements Cacheable {
   }

   private ByteBuff allocate(int size, boolean intoHeap) {
-    return intoHeap ? ByteBuffAllocator.HEAP.allocate(size) : allocator.allocate(size);
+    return intoHeap ? HEAP.allocate(size) : allocator.allocate(size);
   }

   /**
@@ -1736,7 +1744,7 @@ public class HFileBlock implements Cacheable {
       if (LOG.isTraceEnabled()) {
         LOG.trace("Extra see to get block size!", new RuntimeException());
       }
-      headerBuf = new SingleByteBuff(ByteBuffer.allocate(hdrSize));
+      headerBuf = HEAP.allocate(hdrSize);
       readAtOffset(is, headerBuf, hdrSize, false, offset, pread);
       headerBuf.rewind();
     }
@@ -1779,7 +1787,7 @@ public class HFileBlock implements Cacheable {
       // If nextBlockOnDiskSizeWithHeader is not zero, the onDiskBlock already
       // contains the header of next block, so no need to set next block's header in it.
       HFileBlock hFileBlock = new HFileBlock(curBlock, checksumSupport, MemoryType.EXCLUSIVE,
-          offset, nextBlockOnDiskSize, fileContext, allocator);
+          offset, nextBlockOnDiskSize, fileContext, intoHeap ? HEAP : allocator);
       // Run check on uncompressed sizings.
       if (!fileContext.isCompressedOrEncrypted()) {
         hFileBlock.sanityCheckUncompressed();


@@ -313,10 +313,13 @@ public class HFileBlockIndex {
       int index = -1;
       HFileBlock block = null;
-      boolean dataBlock = false;
       KeyOnlyKeyValue tmpNextIndexKV = new KeyValue.KeyOnlyKeyValue();
       while (true) {
         try {
+          // Must initialize it with null here, because if don't and once an exception happen in
+          // readBlock, then we'll release the previous assigned block twice in the finally block.
+          // (See HBASE-22422)
+          block = null;
           if (currentBlock != null && currentBlock.getOffset() == currentOffset) {
             // Avoid reading the same block again, even with caching turned off.
             // This is crucial for compaction-type workload which might have
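The added block = null at the top of each iteration is what prevents a double release: the finally clause further down releases block when it is not a data block, so if readBlock() throws before block is reassigned, the finally would otherwise still see the previous iteration's block and release it a second time. A hedged sketch of that loop shape with toy types (not the actual HFileBlockIndex code):

import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

class ResetBeforeReassignSketch {
  static final class Block {
    final AtomicInteger refCnt = new AtomicInteger(1);
    final boolean data;
    Block(boolean data) { this.data = data; }
    void release() {
      if (refCnt.decrementAndGet() < 0) {
        throw new IllegalStateException("double release");
      }
    }
  }

  // Walks "index" blocks until a data block (or null) comes back, mirroring the loop above.
  static Block findDataBlock(Supplier<Block> reader) {
    Block block = null;
    while (true) {
      try {
        // Reset the reference first: if reader.get() throws, the finally below must not
        // see (and re-release) the block from the previous iteration. (See HBASE-22422.)
        block = null;
        block = reader.get(); // may throw
        if (block == null || block.data) {
          return block;       // data block: ownership passes to the caller
        }
        // otherwise an intermediate index block: keep walking (the real code descends levels)
      } finally {
        if (block != null && !block.data) {
          block.release();    // intermediate index blocks are released right away
        }
      }
    }
  }
}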
@@ -336,9 +339,8 @@ public class HFileBlockIndex {
             // this also accounts for ENCODED_DATA
             expectedBlockType = BlockType.DATA;
           }
-          block =
-              cachingBlockReader.readBlock(currentOffset, currentOnDiskSize, shouldCache, pread,
-                isCompaction, true, expectedBlockType, expectedDataBlockEncoding);
+          block = cachingBlockReader.readBlock(currentOffset, currentOnDiskSize, shouldCache,
+            pread, isCompaction, true, expectedBlockType, expectedDataBlockEncoding);
         }

         if (block == null) {
@@ -348,7 +350,6 @@ public class HFileBlockIndex {
         // Found a data block, break the loop and check our level in the tree.
         if (block.getBlockType().isData()) {
-          dataBlock = true;
           break;
         }
@@ -381,7 +382,7 @@ public class HFileBlockIndex {
             nextIndexedKey = tmpNextIndexKV;
           }
         } finally {
-          if (!dataBlock && block != null) {
+          if (block != null && !block.getBlockType().isData()) {
             // Release the block immediately if it is not the data block
             block.release();
           }
@@ -389,7 +390,7 @@ public class HFileBlockIndex {
       }
       if (lookupLevel != searchTreeLevel) {
-        assert dataBlock == true;
+        assert block.getBlockType().isData();
         // Though we have retrieved a data block we have found an issue
         // in the retrieved data block. Hence returned the block so that
         // the ref count can be decremented
@@ -401,8 +402,7 @@ public class HFileBlockIndex {
       }
       // set the next indexed key for the current block.
-      BlockWithScanInfo blockWithScanInfo = new BlockWithScanInfo(block, nextIndexedKey);
-      return blockWithScanInfo;
+      return new BlockWithScanInfo(block, nextIndexedKey);
     }

     @Override
@@ -576,8 +576,7 @@ public class HFileBlockIndex {
         boolean pread, boolean isCompaction, DataBlockEncoding expectedDataBlockEncoding)
         throws IOException {
       BlockWithScanInfo blockWithScanInfo = loadDataBlockWithScanInfo(key, currentBlock,
-          cacheBlocks,
-          pread, isCompaction, expectedDataBlockEncoding);
+          cacheBlocks, pread, isCompaction, expectedDataBlockEncoding);
       if (blockWithScanInfo == null) {
         return null;
       } else {
@@ -600,9 +599,8 @@ public class HFileBlockIndex {
      * @throws IOException
      */
     public abstract BlockWithScanInfo loadDataBlockWithScanInfo(Cell key, HFileBlock currentBlock,
-        boolean cacheBlocks,
-        boolean pread, boolean isCompaction, DataBlockEncoding expectedDataBlockEncoding)
-        throws IOException;
+        boolean cacheBlocks, boolean pread, boolean isCompaction,
+        DataBlockEncoding expectedDataBlockEncoding) throws IOException;

     /**
      * An approximation to the {@link HFile}'s mid-key. Operates on block


@@ -1134,15 +1134,13 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
       updateCurrentBlock(newBlock);
     }

-    protected int loadBlockAndSeekToKey(HFileBlock seekToBlock, Cell nextIndexedKey,
-        boolean rewind, Cell key, boolean seekBefore) throws IOException {
-      if (this.curBlock == null
-          || this.curBlock.getOffset() != seekToBlock.getOffset()) {
+    protected int loadBlockAndSeekToKey(HFileBlock seekToBlock, Cell nextIndexedKey, boolean rewind,
+        Cell key, boolean seekBefore) throws IOException {
+      if (this.curBlock == null || this.curBlock.getOffset() != seekToBlock.getOffset()) {
         updateCurrentBlock(seekToBlock);
       } else if (rewind) {
         blockBuffer.rewind();
       }
       // Update the nextIndexedKey
       this.nextIndexedKey = nextIndexedKey;
       return blockSeek(key, seekBefore);
@@ -1480,10 +1478,11 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
           // Validate encoding type for data blocks. We include encoding
           // type in the cache key, and we expect it to match on a cache hit.
           if (cachedBlock.getDataBlockEncoding() != dataBlockEncoder.getDataBlockEncoding()) {
+            // Remember to release the block when in exceptional path.
+            cachedBlock.release();
             throw new IOException("Cached block under key " + cacheKey + " "
                 + "has wrong encoding: " + cachedBlock.getDataBlockEncoding() + " (expected: "
-                + dataBlockEncoder.getDataBlockEncoding() + ")"
-                + ", path=" + path);
+                + dataBlockEncoder.getDataBlockEncoding() + "), path=" + path);
           }
         }
         // Cache-hit. Return!
@@ -1507,15 +1506,14 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
         BlockType.BlockCategory category = hfileBlock.getBlockType().getCategory();

         // Cache the block if necessary
-        AtomicBoolean cachedRaw = new AtomicBoolean(false);
         cacheConf.getBlockCache().ifPresent(cache -> {
           if (cacheBlock && cacheConf.shouldCacheBlockOnRead(category)) {
-            cachedRaw.set(cacheConf.shouldCacheCompressed(category));
-            cache.cacheBlock(cacheKey, cachedRaw.get() ? hfileBlock : unpacked,
-              cacheConf.isInMemory());
+            cache.cacheBlock(cacheKey,
+              cacheConf.shouldCacheCompressed(category) ? hfileBlock : unpacked,
+              cacheConf.isInMemory());
           }
         });

-        if (unpacked != hfileBlock && !cachedRaw.get()) {
+        if (unpacked != hfileBlock) {
           // End of life here if hfileBlock is an independent block.
           hfileBlock.release();
         }
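Both changes in this file are about who owns a retained block on the non-happy paths: a block fetched from the cache arrives already retained, so it must be released before the encoding-mismatch IOException is thrown; and once the on-disk block has been unpacked, the cache keeps its own reference to whichever copy it stores, so the reader can always drop its reference to the leftover compressed copy. A short illustrative sketch of the two patterns (toy types, not the HFileReaderImpl API):

import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;

class ExceptionalPathReleaseSketch {
  static final class Block {
    final AtomicInteger refCnt = new AtomicInteger(1);
    void retain() { refCnt.incrementAndGet(); }
    void release() { refCnt.decrementAndGet(); }
  }

  // Pattern (a): a block obtained from the cache is already retained for us, so it must be
  // released on every exit path, including the exceptional one, before we throw.
  static Block validateCached(Block cachedBlock, boolean encodingMatches) throws IOException {
    if (!encodingMatches) {
      cachedBlock.release(); // don't leak the reference taken by the cache lookup
      throw new IOException("cached block has the wrong encoding");
    }
    return cachedBlock;
  }

  // Pattern (b): after unpacking, the caller keeps only one of the two blocks; the cache takes
  // its own reference to whatever it stores, so the leftover copy can always be released.
  static Block unpackAndMaybeCache(Block compressed, boolean cacheCompressed) {
    Block unpacked = new Block();        // stand-in for hfileBlock.unpack(...)
    Block toCache = cacheCompressed ? compressed : unpacked;
    toCache.retain();                    // the cache's own reference (done inside cacheBlock)
    if (unpacked != compressed) {
      compressed.release();              // end of life for the reader's compressed copy
    }
    return unpacked;
  }
}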


@@ -512,7 +512,14 @@ public class LruBlockCache implements FirstLevelBlockCache {
   @Override
   public Cacheable getBlock(BlockCacheKey cacheKey, boolean caching, boolean repeat,
       boolean updateCacheMetrics) {
-    LruCachedBlock cb = map.get(cacheKey);
+    LruCachedBlock cb = map.computeIfPresent(cacheKey, (key, val) -> {
+      // It will be referenced by RPC path, so increase here. NOTICE: Must do the retain inside
+      // this block. because if retain outside the map#computeIfPresent, the evictBlock may remove
+      // the block and release, then we're retaining a block with refCnt=0 which is disallowed.
+      // see HBASE-22422.
+      val.getBuffer().retain();
+      return val;
+    });
     if (cb == null) {
       if (!repeat && updateCacheMetrics) {
         stats.miss(caching, cacheKey.isPrimary(), cacheKey.getBlockType());
@@ -540,10 +547,10 @@ public class LruBlockCache implements FirstLevelBlockCache {
       }
       return null;
     }
-    if (updateCacheMetrics) stats.hit(caching, cacheKey.isPrimary(), cacheKey.getBlockType());
+    if (updateCacheMetrics) {
+      stats.hit(caching, cacheKey.isPrimary(), cacheKey.getBlockType());
+    }
     cb.access(count.incrementAndGet());
-    // It will be referenced by RPC path, so increase here.
-    cb.getBuffer().retain();
     return cb.getBuffer();
   }
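This hunk is the heart of HBASE-22422. The old code looked the block up with map.get() and retained its buffer a few lines later; in between, evictBlock() could remove the entry and release the cache's reference, so the retain landed on a buffer whose refCnt had already reached zero, which the ByteBuff reference counting rejects. Moving the retain inside ConcurrentHashMap#computeIfPresent makes it atomic with any concurrent remove() of the same key, since the map performs both under the same per-entry lock. A standalone sketch of the broken and fixed versions, using a toy reference-counted value rather than the HBase types:

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

class AtomicRetainSketch {
  static final class RefCounted {
    final AtomicInteger refCnt = new AtomicInteger(1);
    void retain() {
      if (refCnt.getAndIncrement() <= 0) {
        throw new IllegalStateException("retain on released buffer (refCnt was 0)");
      }
    }
    void release() { refCnt.decrementAndGet(); }
  }

  static final ConcurrentHashMap<String, RefCounted> map = new ConcurrentHashMap<>();

  // Broken (pre-fix): lookup and retain are separate steps, so an eviction can run in between.
  static RefCounted getBlockRacy(String key) {
    RefCounted cb = map.get(key);
    if (cb != null) {
      // evict(key) may have removed and released cb right here, so retain() can throw.
      cb.retain();
    }
    return cb;
  }

  // Fixed: retain inside computeIfPresent, which is atomic with remove(key) for the same key.
  static RefCounted getBlockAtomic(String key) {
    return map.computeIfPresent(key, (k, v) -> {
      v.retain(); // cannot interleave with evict's remove + release
      return v;
    });
  }

  static void evict(String key) {
    RefCounted previous = map.remove(key);
    if (previous != null) {
      previous.release(); // drops the cache's own reference; the buffer may now hit refCnt=0
    }
  }
}

The new testMultiThreadGetAndEvictBlock test added further down stresses exactly this get/evict/cache interleaving.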
@@ -601,8 +608,6 @@ public class LruBlockCache implements FirstLevelBlockCache {
     if (previous == null) {
       return 0;
     }
-    // Decrease the block's reference count, and if refCount is 0, then it'll auto-deallocate.
-    previous.getBuffer().release();
     updateSizeMetrics(block, true);
     long val = elements.decrementAndGet();
     if (LOG.isTraceEnabled()) {
@@ -610,7 +615,7 @@ public class LruBlockCache implements FirstLevelBlockCache {
       assertCounterSanity(size, val);
     }
     if (block.getBuffer().getBlockType().isData()) {
       dataBlockElements.decrement();
     }
     if (evictedByEvictionProcess) {
       // When the eviction of the block happened because of invalidation of HFiles, no need to
@@ -620,6 +625,10 @@ public class LruBlockCache implements FirstLevelBlockCache {
         victimHandler.cacheBlock(block.getCacheKey(), block.getBuffer());
       }
     }
+    // Decrease the block's reference count, and if refCount is 0, then it'll auto-deallocate. DO
+    // NOT move this up because if do that then the victimHandler may access the buffer with
+    // refCnt = 0 which is disallowed.
+    previous.getBuffer().release();
     return block.heapSize();
   }
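In evictBlock() the release of the cache's own reference moves to the very end on purpose: if the buffer were released before victimHandler.cacheBlock(...), the victim (L2) cache could be handed a buffer that is already back at refCnt=0. A compact sketch of the intended ordering (toy types, not the LruBlockCache API):

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;

class EvictOrderingSketch {
  static final class RefCounted {
    final AtomicInteger refCnt = new AtomicInteger(1);
    void retain() { refCnt.incrementAndGet(); }
    void release() { refCnt.decrementAndGet(); }
  }

  static long evict(ConcurrentHashMap<String, RefCounted> map, String key,
      BiConsumer<String, RefCounted> victimCache) {
    RefCounted previous = map.remove(key);
    if (previous == null) {
      return 0;
    }
    // 1. Hand the still-live buffer to the victim cache first; it can retain or copy it
    //    while the L1 cache's reference keeps refCnt >= 1.
    victimCache.accept(key, previous);
    // 2. Only now drop the L1 cache's reference. Doing this before step 1 could let the
    //    buffer be deallocated (refCnt=0) before the victim cache ever touches it.
    previous.release();
    return 1;
  }
}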


@@ -42,6 +42,7 @@ import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.LongAdder;
 import java.util.concurrent.locks.Lock;
@@ -1530,21 +1531,28 @@ public class BucketCache implements BlockCache, HeapSize {
     }

     public RAMQueueEntry get(BlockCacheKey key) {
-      RAMQueueEntry re = delegate.get(key);
-      if (re != null) {
-        // It'll be referenced by RPC, so retain here.
-        re.getData().retain();
-      }
-      return re;
+      return delegate.computeIfPresent(key, (k, re) -> {
+        // It'll be referenced by RPC, so retain atomically here. if the get and retain is not
+        // atomic, another thread may remove and release the block, when retaining in this thread we
+        // may retain a block with refCnt=0 which is disallowed. (see HBASE-22422)
+        re.getData().retain();
+        return re;
+      });
     }

+    /**
+     * Return the previous associated value, or null if absent. It has the same meaning as
+     * {@link ConcurrentMap#putIfAbsent(Object, Object)}
+     */
     public RAMQueueEntry putIfAbsent(BlockCacheKey key, RAMQueueEntry entry) {
-      RAMQueueEntry previous = delegate.putIfAbsent(key, entry);
-      if (previous == null) {
-        // The RAMCache reference to this entry, so reference count should be increment.
-        entry.getData().retain();
-      }
-      return previous;
+      AtomicBoolean absent = new AtomicBoolean(false);
+      RAMQueueEntry re = delegate.computeIfAbsent(key, k -> {
+        // The RAMCache reference to this entry, so reference count should be increment.
+        entry.getData().retain();
+        absent.set(true);
+        return entry;
+      });
+      return absent.get() ? null : re;
     }

     public boolean remove(BlockCacheKey key) {
@@ -1573,8 +1581,9 @@ public class BucketCache implements BlockCache, HeapSize {
     public void clear() {
       Iterator<Map.Entry<BlockCacheKey, RAMQueueEntry>> it = delegate.entrySet().iterator();
       while (it.hasNext()) {
-        it.next().getValue().getData().release();
+        RAMQueueEntry re = it.next().getValue();
         it.remove();
+        re.getData().release();
       }
     }
   }
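RAMCache gets the same treatment as the LRU map, plus one extra subtlety: putIfAbsent() must retain the entry only when it was actually inserted, and a plain ConcurrentHashMap#putIfAbsent cannot tell you that atomically together with the retain. The rewrite therefore uses computeIfAbsent, whose mapping function runs only when the key is absent, plus an AtomicBoolean to translate the result back into the familiar "previous value or null" contract. A hedged standalone sketch of that shape (toy types, not BucketCache.RAMCache):

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

class PutIfAbsentRetainSketch {
  static final class Entry {
    final AtomicInteger refCnt = new AtomicInteger(1);
    void retain() { refCnt.incrementAndGet(); }
    void release() { refCnt.decrementAndGet(); }
  }

  static final ConcurrentHashMap<String, Entry> delegate = new ConcurrentHashMap<>();

  // Retain only if we really inserted; report the previous value (or null) like putIfAbsent.
  static Entry putIfAbsent(String key, Entry entry) {
    AtomicBoolean absent = new AtomicBoolean(false);
    Entry re = delegate.computeIfAbsent(key, k -> {
      entry.retain();     // the cache now owns one reference to the inserted entry
      absent.set(true);   // remember that the mapping function actually ran
      return entry;
    });
    // computeIfAbsent returns the current value either way; translate back to
    // putIfAbsent semantics: null when we inserted, the existing entry otherwise.
    return absent.get() ? null : re;
  }

  // get with the retain done atomically, mirroring the computeIfPresent fix above.
  static Entry get(String key) {
    return delegate.computeIfPresent(key, (k, re) -> {
      re.retain();
      return re;
    });
  }

  static boolean remove(String key) {
    Entry previous = delegate.remove(key);
    if (previous != null) {
      previous.release();
      return true;
    }
    return false;
  }
}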


@@ -17,11 +17,16 @@
  */
 package org.apache.hadoop.hbase.io.hfile;

+import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_IOENGINE_KEY;
+import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_SIZE_KEY;
 import static org.junit.Assert.assertEquals;

+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.io.hfile.CombinedBlockCache.CombinedCacheStats;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.Assert;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -33,6 +38,8 @@ public class TestCombinedBlockCache {
   public static final HBaseClassTestRule CLASS_RULE =
       HBaseClassTestRule.forClass(TestCombinedBlockCache.class);

+  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+
   @Test
   public void testCombinedCacheStats() {
     CacheStats lruCacheStats = new CacheStats("lruCacheStats", 2);
@@ -102,4 +109,14 @@ public class TestCombinedBlockCache {
     assertEquals(0.75, stats.getHitRatioPastNPeriods(), delta);
     assertEquals(0.8, stats.getHitCachingRatioPastNPeriods(), delta);
   }
+
+  @Test
+  public void testMultiThreadGetAndEvictBlock() throws Exception {
+    Configuration conf = UTIL.getConfiguration();
+    conf.set(BUCKET_CACHE_IOENGINE_KEY, "offheap");
+    conf.setInt(BUCKET_CACHE_SIZE_KEY, 32);
+    BlockCache blockCache = BlockCacheFactory.createBlockCache(conf);
+    Assert.assertTrue(blockCache instanceof CombinedBlockCache);
+    TestLruBlockCache.testMultiThreadGetAndEvictBlockInternal(blockCache);
+  }
 }


@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hbase.io.hfile;

+import static org.apache.hadoop.hbase.io.ByteBuffAllocator.HEAP;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
@@ -27,6 +28,7 @@ import java.util.Random;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
@@ -34,15 +36,17 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.Waiter;
 import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
-import org.apache.hadoop.hbase.io.ByteBuffAllocator;
 import org.apache.hadoop.hbase.io.HeapSize;
 import org.apache.hadoop.hbase.io.hfile.LruBlockCache.EvictionThread;
 import org.apache.hadoop.hbase.testclassification.IOTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.apache.hadoop.hbase.util.ClassSize;
+import org.junit.Assert;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;

 /**
  * Tests the concurrent LruBlockCache.<p>
@@ -58,6 +62,8 @@ public class TestLruBlockCache {
   public static final HBaseClassTestRule CLASS_RULE =
       HBaseClassTestRule.forClass(TestLruBlockCache.class);

+  private static final Logger LOG = LoggerFactory.getLogger(TestLruBlockCache.class);
+
   @Test
   public void testCacheEvictionThreadSafe() throws Exception {
     long maxSize = 100000;
@@ -814,11 +820,10 @@ public class TestLruBlockCache {
     byte[] byteArr = new byte[length];
     ByteBuffer buf = ByteBuffer.wrap(byteArr, 0, size);
     HFileContext meta = new HFileContextBuilder().build();
-    ByteBuffAllocator alloc = ByteBuffAllocator.HEAP;
     HFileBlock blockWithNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1, buf,
-        HFileBlock.FILL_HEADER, -1, 52, -1, meta, alloc);
+        HFileBlock.FILL_HEADER, -1, 52, -1, meta, HEAP);
     HFileBlock blockWithoutNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1, buf,
-        HFileBlock.FILL_HEADER, -1, -1, -1, meta, alloc);
+        HFileBlock.FILL_HEADER, -1, -1, -1, meta, HEAP);

     LruBlockCache cache = new LruBlockCache(maxSize, blockSize, false,
         (int)Math.ceil(1.2*maxSize/blockSize),
@@ -958,5 +963,75 @@ public class TestLruBlockCache {
     }
   }

+  static void testMultiThreadGetAndEvictBlockInternal(BlockCache cache) throws Exception {
+    int size = 100;
+    int length = HConstants.HFILEBLOCK_HEADER_SIZE + size;
+    byte[] byteArr = new byte[length];
+    HFileContext meta = new HFileContextBuilder().build();
+    BlockCacheKey key = new BlockCacheKey("key1", 0);
+    HFileBlock blk = new HFileBlock(BlockType.DATA, size, size, -1,
+        ByteBuffer.wrap(byteArr, 0, size), HFileBlock.FILL_HEADER, -1, 52, -1, meta, HEAP);
+    AtomicBoolean err1 = new AtomicBoolean(false);
+    Thread t1 = new Thread(() -> {
+      for (int i = 0; i < 10000 && !err1.get(); i++) {
+        try {
+          cache.getBlock(key, false, false, true);
+        } catch (Exception e) {
+          err1.set(true);
+          LOG.info("Cache block or get block failure: ", e);
+        }
+      }
+    });
+
+    AtomicBoolean err2 = new AtomicBoolean(false);
+    Thread t2 = new Thread(() -> {
+      for (int i = 0; i < 10000 && !err2.get(); i++) {
+        try {
+          cache.evictBlock(key);
+        } catch (Exception e) {
+          err2.set(true);
+          LOG.info("Evict block failure: ", e);
+        }
+      }
+    });
+
+    AtomicBoolean err3 = new AtomicBoolean(false);
+    Thread t3 = new Thread(() -> {
+      for (int i = 0; i < 10000 && !err3.get(); i++) {
+        try {
+          cache.cacheBlock(key, blk);
+        } catch (Exception e) {
+          err3.set(true);
+          LOG.info("Cache block failure: ", e);
+        }
+      }
+    });
+
+    t1.start();
+    t2.start();
+    t3.start();
+    t1.join();
+    t2.join();
+    t3.join();
+    Assert.assertFalse(err1.get());
+    Assert.assertFalse(err2.get());
+    Assert.assertFalse(err3.get());
+  }
+
+  @Test
+  public void testMultiThreadGetAndEvictBlock() throws Exception {
+    long maxSize = 100000;
+    long blockSize = calculateBlockSize(maxSize, 10);
+    LruBlockCache cache =
+        new LruBlockCache(maxSize, blockSize, false, (int) Math.ceil(1.2 * maxSize / blockSize),
+            LruBlockCache.DEFAULT_LOAD_FACTOR, LruBlockCache.DEFAULT_CONCURRENCY_LEVEL,
+            0.66f, // min
+            0.99f, // acceptable
+            0.33f, // single
+            0.33f, // multi
+            0.34f, // memory
+            1.2f, // limit
+            false, 1024);
+    testMultiThreadGetAndEvictBlockInternal(cache);
+  }
 }


@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile.bucket;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.io.ByteBuffAllocator;
+import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
+import org.apache.hadoop.hbase.io.hfile.BlockType;
+import org.apache.hadoop.hbase.io.hfile.HFileBlock;
+import org.apache.hadoop.hbase.io.hfile.HFileContext;
+import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
+import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.RAMCache;
+import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.RAMQueueEntry;
+import org.apache.hadoop.hbase.testclassification.IOTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.junit.Assert;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Category({ IOTests.class, MediumTests.class })
+public class TestRAMCache {
+  private static final Logger LOG = LoggerFactory.getLogger(TestRAMCache.class);
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+      HBaseClassTestRule.forClass(TestRAMCache.class);
+
+  // Define a mock HFileBlock.
+  private static class MockHFileBlock extends HFileBlock {
+
+    private volatile CountDownLatch latch;
+
+    MockHFileBlock(BlockType blockType, int onDiskSizeWithoutHeader,
+        int uncompressedSizeWithoutHeader, long prevBlockOffset, ByteBuffer b, boolean fillHeader,
+        long offset, int nextBlockOnDiskSize, int onDiskDataSizeWithHeader,
+        HFileContext fileContext, ByteBuffAllocator allocator) {
+      super(blockType, onDiskSizeWithoutHeader, uncompressedSizeWithoutHeader, prevBlockOffset, b,
+          fillHeader, offset, nextBlockOnDiskSize, onDiskDataSizeWithHeader, fileContext,
+          allocator);
+    }
+
+    public void setLatch(CountDownLatch latch) {
+      this.latch = latch;
+    }
+
+    public MockHFileBlock retain() {
+      try {
+        if (latch != null) {
+          latch.await();
+        }
+      } catch (InterruptedException e) {
+        LOG.info("Interrupted exception error: ", e);
+      }
+      super.retain();
+      return this;
+    }
+  }
+
+  @Test
+  public void testAtomicRAMCache() throws Exception {
+    int size = 100;
+    int length = HConstants.HFILEBLOCK_HEADER_SIZE + size;
+    byte[] byteArr = new byte[length];
+
+    RAMCache cache = new RAMCache();
+    BlockCacheKey key = new BlockCacheKey("file-1", 1);
+    MockHFileBlock blk = new MockHFileBlock(BlockType.DATA, size, size, -1,
+        ByteBuffer.wrap(byteArr, 0, size), HFileBlock.FILL_HEADER, -1, 52, -1,
+        new HFileContextBuilder().build(), ByteBuffAllocator.HEAP);
+    RAMQueueEntry re = new RAMQueueEntry(key, blk, 1, false, ByteBuffAllocator.NONE);
+
+    Assert.assertNull(cache.putIfAbsent(key, re));
+    Assert.assertEquals(cache.putIfAbsent(key, re), re);
+
+    CountDownLatch latch = new CountDownLatch(1);
+    blk.setLatch(latch);
+
+    AtomicBoolean error = new AtomicBoolean(false);
+    Thread t1 = new Thread(() -> {
+      try {
+        cache.get(key);
+      } catch (Exception e) {
+        error.set(true);
+      }
+    });
+    t1.start();
+    Thread.sleep(200);
+
+    AtomicBoolean removed = new AtomicBoolean(false);
+    Thread t2 = new Thread(() -> {
+      cache.remove(key);
+      removed.set(true);
+    });
+    t2.start();
+    Thread.sleep(200);
+    Assert.assertFalse(removed.get());
+
+    latch.countDown();
+    Thread.sleep(200);
+    Assert.assertTrue(removed.get());
+    Assert.assertFalse(error.get());
+  }
+}