HBASE-22422 Retain a ByteBuff with refCnt=0 when getBlock from LRUCache (#242)

openinx 2019-05-28 10:10:34 +08:00 committed by huzheng
parent b1fee0ebdd
commit 962554d340
9 changed files with 306 additions and 63 deletions

BlockCacheUtil.java

@@ -230,6 +230,9 @@ public class BlockCacheUtil {
BlockCacheKey cacheKey, Cacheable newBlock) {
// NOTICE: getBlock has already retained the existingBlock internally.
Cacheable existingBlock = blockCache.getBlock(cacheKey, false, false, false);
if (existingBlock == null) {
return true;
}
try {
int comparison = BlockCacheUtil.validateBlockAddition(existingBlock, newBlock, cacheKey);
if (comparison < 0) {

HFileBlock.java

@@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hbase.io.hfile;
import static org.apache.hadoop.hbase.io.ByteBuffAllocator.HEAP;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
@@ -677,18 +679,24 @@ public class HFileBlock implements Cacheable {
HFileBlock unpacked = new HFileBlock(this);
unpacked.allocateBuffer(); // allocates space for the decompressed block
HFileBlockDecodingContext ctx = blockType == BlockType.ENCODED_DATA
? reader.getBlockDecodingContext() : reader.getDefaultBlockDecodingContext();
ByteBuff dup = this.buf.duplicate();
dup.position(this.headerSize());
dup = dup.slice();
ctx.prepareDecoding(unpacked.getOnDiskSizeWithoutHeader(),
unpacked.getUncompressedSizeWithoutHeader(), unpacked.getBufferWithoutHeader(true), dup);
return unpacked;
boolean succ = false;
try {
HFileBlockDecodingContext ctx = blockType == BlockType.ENCODED_DATA
? reader.getBlockDecodingContext() : reader.getDefaultBlockDecodingContext();
// Create a duplicated buffer without the header part.
ByteBuff dup = this.buf.duplicate();
dup.position(this.headerSize());
dup = dup.slice();
// Decode the dup into unpacked#buf
ctx.prepareDecoding(unpacked.getOnDiskSizeWithoutHeader(),
unpacked.getUncompressedSizeWithoutHeader(), unpacked.getBufferWithoutHeader(true), dup);
succ = true;
return unpacked;
} finally {
if (!succ) {
unpacked.release();
}
}
}
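For readers skimming the diff: the new try/finally in unpack() is the standard success-flag idiom for refcounted buffers — if decoding throws half-way, the freshly allocated copy is released so its reference count does not leak. A minimal self-contained sketch of the same idiom follows; RefCounted and decode() are hypothetical stand-ins for illustration, not HBase API.

import java.io.IOException;

// Minimal sketch of the success-flag idiom used by unpack() above.
final class ReleaseOnFailureSketch {
  interface RefCounted {
    void release(); // frees the backing memory once refCnt reaches 0
  }

  static RefCounted decodeInto(RefCounted allocated) throws IOException {
    boolean succ = false;
    try {
      decode(allocated); // may throw at any point
      succ = true;
      return allocated;  // on success the caller owns the reference
    } finally {
      if (!succ) {
        allocated.release(); // undo the allocation on the exceptional path
      }
    }
  }

  private static void decode(RefCounted buf) throws IOException {
    // placeholder for the real decompression/decoding work
  }
}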
/**
@@ -709,7 +717,7 @@ public class HFileBlock implements Cacheable {
buf = newBuf;
// set limit to exclude next block's header
buf.limit(headerSize + uncompressedSizeWithoutHeader + cksumBytes);
buf.limit(capacityNeeded);
}
/**
@@ -1685,7 +1693,7 @@ public class HFileBlock implements Cacheable {
}
private ByteBuff allocate(int size, boolean intoHeap) {
return intoHeap ? ByteBuffAllocator.HEAP.allocate(size) : allocator.allocate(size);
return intoHeap ? HEAP.allocate(size) : allocator.allocate(size);
}
/**
@@ -1735,7 +1743,7 @@ public class HFileBlock implements Cacheable {
if (LOG.isTraceEnabled()) {
LOG.trace("Extra see to get block size!", new RuntimeException());
}
headerBuf = new SingleByteBuff(ByteBuffer.allocate(hdrSize));
headerBuf = HEAP.allocate(hdrSize);
readAtOffset(is, headerBuf, hdrSize, false, offset, pread);
headerBuf.rewind();
}
@@ -1778,7 +1786,7 @@ public class HFileBlock implements Cacheable {
// If nextBlockOnDiskSizeWithHeader is not zero, the onDiskBlock already
// contains the header of next block, so no need to set next block's header in it.
HFileBlock hFileBlock = new HFileBlock(curBlock, checksumSupport, MemoryType.EXCLUSIVE,
offset, nextBlockOnDiskSize, fileContext, allocator);
offset, nextBlockOnDiskSize, fileContext, intoHeap ? HEAP : allocator);
// Run check on uncompressed sizings.
if (!fileContext.isCompressedOrEncrypted()) {
hFileBlock.sanityCheckUncompressed();

HFileBlockIndex.java

@@ -313,10 +313,13 @@ public class HFileBlockIndex {
int index = -1;
HFileBlock block = null;
boolean dataBlock = false;
KeyOnlyKeyValue tmpNextIndexKV = new KeyValue.KeyOnlyKeyValue();
while (true) {
try {
// Must initialize it to null here, because if we don't and an exception happens in
// readBlock, we'll release the previously assigned block twice in the finally block.
// (See HBASE-22422)
block = null;
if (currentBlock != null && currentBlock.getOffset() == currentOffset) {
// Avoid reading the same block again, even with caching turned off.
// This is crucial for compaction-type workload which might have
@@ -336,9 +339,8 @@ public class HFileBlockIndex {
// this also accounts for ENCODED_DATA
expectedBlockType = BlockType.DATA;
}
block =
cachingBlockReader.readBlock(currentOffset, currentOnDiskSize, shouldCache, pread,
isCompaction, true, expectedBlockType, expectedDataBlockEncoding);
block = cachingBlockReader.readBlock(currentOffset, currentOnDiskSize, shouldCache,
pread, isCompaction, true, expectedBlockType, expectedDataBlockEncoding);
}
if (block == null) {
@@ -348,7 +350,6 @@ public class HFileBlockIndex {
// Found a data block, break the loop and check our level in the tree.
if (block.getBlockType().isData()) {
dataBlock = true;
break;
}
@@ -381,7 +382,7 @@ public class HFileBlockIndex {
nextIndexedKey = tmpNextIndexKV;
}
} finally {
if (!dataBlock && block != null) {
if (block != null && !block.getBlockType().isData()) {
// Release the block immediately if it is not a data block
block.release();
}
@@ -389,7 +390,7 @@ public class HFileBlockIndex {
}
if (lookupLevel != searchTreeLevel) {
assert dataBlock == true;
assert block.getBlockType().isData();
// Though we have retrieved a data block, we have found an issue with it. Hence
// release the block so that its ref count can be decremented
@@ -401,8 +402,7 @@ public class HFileBlockIndex {
}
// set the next indexed key for the current block.
BlockWithScanInfo blockWithScanInfo = new BlockWithScanInfo(block, nextIndexedKey);
return blockWithScanInfo;
return new BlockWithScanInfo(block, nextIndexedKey);
}
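The `block = null` reset at the top of the loop (see the comment above) is what prevents a double release. A hedged sketch of the hazard and the fix, using hypothetical names rather than the real readBlock signature:

import java.util.Iterator;
import java.util.concurrent.Callable;

// Sketch of the double-release hazard noted above: without the reset, a throw
// from the read would leave `block` pointing at the previous iteration's
// (already released) block, and the finally clause would release it a second
// time.
final class LoopReleaseSketch {
  interface RefCounted { void release(); }

  void walkIndex(Iterator<Callable<RefCounted>> reads) throws Exception {
    RefCounted block = null;
    while (reads.hasNext()) {
      try {
        block = null; // must reset before the read (see HBASE-22422)
        block = reads.next().call(); // may throw before assigning
      } finally {
        if (block != null) {
          block.release();
        }
      }
    }
  }
}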
@Override
@@ -576,8 +576,7 @@ public class HFileBlockIndex {
boolean pread, boolean isCompaction, DataBlockEncoding expectedDataBlockEncoding)
throws IOException {
BlockWithScanInfo blockWithScanInfo = loadDataBlockWithScanInfo(key, currentBlock,
cacheBlocks,
pread, isCompaction, expectedDataBlockEncoding);
cacheBlocks, pread, isCompaction, expectedDataBlockEncoding);
if (blockWithScanInfo == null) {
return null;
} else {
@@ -600,9 +599,8 @@ public class HFileBlockIndex {
* @throws IOException
*/
public abstract BlockWithScanInfo loadDataBlockWithScanInfo(Cell key, HFileBlock currentBlock,
boolean cacheBlocks,
boolean pread, boolean isCompaction, DataBlockEncoding expectedDataBlockEncoding)
throws IOException;
boolean cacheBlocks, boolean pread, boolean isCompaction,
DataBlockEncoding expectedDataBlockEncoding) throws IOException;
/**
* An approximation to the {@link HFile}'s mid-key. Operates on block

HFileReaderImpl.java

@@ -1134,15 +1134,13 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
updateCurrentBlock(newBlock);
}
protected int loadBlockAndSeekToKey(HFileBlock seekToBlock, Cell nextIndexedKey,
boolean rewind, Cell key, boolean seekBefore) throws IOException {
if (this.curBlock == null
|| this.curBlock.getOffset() != seekToBlock.getOffset()) {
protected int loadBlockAndSeekToKey(HFileBlock seekToBlock, Cell nextIndexedKey, boolean rewind,
Cell key, boolean seekBefore) throws IOException {
if (this.curBlock == null || this.curBlock.getOffset() != seekToBlock.getOffset()) {
updateCurrentBlock(seekToBlock);
} else if (rewind) {
blockBuffer.rewind();
}
// Update the nextIndexedKey
this.nextIndexedKey = nextIndexedKey;
return blockSeek(key, seekBefore);
@@ -1480,10 +1478,11 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
// Validate encoding type for data blocks. We include encoding
// type in the cache key, and we expect it to match on a cache hit.
if (cachedBlock.getDataBlockEncoding() != dataBlockEncoder.getDataBlockEncoding()) {
// Remember to release the block on the exceptional path.
cachedBlock.release();
throw new IOException("Cached block under key " + cacheKey + " "
+ "has wrong encoding: " + cachedBlock.getDataBlockEncoding() + " (expected: "
+ dataBlockEncoder.getDataBlockEncoding() + ")"
+ ", path=" + path);
+ "has wrong encoding: " + cachedBlock.getDataBlockEncoding() + " (expected: "
+ dataBlockEncoder.getDataBlockEncoding() + "), path=" + path);
}
}
// Cache-hit. Return!
@@ -1507,15 +1506,14 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
BlockType.BlockCategory category = hfileBlock.getBlockType().getCategory();
// Cache the block if necessary
AtomicBoolean cachedRaw = new AtomicBoolean(false);
cacheConf.getBlockCache().ifPresent(cache -> {
if (cacheBlock && cacheConf.shouldCacheBlockOnRead(category)) {
cachedRaw.set(cacheConf.shouldCacheCompressed(category));
cache.cacheBlock(cacheKey, cachedRaw.get() ? hfileBlock : unpacked,
cache.cacheBlock(cacheKey,
cacheConf.shouldCacheCompressed(category) ? hfileBlock : unpacked,
cacheConf.isInMemory());
}
});
if (unpacked != hfileBlock && !cachedRaw.get()) {
if (unpacked != hfileBlock) {
// End of life here if hfileBlock is an independent block.
hfileBlock.release();
}

LruBlockCache.java

@@ -512,7 +512,14 @@ public class LruBlockCache implements FirstLevelBlockCache {
@Override
public Cacheable getBlock(BlockCacheKey cacheKey, boolean caching, boolean repeat,
boolean updateCacheMetrics) {
LruCachedBlock cb = map.get(cacheKey);
LruCachedBlock cb = map.computeIfPresent(cacheKey, (key, val) -> {
// It will be referenced by the RPC path, so retain here. NOTICE: the retain must happen
// inside this computeIfPresent block, because if we retain outside of it, evictBlock may
// remove the block and release it first, and we would then be retaining a block with
// refCnt=0, which is disallowed. (See HBASE-22422)
val.getBuffer().retain();
return val;
});
if (cb == null) {
if (!repeat && updateCacheMetrics) {
stats.miss(caching, cacheKey.isPrimary(), cacheKey.getBlockType());
@@ -540,10 +547,10 @@ public class LruBlockCache implements FirstLevelBlockCache {
}
return null;
}
if (updateCacheMetrics) stats.hit(caching, cacheKey.isPrimary(), cacheKey.getBlockType());
if (updateCacheMetrics) {
stats.hit(caching, cacheKey.isPrimary(), cacheKey.getBlockType());
}
cb.access(count.incrementAndGet());
// It will be referenced by RPC path, so increase here.
cb.getBuffer().retain();
return cb.getBuffer();
}
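The essential change in getBlock() is that the retain now happens inside map.computeIfPresent(), which ConcurrentHashMap executes atomically per key. A minimal sketch of the invariant this buys; Block here is a hypothetical refcounted value for illustration, not the HBase type:

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch of why the retain must sit inside computeIfPresent: ConcurrentHashMap
// runs the remapping function atomically for the key, so an eviction's
// remove()+release() cannot slip between our lookup and our retain.
final class AtomicRetainSketch {
  static final class Block {
    final AtomicInteger refCnt = new AtomicInteger(1); // the cache's reference

    void retain() {
      if (refCnt.getAndIncrement() <= 0) {
        throw new IllegalStateException("retain on a freed block"); // the HBASE-22422 symptom
      }
    }

    void release() {
      if (refCnt.decrementAndGet() == 0) {
        // free the backing memory here
      }
    }
  }

  final ConcurrentHashMap<String, Block> map = new ConcurrentHashMap<>();

  // RPC read path: lookup and retain in one atomic map operation.
  Block getBlock(String key) {
    return map.computeIfPresent(key, (k, block) -> {
      block.retain(); // eviction cannot run for this key concurrently
      return block;
    });
  }

  // Eviction path: unpublish the block first, then drop the cache's reference.
  void evictBlock(String key) {
    Block prev = map.remove(key);
    if (prev != null) {
      prev.release();
    }
  }
}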
@@ -601,8 +608,6 @@ public class LruBlockCache implements FirstLevelBlockCache {
if (previous == null) {
return 0;
}
// Decrease the block's reference count, and if refCount is 0, then it'll auto-deallocate.
previous.getBuffer().release();
updateSizeMetrics(block, true);
long val = elements.decrementAndGet();
if (LOG.isTraceEnabled()) {
@@ -610,7 +615,7 @@ public class LruBlockCache implements FirstLevelBlockCache {
assertCounterSanity(size, val);
}
if (block.getBuffer().getBlockType().isData()) {
dataBlockElements.decrement();
dataBlockElements.decrement();
}
if (evictedByEvictionProcess) {
// When the eviction of the block happened because of invalidation of HFiles, no need to
@@ -620,6 +625,10 @@ public class LruBlockCache implements FirstLevelBlockCache {
victimHandler.cacheBlock(block.getCacheKey(), block.getBuffer());
}
}
// Decrease the block's reference count, and if refCount is 0, then it'll auto-deallocate. DO
// NOT move this up, because then the victimHandler may access a buffer with refCnt=0,
// which is disallowed.
previous.getBuffer().release();
return block.heapSize();
}
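The relocated release in evictBlock() must stay below the victim-cache hand-off. A short sketch of the ordering, with hypothetical interfaces standing in for the HBase types:

// Sketch of the ordering constraint in evictBlock() above: the L1 cache's own
// reference is dropped only after the victim handler has had its chance to
// retain the buffer for the L2 cache. Releasing first could hand the victim
// cache a buffer whose refCnt is already 0.
final class EvictOrderSketch {
  interface RefCounted { void retain(); void release(); }
  interface VictimCache { void cacheBlock(String key, RefCounted buf); }

  void evict(String key, RefCounted buf, VictimCache victimHandler, boolean byEvictionProcess) {
    if (byEvictionProcess && victimHandler != null) {
      victimHandler.cacheBlock(key, buf); // may retain(); needs refCnt >= 1
    }
    buf.release(); // only now drop this cache's reference
  }
}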

BucketCache.java

@@ -42,6 +42,7 @@ import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.Lock;
@@ -1532,21 +1533,28 @@ public class BucketCache implements BlockCache, HeapSize {
}
public RAMQueueEntry get(BlockCacheKey key) {
RAMQueueEntry re = delegate.get(key);
if (re != null) {
// It'll be referenced by RPC, so retain here.
return delegate.computeIfPresent(key, (k, re) -> {
// It'll be referenced by the RPC path, so retain atomically here. If the get and the retain
// were not atomic, another thread could remove and release the block in between, and this
// thread would then retain a block with refCnt=0, which is disallowed. (See HBASE-22422)
re.getData().retain();
}
return re;
return re;
});
}
/**
* Return the previous associated value, or null if absent. It has the same meaning as
* {@link ConcurrentMap#putIfAbsent(Object, Object)}
*/
public RAMQueueEntry putIfAbsent(BlockCacheKey key, RAMQueueEntry entry) {
RAMQueueEntry previous = delegate.putIfAbsent(key, entry);
if (previous == null) {
AtomicBoolean absent = new AtomicBoolean(false);
RAMQueueEntry re = delegate.computeIfAbsent(key, k -> {
// The RAMCache now references this entry, so its reference count should be incremented.
entry.getData().retain();
}
return previous;
absent.set(true);
return entry;
});
return absent.get() ? null : re;
}
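RAMCache.get() applies the same computeIfPresent trick shown for LruBlockCache above; putIfAbsent() additionally has to preserve its return contract while making insert-and-retain atomic. A sketch of that pattern, with Entry as a hypothetical refcounted value:

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;

// Sketch of the putIfAbsent() rewrite above: computeIfAbsent makes the insert
// and the retain one atomic map operation, and the AtomicBoolean recovers the
// putIfAbsent contract of returning null only when the mapping was newly
// created.
final class RamCachePutSketch {
  interface Entry { void retain(); }

  final ConcurrentHashMap<String, Entry> delegate = new ConcurrentHashMap<>();

  Entry putIfAbsent(String key, Entry entry) {
    AtomicBoolean absent = new AtomicBoolean(false);
    Entry re = delegate.computeIfAbsent(key, k -> {
      entry.retain(); // the cache itself now references this entry
      absent.set(true);
      return entry;
    });
    return absent.get() ? null : re; // null means we inserted a new mapping
  }
}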
public boolean remove(BlockCacheKey key) {
@@ -1575,8 +1583,9 @@ public class BucketCache implements BlockCache, HeapSize {
public void clear() {
Iterator<Map.Entry<BlockCacheKey, RAMQueueEntry>> it = delegate.entrySet().iterator();
while (it.hasNext()) {
it.next().getValue().getData().release();
RAMQueueEntry re = it.next().getValue();
it.remove();
re.getData().release();
}
}
}
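Finally, clear() now unpublishes each entry before releasing it. A compact sketch of why the order matters, again with hypothetical names:

import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Sketch of the clear() ordering above: each entry is removed from the map
// before its reference is released. In the old order (release, then remove) a
// concurrent get() could observe and retain an entry whose refCnt had already
// dropped to 0.
final class ClearOrderSketch {
  interface Entry { void release(); }

  final ConcurrentHashMap<String, Entry> delegate = new ConcurrentHashMap<>();

  void clear() {
    Iterator<Map.Entry<String, Entry>> it = delegate.entrySet().iterator();
    while (it.hasNext()) {
      Entry re = it.next().getValue();
      it.remove();  // unpublish first, so no new reader can find the entry
      re.release(); // then drop the cache's own reference
    }
  }
}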

TestCombinedBlockCache.java

@@ -17,11 +17,16 @@
*/
package org.apache.hadoop.hbase.io.hfile;
import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_IOENGINE_KEY;
import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_SIZE_KEY;
import static org.junit.Assert.assertEquals;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.io.hfile.CombinedBlockCache.CombinedCacheStats;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -33,6 +38,8 @@ public class TestCombinedBlockCache {
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestCombinedBlockCache.class);
private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
@Test
public void testCombinedCacheStats() {
CacheStats lruCacheStats = new CacheStats("lruCacheStats", 2);
@@ -102,4 +109,14 @@ public class TestCombinedBlockCache {
assertEquals(0.75, stats.getHitRatioPastNPeriods(), delta);
assertEquals(0.8, stats.getHitCachingRatioPastNPeriods(), delta);
}
@Test
public void testMultiThreadGetAndEvictBlock() throws Exception {
Configuration conf = UTIL.getConfiguration();
conf.set(BUCKET_CACHE_IOENGINE_KEY, "offheap");
conf.setInt(BUCKET_CACHE_SIZE_KEY, 32);
BlockCache blockCache = BlockCacheFactory.createBlockCache(conf);
Assert.assertTrue(blockCache instanceof CombinedBlockCache);
TestLruBlockCache.testMultiThreadGetAndEvictBlockInternal(blockCache);
}
}

TestLruBlockCache.java

@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.io.hfile;
import static org.apache.hadoop.hbase.io.ByteBuffAllocator.HEAP;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
@@ -27,6 +28,7 @@ import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
@@ -34,15 +36,17 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.io.hfile.LruBlockCache.EvictionThread;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.ClassSize;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Tests the concurrent LruBlockCache.<p>
@@ -58,6 +62,8 @@ public class TestLruBlockCache {
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestLruBlockCache.class);
private static final Logger LOG = LoggerFactory.getLogger(TestLruBlockCache.class);
@Test
public void testCacheEvictionThreadSafe() throws Exception {
long maxSize = 100000;
@@ -814,11 +820,10 @@ public class TestLruBlockCache {
byte[] byteArr = new byte[length];
ByteBuffer buf = ByteBuffer.wrap(byteArr, 0, size);
HFileContext meta = new HFileContextBuilder().build();
ByteBuffAllocator alloc = ByteBuffAllocator.HEAP;
HFileBlock blockWithNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1, buf,
HFileBlock.FILL_HEADER, -1, 52, -1, meta, alloc);
HFileBlock.FILL_HEADER, -1, 52, -1, meta, HEAP);
HFileBlock blockWithoutNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1, buf,
HFileBlock.FILL_HEADER, -1, -1, -1, meta, alloc);
HFileBlock.FILL_HEADER, -1, -1, -1, meta, HEAP);
LruBlockCache cache = new LruBlockCache(maxSize, blockSize, false,
(int)Math.ceil(1.2*maxSize/blockSize),
@@ -958,5 +963,75 @@ public class TestLruBlockCache {
}
static void testMultiThreadGetAndEvictBlockInternal(BlockCache cache) throws Exception {
int size = 100;
int length = HConstants.HFILEBLOCK_HEADER_SIZE + size;
byte[] byteArr = new byte[length];
HFileContext meta = new HFileContextBuilder().build();
BlockCacheKey key = new BlockCacheKey("key1", 0);
HFileBlock blk = new HFileBlock(BlockType.DATA, size, size, -1,
ByteBuffer.wrap(byteArr, 0, size), HFileBlock.FILL_HEADER, -1, 52, -1, meta, HEAP);
AtomicBoolean err1 = new AtomicBoolean(false);
Thread t1 = new Thread(() -> {
for (int i = 0; i < 10000 && !err1.get(); i++) {
try {
cache.getBlock(key, false, false, true);
} catch (Exception e) {
err1.set(true);
LOG.info("Cache block or get block failure: ", e);
}
}
});
AtomicBoolean err2 = new AtomicBoolean(false);
Thread t2 = new Thread(() -> {
for (int i = 0; i < 10000 && !err2.get(); i++) {
try {
cache.evictBlock(key);
} catch (Exception e) {
err2.set(true);
LOG.info("Evict block failure: ", e);
}
}
});
AtomicBoolean err3 = new AtomicBoolean(false);
Thread t3 = new Thread(() -> {
for (int i = 0; i < 10000 && !err3.get(); i++) {
try {
cache.cacheBlock(key, blk);
} catch (Exception e) {
err3.set(true);
LOG.info("Cache block failure: ", e);
}
}
});
t1.start();
t2.start();
t3.start();
t1.join();
t2.join();
t3.join();
Assert.assertFalse(err1.get());
Assert.assertFalse(err2.get());
Assert.assertFalse(err3.get());
}
@Test
public void testMultiThreadGetAndEvictBlock() throws Exception {
long maxSize = 100000;
long blockSize = calculateBlockSize(maxSize, 10);
LruBlockCache cache =
new LruBlockCache(maxSize, blockSize, false, (int) Math.ceil(1.2 * maxSize / blockSize),
LruBlockCache.DEFAULT_LOAD_FACTOR, LruBlockCache.DEFAULT_CONCURRENCY_LEVEL,
0.66f, // min
0.99f, // acceptable
0.33f, // single
0.33f, // multi
0.34f, // memory
1.2f, // limit
false, 1024);
testMultiThreadGetAndEvictBlockInternal(cache);
}
}

TestRAMCache.java

@@ -0,0 +1,126 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.io.hfile.bucket;
import java.nio.ByteBuffer;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
import org.apache.hadoop.hbase.io.hfile.BlockType;
import org.apache.hadoop.hbase.io.hfile.HFileBlock;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.RAMCache;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.RAMQueueEntry;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Category({ IOTests.class, MediumTests.class })
public class TestRAMCache {
private static final Logger LOG = LoggerFactory.getLogger(TestRAMCache.class);
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestRAMCache.class);
// Define a mock HFileBlock.
private static class MockHFileBlock extends HFileBlock {
private volatile CountDownLatch latch;
MockHFileBlock(BlockType blockType, int onDiskSizeWithoutHeader,
int uncompressedSizeWithoutHeader, long prevBlockOffset, ByteBuffer b, boolean fillHeader,
long offset, int nextBlockOnDiskSize, int onDiskDataSizeWithHeader,
HFileContext fileContext, ByteBuffAllocator allocator) {
super(blockType, onDiskSizeWithoutHeader, uncompressedSizeWithoutHeader, prevBlockOffset, b,
fillHeader, offset, nextBlockOnDiskSize, onDiskDataSizeWithHeader, fileContext,
allocator);
}
public void setLatch(CountDownLatch latch) {
this.latch = latch;
}
public MockHFileBlock retain() {
try {
if (latch != null) {
latch.await();
}
} catch (InterruptedException e) {
LOG.info("Interrupted exception error: ", e);
}
super.retain();
return this;
}
}
@Test
public void testAtomicRAMCache() throws Exception {
int size = 100;
int length = HConstants.HFILEBLOCK_HEADER_SIZE + size;
byte[] byteArr = new byte[length];
RAMCache cache = new RAMCache();
BlockCacheKey key = new BlockCacheKey("file-1", 1);
MockHFileBlock blk = new MockHFileBlock(BlockType.DATA, size, size, -1,
ByteBuffer.wrap(byteArr, 0, size), HFileBlock.FILL_HEADER, -1, 52, -1,
new HFileContextBuilder().build(), ByteBuffAllocator.HEAP);
RAMQueueEntry re = new RAMQueueEntry(key, blk, 1, false, ByteBuffAllocator.NONE);
Assert.assertNull(cache.putIfAbsent(key, re));
Assert.assertEquals(cache.putIfAbsent(key, re), re);
CountDownLatch latch = new CountDownLatch(1);
blk.setLatch(latch);
AtomicBoolean error = new AtomicBoolean(false);
Thread t1 = new Thread(() -> {
try {
cache.get(key);
} catch (Exception e) {
error.set(true);
}
});
t1.start();
Thread.sleep(200);
AtomicBoolean removed = new AtomicBoolean(false);
Thread t2 = new Thread(() -> {
cache.remove(key);
removed.set(true);
});
t2.start();
Thread.sleep(200);
Assert.assertFalse(removed.get());
latch.countDown();
Thread.sleep(200);
Assert.assertTrue(removed.get());
Assert.assertFalse(error.get());
}
}