HBASE-22463 Some paths in HFileScannerImpl did not consider block#release which will exhaust the ByteBuffAllocator (#257)

openinx 2019-05-30 12:24:10 +08:00 committed by huzheng
parent 962554d340
commit 5b363a6140
21 changed files with 403 additions and 192 deletions
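
In short: every HFileBlock that HFileScannerImpl touches is reference counted, and before this change several scanner paths dropped blocks without calling block.release(), so their buffers were never returned to the ByteBuffAllocator. The sketch below shows the refCnt life cycle the patch enforces; it is only an illustration that mirrors the assertions in the new TestHFileScannerImplReferenceCount added by this commit, and it assumes the reader, firstCell and LruBlockCache setup from that test.

    // Illustrative sketch only, following TestHFileScannerImplReferenceCount below.
    HFileScanner scanner = reader.getScanner(true, true, false);
    HFileBlock block = reader.getDataBlockIndexReader()
        .loadDataBlockWithScanInfo(firstCell, null, true, true, false, DataBlockEncoding.NONE)
        .getHFileBlock();
    // One reference held by the block cache, one by this read (RPC) path.
    assert block.refCnt() == 2;
    scanner.seekTo(firstCell);   // curBlock now retains the block
    assert block.refCnt() == 3;
    scanner.shipped();           // curBlock/prevBlocks are only released in shipped()/close()
    assert block.refCnt() == 2;
    block.release();             // drop the read-path reference
    assert block.refCnt() == 1;  // only the block cache still holds the buffer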


@@ -37,7 +37,6 @@ import net.spy.memcached.transcoders.Transcoder;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.io.ByteBuffAllocator;
-import org.apache.hadoop.hbase.io.hfile.Cacheable.MemoryType;
 import org.apache.hadoop.hbase.nio.ByteBuff;
 import org.apache.hadoop.hbase.nio.SingleByteBuff;
 import org.apache.hadoop.hbase.trace.TraceUtil;
@@ -272,8 +271,7 @@ public class MemcachedBlockCache implements BlockCache {
     public HFileBlock decode(CachedData d) {
       try {
         ByteBuff buf = new SingleByteBuff(ByteBuffer.wrap(d.getData()));
-        return (HFileBlock) HFileBlock.BLOCK_DESERIALIZER.deserialize(buf, ByteBuffAllocator.HEAP,
-          MemoryType.EXCLUSIVE);
+        return (HFileBlock) HFileBlock.BLOCK_DESERIALIZER.deserialize(buf, ByteBuffAllocator.HEAP);
       } catch (IOException e) {
         LOG.warn("Failed to deserialize data from memcached", e);
       }


@@ -63,20 +63,6 @@ public interface Cacheable extends HeapSize, HBaseReferenceCounted {
    */
   BlockType getBlockType();
 
-  /**
-   * @return the {@code MemoryType} of this Cacheable
-   */
-  MemoryType getMemoryType();
-
-  /**
-   * SHARED means when this Cacheable is read back from cache it refers to the same memory area as
-   * used by the cache for caching it. EXCLUSIVE means when this Cacheable is read back from cache,
-   * the data was copied to an exclusive memory area of this Cacheable.
-   */
-  enum MemoryType {
-    SHARED, EXCLUSIVE
-  }
-
   /******************************* ReferenceCounted Interfaces ***********************************/
 
   /**


@@ -20,9 +20,8 @@ package org.apache.hadoop.hbase.io.hfile;
 import java.io.IOException;
 
 import org.apache.hadoop.hbase.io.ByteBuffAllocator;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.hadoop.hbase.io.hfile.Cacheable.MemoryType;
 import org.apache.hadoop.hbase.nio.ByteBuff;
+import org.apache.yetus.audience.InterfaceAudience;
 
 /**
  * Interface for a deserializer. Throws an IOException if the serialized data is incomplete or
@@ -33,11 +32,10 @@ public interface CacheableDeserializer<T extends Cacheable> {
   /**
    * @param b ByteBuff to deserialize the Cacheable.
    * @param allocator to manage NIO ByteBuffers for future allocation or de-allocation.
-   * @param memType the {@link MemoryType} of the buffer
    * @return T the deserialized object.
    * @throws IOException
    */
-  T deserialize(ByteBuff b, ByteBuffAllocator allocator, MemoryType memType) throws IOException;
+  T deserialize(ByteBuff b, ByteBuffAllocator allocator) throws IOException;
 
   /**
    * Get the identifier of this deserializer. Identifier is unique for each deserializer and


@@ -201,8 +201,6 @@ public class HFileBlock implements Cacheable {
    */
   private long offset = UNSET;
 
-  private MemoryType memType = MemoryType.EXCLUSIVE;
-
   /**
    * The on-disk size of the next block, including the header and checksums if present.
    * UNSET if unknown.
@@ -274,7 +272,7 @@ public class HFileBlock implements Cacheable {
     }
 
     @Override
-    public HFileBlock deserialize(ByteBuff buf, ByteBuffAllocator alloc, MemoryType memType)
+    public HFileBlock deserialize(ByteBuff buf, ByteBuffAllocator alloc)
         throws IOException {
       // The buf has the file block followed by block metadata.
       // Set limit to just before the BLOCK_METADATA_SPACE then rewind.
@@ -287,8 +285,7 @@ public class HFileBlock implements Cacheable {
       boolean usesChecksum = buf.get() == (byte) 1;
       long offset = buf.getLong();
       int nextBlockOnDiskSize = buf.getInt();
-      return new HFileBlock(newByteBuff, usesChecksum, memType, offset, nextBlockOnDiskSize, null,
-        alloc);
+      return new HFileBlock(newByteBuff, usesChecksum, offset, nextBlockOnDiskSize, null, alloc);
     }
 
     @Override
@@ -366,7 +363,7 @@ public class HFileBlock implements Cacheable {
    * to that point.
    * @param buf Has header, content, and trailing checksums if present.
    */
-  HFileBlock(ByteBuff buf, boolean usesHBaseChecksum, MemoryType memType, final long offset,
+  HFileBlock(ByteBuff buf, boolean usesHBaseChecksum, final long offset,
       final int nextBlockOnDiskSize, HFileContext fileContext, ByteBuffAllocator allocator)
       throws IOException {
     buf.rewind();
@@ -398,7 +395,6 @@ public class HFileBlock implements Cacheable {
     assert usesHBaseChecksum == fileContext.isUseHBaseChecksum();
     init(blockType, onDiskSizeWithoutHeader, uncompressedSizeWithoutHeader, prevBlockOffset, offset,
       onDiskDataSizeWithHeader, nextBlockOnDiskSize, fileContext, allocator);
-    this.memType = memType;
     this.offset = offset;
     this.buf = buf;
     this.buf.rewind();
@@ -1785,8 +1781,8 @@ public class HFileBlock implements Cacheable {
       // The onDiskBlock will become the headerAndDataBuffer for this block.
       // If nextBlockOnDiskSizeWithHeader is not zero, the onDiskBlock already
      // contains the header of next block, so no need to set next block's header in it.
-      HFileBlock hFileBlock = new HFileBlock(curBlock, checksumSupport, MemoryType.EXCLUSIVE,
-        offset, nextBlockOnDiskSize, fileContext, intoHeap ? HEAP: allocator);
+      HFileBlock hFileBlock = new HFileBlock(curBlock, checksumSupport, offset,
+        nextBlockOnDiskSize, fileContext, intoHeap ? HEAP : allocator);
       // Run check on uncompressed sizings.
       if (!fileContext.isCompressedOrEncrypted()) {
         hFileBlock.sanityCheckUncompressed();
@@ -2060,18 +2056,6 @@ public class HFileBlock implements Cacheable {
     return this.fileContext;
   }
 
-  @Override
-  public MemoryType getMemoryType() {
-    return this.memType;
-  }
-
-  /**
-   * @return true if this block is backed by a shared memory area(such as that of a BucketCache).
-   */
-  boolean usesSharedMemory() {
-    return this.memType == MemoryType.SHARED;
-  }
-
   /**
   * Convert the contents of the block header into a human readable string.
   * This is mostly helpful for debugging. This assumes that the block


@@ -24,8 +24,6 @@ import java.security.Key;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
@@ -489,8 +487,6 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
     private int currValueLen;
     private int currMemstoreTSLen;
     private long currMemstoreTS;
-    // Updated but never read?
-    protected AtomicInteger blockFetches = new AtomicInteger(0);
     protected final HFile.Reader reader;
     private int currTagsLen;
     // buffer backed keyonlyKV
@@ -506,7 +502,11 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
      * If the nextIndexedKey is null, it means the nextIndexedKey has not been loaded yet.
      */
     protected Cell nextIndexedKey;
-    // Current block being used
+    // Current block being used. NOTICE: DON't release curBlock separately except in shipped() or
+    // close() methods. Because the shipped() or close() will do the release finally, even if any
+    // exception occur the curBlock will be released by the close() method (see
+    // RegionScannerImpl#handleException). Call the releaseIfNotCurBlock() to release the
+    // unreferenced block please.
     protected HFileBlock curBlock;
     // Previous blocks that were used in the course of the read
     protected final ArrayList<HFileBlock> prevBlocks = new ArrayList<>();
@@ -520,12 +520,10 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
     }
 
     void updateCurrBlockRef(HFileBlock block) {
-      if (block != null && this.curBlock != null &&
-          block.getOffset() == this.curBlock.getOffset()) {
+      if (block != null && curBlock != null && block.getOffset() == curBlock.getOffset()) {
         return;
       }
-      // We don't have to keep ref to EXCLUSIVE type of block
-      if (this.curBlock != null && this.curBlock.usesSharedMemory()) {
+      if (this.curBlock != null) {
         prevBlocks.add(this.curBlock);
       }
       this.curBlock = block;
@@ -533,7 +531,7 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
 
     void reset() {
       // We don't have to keep ref to EXCLUSIVE type of block
-      if (this.curBlock != null && this.curBlock.usesSharedMemory()) {
+      if (this.curBlock != null) {
         this.prevBlocks.add(this.curBlock);
       }
       this.curBlock = null;
@@ -821,7 +819,6 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
            return loadBlockAndSeekToKey(this.curBlock, nextIndexedKey, false, key,
              false);
          }
        }
      }
      // Don't rewind on a reseek operation, because reseek implies that we are
@@ -846,20 +843,19 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
     public int seekTo(Cell key, boolean rewind) throws IOException {
       HFileBlockIndex.BlockIndexReader indexReader = reader.getDataBlockIndexReader();
       BlockWithScanInfo blockWithScanInfo = indexReader.loadDataBlockWithScanInfo(key, curBlock,
         cacheBlocks, pread, isCompaction, getEffectiveDataBlockEncoding());
       if (blockWithScanInfo == null || blockWithScanInfo.getHFileBlock() == null) {
-        // This happens if the key e.g. falls before the beginning of the
-        // file.
+        // This happens if the key e.g. falls before the beginning of the file.
         return -1;
       }
       return loadBlockAndSeekToKey(blockWithScanInfo.getHFileBlock(),
         blockWithScanInfo.getNextIndexedKey(), rewind, key, false);
     }
 
     @Override
     public boolean seekBefore(Cell key) throws IOException {
       HFileBlock seekToBlock = reader.getDataBlockIndexReader().seekToDataBlock(key, curBlock,
         cacheBlocks, pread, isCompaction, reader.getEffectiveEncodingInCache(isCompaction));
       if (seekToBlock == null) {
         return false;
       }
@@ -869,22 +865,22 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
       // The key we are interested in
       if (previousBlockOffset == -1) {
         // we have a 'problem', the key we want is the first of the file.
+        releaseIfNotCurBlock(seekToBlock);
         return false;
       }
 
       // The first key in the current block 'seekToBlock' is greater than the given
       // seekBefore key. We will go ahead by reading the next block that satisfies the
       // given key. Return the current block before reading the next one.
-      seekToBlock.release();
+      releaseIfNotCurBlock(seekToBlock);
       // It is important that we compute and pass onDiskSize to the block
       // reader so that it does not have to read the header separately to
       // figure out the size. Currently, we do not have a way to do this
       // correctly in the general case however.
       // TODO: See https://issues.apache.org/jira/browse/HBASE-14576
       int prevBlockSize = -1;
-      seekToBlock = reader.readBlock(previousBlockOffset,
-        prevBlockSize, cacheBlocks,
-        pread, isCompaction, true, BlockType.DATA, getEffectiveDataBlockEncoding());
+      seekToBlock = reader.readBlock(previousBlockOffset, prevBlockSize, cacheBlocks, pread,
+        isCompaction, true, BlockType.DATA, getEffectiveDataBlockEncoding());
       // TODO shortcut: seek forward in this block to the last key of the
       // block.
     }
@@ -892,6 +888,16 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
       return true;
     }
 
+    /**
+     * The curBlock will be released by shipping or close method, so only need to consider releasing
+     * the block, which was read from HFile before and not referenced by curBlock.
+     */
+    protected void releaseIfNotCurBlock(HFileBlock block) {
+      if (curBlock != block) {
+        block.release();
+      }
+    }
+
     /**
      * Scans blocks in the "scanned" section of the {@link HFile} until the next
      * data block is found.
@ -903,33 +909,30 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
justification="Yeah, unnecessary null check; could do w/ clean up") justification="Yeah, unnecessary null check; could do w/ clean up")
protected HFileBlock readNextDataBlock() throws IOException { protected HFileBlock readNextDataBlock() throws IOException {
long lastDataBlockOffset = reader.getTrailer().getLastDataBlockOffset(); long lastDataBlockOffset = reader.getTrailer().getLastDataBlockOffset();
if (curBlock == null) if (curBlock == null) {
return null; return null;
}
HFileBlock block = this.curBlock; HFileBlock block = this.curBlock;
do { do {
if (block.getOffset() >= lastDataBlockOffset) { if (block.getOffset() >= lastDataBlockOffset) {
releaseIfNotCurBlock(block);
return null; return null;
} }
if (block.getOffset() < 0) { if (block.getOffset() < 0) {
throw new IOException( releaseIfNotCurBlock(block);
"Invalid block file offset: " + block + ", path=" + reader.getPath()); throw new IOException("Invalid block file offset: " + block + ", path=" + reader.getPath());
} }
// We are reading the next block without block type validation, because // We are reading the next block without block type validation, because
// it might turn out to be a non-data block. // it might turn out to be a non-data block.
block = reader.readBlock(block.getOffset() + block.getOnDiskSizeWithHeader(), block = reader.readBlock(block.getOffset() + block.getOnDiskSizeWithHeader(),
block.getNextBlockOnDiskSize(), cacheBlocks, pread, block.getNextBlockOnDiskSize(), cacheBlocks, pread, isCompaction, true, null,
isCompaction, true, null, getEffectiveDataBlockEncoding()); getEffectiveDataBlockEncoding());
if (block != null && !block.getBlockType().isData()) { // Findbugs: NP_NULL_ON_SOME_PATH if (block != null && !block.getBlockType().isData()) {
// Whatever block we read we will be returning it unless // Whatever block we read we will be returning it unless
// it is a datablock. Just in case the blocks are non data blocks // it is a datablock. Just in case the blocks are non data blocks
block.release(); block.release();
} }
} while (!block.getBlockType().isData()); } while (!block.getBlockType().isData());
return block; return block;
} }
@@ -1109,8 +1112,7 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
       }
 
       long firstDataBlockOffset = reader.getTrailer().getFirstDataBlockOffset();
-      if (curBlock != null
-          && curBlock.getOffset() == firstDataBlockOffset) {
+      if (curBlock != null && curBlock.getOffset() == firstDataBlockOffset) {
         return processFirstDataBlock();
       }
@@ -1128,8 +1130,8 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
       HFileBlock newBlock = reader.readBlock(firstDataBlockOffset, -1, cacheBlocks, pread,
         isCompaction, true, BlockType.DATA, getEffectiveDataBlockEncoding());
       if (newBlock.getOffset() < 0) {
-        throw new IOException(
-          "Invalid block offset: " + newBlock.getOffset() + ", path=" + reader.getPath());
+        releaseIfNotCurBlock(newBlock);
+        throw new IOException("Invalid block offset: " + newBlock.getOffset() + ", path=" + reader.getPath());
       }
       updateCurrentBlock(newBlock);
     }
@@ -1176,26 +1178,26 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
     }
 
     /**
-     * Updates the current block to be the given {@link HFileBlock}. Seeks to
-     * the the first key/value pair.
-     *
-     * @param newBlock the block to make current
+     * Updates the current block to be the given {@link HFileBlock}. Seeks to the the first
+     * key/value pair.
+     * @param newBlock the block read by {@link HFileReaderImpl#readBlock}, it's a totally new block
+     *          with new allocated {@link ByteBuff}, so if no further reference to this block, we
+     *          should release it carefully.
      */
     protected void updateCurrentBlock(HFileBlock newBlock) throws IOException {
-      // Set the active block on the reader
-      // sanity check
-      if (newBlock.getBlockType() != BlockType.DATA) {
-        throw new IllegalStateException("ScannerV2 works only on data " + "blocks, got "
-          + newBlock.getBlockType() + "; " + "HFileName=" + reader.getPath()
-          + ", " + "dataBlockEncoder=" + reader.getDataBlockEncoding() + ", " + "isCompaction="
-          + isCompaction);
+      try {
+        if (newBlock.getBlockType() != BlockType.DATA) {
+          throw new IllegalStateException(
+            "ScannerV2 works only on data blocks, got " + newBlock.getBlockType() + "; "
+              + "HFileName=" + reader.getPath() + ", " + "dataBlockEncoder="
+              + reader.getDataBlockEncoding() + ", " + "isCompaction=" + isCompaction);
+        }
+        updateCurrBlockRef(newBlock);
+        blockBuffer = newBlock.getBufferWithoutHeader();
+        readKeyValueLen();
+      } finally {
+        releaseIfNotCurBlock(newBlock);
       }
-      updateCurrBlockRef(newBlock);
-      blockBuffer = newBlock.getBufferWithoutHeader();
-      readKeyValueLen();
-      blockFetches.incrementAndGet();
       // Reset the next indexed key
       this.nextIndexedKey = null;
     }
@@ -1643,32 +1645,33 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
     }
 
     /**
-     * Updates the current block to be the given {@link HFileBlock}. Seeks to
-     * the the first key/value pair.
-     *
-     * @param newBlock the block to make current
+     * Updates the current block to be the given {@link HFileBlock}. Seeks to the the first
+     * key/value pair.
+     * @param newBlock the block to make current, and read by {@link HFileReaderImpl#readBlock},
+     *          it's a totally new block with new allocated {@link ByteBuff}, so if no further
+     *          reference to this block, we should release it carefully.
      * @throws CorruptHFileException
      */
     @Override
     protected void updateCurrentBlock(HFileBlock newBlock) throws CorruptHFileException {
+      try {
         // sanity checks
         if (newBlock.getBlockType() != BlockType.ENCODED_DATA) {
           throw new IllegalStateException("EncodedScanner works only on encoded data blocks");
+        }
+        short dataBlockEncoderId = newBlock.getDataBlockEncodingId();
+        if (!DataBlockEncoding.isCorrectEncoder(dataBlockEncoder, dataBlockEncoderId)) {
+          String encoderCls = dataBlockEncoder.getClass().getName();
+          throw new CorruptHFileException(
+            "Encoder " + encoderCls + " doesn't support data block encoding "
+              + DataBlockEncoding.getNameFromId(dataBlockEncoderId) + ",path=" + reader.getPath());
+        }
+        updateCurrBlockRef(newBlock);
+        ByteBuff encodedBuffer = getEncodedBuffer(newBlock);
+        seeker.setCurrentBuffer(encodedBuffer);
+      } finally {
+        releaseIfNotCurBlock(newBlock);
       }
-      short dataBlockEncoderId = newBlock.getDataBlockEncodingId();
-      if (!DataBlockEncoding.isCorrectEncoder(dataBlockEncoder, dataBlockEncoderId)) {
-        String encoderCls = dataBlockEncoder.getClass().getName();
-        throw new CorruptHFileException("Encoder " + encoderCls
-          + " doesn't support data block encoding "
-          + DataBlockEncoding.getNameFromId(dataBlockEncoderId)
-          + ", path=" + reader.getPath());
-      }
-      updateCurrBlockRef(newBlock);
-      ByteBuff encodedBuffer = getEncodedBuffer(newBlock);
-      seeker.setCurrentBuffer(encodedBuffer);
-      blockFetches.incrementAndGet();
 
       // Reset the next indexed key
       this.nextIndexedKey = null;
     }
@@ -1748,8 +1751,7 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
     @Override
     protected int loadBlockAndSeekToKey(HFileBlock seekToBlock, Cell nextIndexedKey,
         boolean rewind, Cell key, boolean seekBefore) throws IOException {
-      if (this.curBlock == null
-          || this.curBlock.getOffset() != seekToBlock.getOffset()) {
+      if (this.curBlock == null || this.curBlock.getOffset() != seekToBlock.getOffset()) {
         updateCurrentBlock(seekToBlock);
       } else if (rewind) {
         seeker.rewind();
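
The NOTICE comment added above curBlock in the hunk at old line 506 captures the contract the rest of this file relies on: curBlock is never released piecemeal, only through shipped() or close(), and any block that does not become curBlock must go through releaseIfNotCurBlock(). Below is a hedged, caller-side sketch of that contract; the variable names are illustrative only, and in the region server the equivalent cleanup is performed by RegionScannerImpl#handleException.

    // Sketch: a caller that honours the curBlock release contract described above.
    // `reader` and `cell` are assumed to exist; this is not the production code path.
    HFileScanner scanner = reader.getScanner(true, true, false);
    try {
      if (scanner.seekTo(cell) != -1) {
        do {
          Cell current = scanner.getCell();
          // ... consume current ...
        } while (scanner.next());
      }
    } finally {
      // close() (like shipped()) releases curBlock and every block queued in prevBlocks,
      // returning their buffers to the ByteBuffAllocator even if an exception was thrown.
      scanner.close();
    }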


@@ -530,16 +530,9 @@ public class LruBlockCache implements FirstLevelBlockCache {
     if (victimHandler != null && !repeat) {
       // The handler will increase result's refCnt for RPC, so need no extra retain.
       Cacheable result = victimHandler.getBlock(cacheKey, caching, repeat, updateCacheMetrics);
       // Promote this to L1.
       if (result != null) {
         if (caching) {
-          if (result instanceof HFileBlock && ((HFileBlock) result).usesSharedMemory()) {
-            Cacheable original = result;
-            result = ((HFileBlock) original).deepCloneOnHeap();
-            // deepClone an new one, so need to release the original one to deallocate it.
-            original.release();
-          }
           cacheBlock(cacheKey, result, /* inMemory = */ false);
         }
       }


@@ -171,8 +171,8 @@ public final class TinyLfuBlockCache implements FirstLevelBlockCache {
     if (victimCache != null) {
       value = victimCache.getBlock(cacheKey, caching, repeat, updateCacheMetrics);
       if ((value != null) && caching) {
-        if ((value instanceof HFileBlock) && ((HFileBlock) value).usesSharedMemory()) {
-          value = ((HFileBlock) value).deepClone();
+        if ((value instanceof HFileBlock) && !((HFileBlock) value).isOnHeap()) {
+          value = ((HFileBlock) value).deepCloneOnHeap();
         }
         cacheBlock(cacheKey, value);
       }
@@ -248,17 +248,6 @@ public final class TinyLfuBlockCache implements FirstLevelBlockCache {
         .iterator();
   }
 
-  @Override
-  public void returnBlock(BlockCacheKey cacheKey, Cacheable block) {
-    // There is no SHARED type here in L1. But the block might have been served from the L2 victim
-    // cache (when the Combined mode = false). So just try return this block to the victim cache.
-    // Note : In case of CombinedBlockCache we will have this victim cache configured for L1
-    // cache. But CombinedBlockCache will only call returnBlock on L2 cache.
-    if (victimCache != null) {
-      victimCache.returnBlock(cacheKey, block);
-    }
-  }
-
   private void logStats() {
     LOG.info(
         "totalSize=" + StringUtils.byteDesc(heapSize()) + ", " +


@@ -29,7 +29,6 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.hadoop.hbase.io.ByteBuffAllocator;
 import org.apache.hadoop.hbase.io.hfile.BlockPriority;
 import org.apache.hadoop.hbase.io.hfile.Cacheable;
-import org.apache.hadoop.hbase.io.hfile.Cacheable.MemoryType;
 import org.apache.hadoop.hbase.io.hfile.CacheableDeserializer;
 import org.apache.hadoop.hbase.io.hfile.CacheableDeserializerIdManager;
 import org.apache.hadoop.hbase.nio.ByteBuff;
@@ -194,9 +193,9 @@ class BucketEntry implements HBaseReferenceCounted {
     return this.refCnt() > 1 || (evicted && refCnt() == 1);
   }
 
-  Cacheable wrapAsCacheable(ByteBuffer[] buffers, MemoryType memoryType) throws IOException {
+  Cacheable wrapAsCacheable(ByteBuffer[] buffers) throws IOException {
     ByteBuff buf = ByteBuff.wrap(buffers, this.refCnt);
-    return this.deserializerReference().deserialize(buf, allocator, memoryType);
+    return this.deserializerReference().deserialize(buf, allocator);
   }
 
   interface BucketEntryHandler<T> {


@@ -23,7 +23,6 @@ import java.nio.ByteBuffer;
 import org.apache.yetus.audience.InterfaceAudience;
 
 import org.apache.hadoop.hbase.io.hfile.Cacheable;
-import org.apache.hadoop.hbase.io.hfile.Cacheable.MemoryType;
 import org.apache.hadoop.hbase.nio.ByteBuff;
 import org.apache.hadoop.hbase.util.ByteBufferAllocator;
 import org.apache.hadoop.hbase.util.ByteBufferArray;
@@ -104,10 +103,9 @@ public class ByteBufferIOEngine implements IOEngine {
     // Here the buffer that is created directly refers to the buffer in the actual buckets.
     // When any cell is referring to the blocks created out of these buckets then it means that
     // those cells are referring to a shared memory area which if evicted by the BucketCache would
-    // lead to corruption of results. Hence we set the type of the buffer as SHARED_MEMORY
-    // so that the readers using this block are aware of this fact and do the necessary action
-    // to prevent eviction till the results are either consumed or copied
-    return be.wrapAsCacheable(buffers, MemoryType.SHARED);
+    // lead to corruption of results. The readers using this block are aware of this fact and do the
+    // necessary action to prevent eviction till the results are either consumed or copied
+    return be.wrapAsCacheable(buffers);
   }
 
   /**


@@ -20,7 +20,6 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 
 import org.apache.hadoop.hbase.io.hfile.Cacheable;
-import org.apache.hadoop.hbase.io.hfile.Cacheable.MemoryType;
 import org.apache.hadoop.hbase.nio.ByteBuff;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -39,6 +38,6 @@ public class ExclusiveMemoryMmapIOEngine extends FileMmapIOEngine {
     ByteBuff dst = ByteBuff.wrap(ByteBuffer.allocate(be.getLength()));
     bufferArray.read(be.offset(), dst);
     dst.position(0).limit(be.getLength());
-    return be.wrapAsCacheable(dst.nioByteBuffers(), MemoryType.EXCLUSIVE);
+    return be.wrapAsCacheable(dst.nioByteBuffers());
   }
 }


@@ -30,7 +30,6 @@ import java.util.concurrent.locks.ReentrantLock;
 import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException;
 import org.apache.hadoop.hbase.io.hfile.Cacheable;
-import org.apache.hadoop.hbase.io.hfile.Cacheable.MemoryType;
 import org.apache.hadoop.hbase.nio.ByteBuff;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -143,7 +142,7 @@ public class FileIOEngine implements IOEngine {
       }
     }
     dstBuffer.rewind();
-    return be.wrapAsCacheable(new ByteBuffer[] { dstBuffer }, MemoryType.EXCLUSIVE);
+    return be.wrapAsCacheable(new ByteBuffer[] { dstBuffer });
   }
 
   @VisibleForTesting


@@ -21,7 +21,6 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 
 import org.apache.hadoop.hbase.io.hfile.Cacheable;
-import org.apache.hadoop.hbase.io.hfile.Cacheable.MemoryType;
 import org.apache.yetus.audience.InterfaceAudience;
 
 /**
@@ -54,9 +53,8 @@ public class SharedMemoryMmapIOEngine extends FileMmapIOEngine {
     // Here the buffer that is created directly refers to the buffer in the actual buckets.
     // When any cell is referring to the blocks created out of these buckets then it means that
     // those cells are referring to a shared memory area which if evicted by the BucketCache would
-    // lead to corruption of results. Hence we set the type of the buffer as SHARED_MEMORY
-    // so that the readers using this block are aware of this fact and do the necessary action
-    // to prevent eviction till the results are either consumed or copied
-    return be.wrapAsCacheable(buffers, MemoryType.SHARED);
+    // lead to corruption of results. The readers using this block are aware of this fact and do
+    // the necessary action to prevent eviction till the results are either consumed or copied
+    return be.wrapAsCacheable(buffers);
   }
 }


@@ -232,8 +232,7 @@ public class CacheTestUtils {
     }
 
     @Override
-    public Cacheable deserialize(ByteBuff b, ByteBuffAllocator alloc, MemoryType memType)
-        throws IOException {
+    public Cacheable deserialize(ByteBuff b, ByteBuffAllocator alloc) throws IOException {
       int len = b.getInt();
       Thread.yield();
       byte buf[] = new byte[len];
@@ -281,11 +280,6 @@ public class CacheTestUtils {
     public BlockType getBlockType() {
       return BlockType.DATA;
     }
-
-    @Override
-    public MemoryType getMemoryType() {
-      return MemoryType.EXCLUSIVE;
-    }
   }


@@ -36,7 +36,6 @@ import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.io.ByteBuffAllocator;
 import org.apache.hadoop.hbase.io.hfile.BlockType.BlockCategory;
-import org.apache.hadoop.hbase.io.hfile.Cacheable.MemoryType;
 import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
 import org.apache.hadoop.hbase.io.util.MemorySizeUtil;
 import org.apache.hadoop.hbase.nio.ByteBuff;
@@ -82,7 +81,7 @@ public class TestCacheConfig {
     }
 
     @Override
-    public Cacheable deserialize(ByteBuff b, ByteBuffAllocator alloc, MemoryType memType)
+    public Cacheable deserialize(ByteBuff b, ByteBuffAllocator alloc)
         throws IOException {
       LOG.info("Deserialized " + b);
       return cacheable;
@@ -144,11 +143,6 @@ public class TestCacheConfig {
     public BlockType getBlockType() {
       return BlockType.DATA;
     }
-
-    @Override
-    public MemoryType getMemoryType() {
-      return MemoryType.EXCLUSIVE;
-    }
   }
 
   static class MetaCacheEntry extends DataCacheEntry {


@@ -145,11 +145,6 @@ public class TestCachedBlockQueue extends TestCase {
           return BlockType.DATA;
         }
 
-        @Override
-        public MemoryType getMemoryType() {
-          return MemoryType.EXCLUSIVE;
-        }
-
       }, accessTime, false);
   }
 }


@@ -61,7 +61,6 @@ import org.apache.hadoop.hbase.io.ByteBuffAllocator;
 import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
-import org.apache.hadoop.hbase.io.hfile.Cacheable.MemoryType;
 import org.apache.hadoop.hbase.nio.ByteBuff;
 import org.apache.hadoop.hbase.nio.MultiByteBuff;
 import org.apache.hadoop.hbase.nio.SingleByteBuff;
@@ -526,7 +525,7 @@ public class TestHFileBlock {
     ByteBuffer serialized = ByteBuffer.allocate(blockFromHFile.getSerializedLength());
     blockFromHFile.serialize(serialized, true);
     HFileBlock deserialized = (HFileBlock) blockFromHFile.getDeserializer()
-      .deserialize(new SingleByteBuff(serialized), HEAP, MemoryType.EXCLUSIVE);
+      .deserialize(new SingleByteBuff(serialized), HEAP);
     assertEquals("Serialization did not preserve block state. reuseBuffer=" + reuseBuffer,
       blockFromHFile, deserialized);
     // intentional reference comparison


@@ -0,0 +1,301 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.io.hfile;
import static org.apache.hadoop.hbase.io.hfile.HFileBlockIndex.MAX_CHUNK_SIZE_KEY;
import static org.apache.hadoop.hbase.io.hfile.HFileBlockIndex.MIN_INDEX_NUM_ENTRIES_KEY;
import static org.junit.Assert.assertEquals;
import java.io.IOException;
import java.util.Random;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparatorImpl;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.HFileReaderImpl.HFileScannerImpl;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Category({ IOTests.class, SmallTests.class })
public class TestHFileScannerImplReferenceCount {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestHFileScannerImplReferenceCount.class);
private static final Logger LOG =
LoggerFactory.getLogger(TestHFileScannerImplReferenceCount.class);
private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
private static final byte[] FAMILY = Bytes.toBytes("f");
private static final byte[] QUALIFIER = Bytes.toBytes("q");
private static final byte[] SUFFIX = randLongBytes();
private static byte[] randLongBytes() {
Random rand = new Random();
byte[] keys = new byte[300];
rand.nextBytes(keys);
return keys;
}
private Cell firstCell = null;
private Cell secondCell = null;
@BeforeClass
public static void setUp() {
Configuration conf = UTIL.getConfiguration();
// Set the max chunk size and min entries key to be very small for index block, so that we can
// create an index block tree with level >= 2.
conf.setInt(MAX_CHUNK_SIZE_KEY, 10);
conf.setInt(MIN_INDEX_NUM_ENTRIES_KEY, 2);
}
private void writeHFile(Configuration conf, FileSystem fs, Path hfilePath, Algorithm compression,
DataBlockEncoding encoding, int cellCount) throws IOException {
HFileContext context =
new HFileContextBuilder().withBlockSize(1).withDataBlockEncoding(DataBlockEncoding.NONE)
.withCompression(compression).withDataBlockEncoding(encoding).build();
try (HFile.Writer writer =
new HFile.WriterFactory(conf, new CacheConfig(conf)).withPath(fs, hfilePath)
.withFileContext(context).withComparator(CellComparatorImpl.COMPARATOR).create()) {
Random rand = new Random(9713312); // Just a fixed seed.
for (int i = 0; i < cellCount; ++i) {
byte[] keyBytes = Bytes.add(Bytes.toBytes(i), SUFFIX);
// A random-length random value.
byte[] valueBytes = RandomKeyValueUtil.randomValue(rand);
KeyValue keyValue =
new KeyValue(keyBytes, FAMILY, QUALIFIER, HConstants.LATEST_TIMESTAMP, valueBytes);
if (firstCell == null) {
firstCell = keyValue;
} else if (secondCell == null) {
secondCell = keyValue;
}
writer.append(keyValue);
}
}
}
private void testReleaseBlock(Algorithm compression, DataBlockEncoding encoding)
throws Exception {
Configuration conf = new Configuration(UTIL.getConfiguration());
Path dir = UTIL.getDataTestDir("testReleasingBlock");
FileSystem fs = dir.getFileSystem(conf);
try {
String hfileName = "testReleaseBlock_hfile_0_" + System.currentTimeMillis();
Path hfilePath = new Path(dir, hfileName);
int cellCount = 1000;
LOG.info("Start to write {} cells into hfile: {}", cellCount, hfilePath);
writeHFile(conf, fs, hfilePath, compression, encoding, cellCount);
BlockCache defaultBC = BlockCacheFactory.createBlockCache(conf);
Assert.assertNotNull(defaultBC);
HFile.Reader reader =
HFile.createReader(fs, hfilePath, new CacheConfig(conf, defaultBC), true, conf);
Assert.assertTrue(reader instanceof HFileReaderImpl);
// We've build a HFile tree with index = 16.
Assert.assertEquals(16, reader.getTrailer().getNumDataIndexLevels());
HFileScanner scanner = reader.getScanner(true, true, false);
BlockWithScanInfo scanInfo = reader.getDataBlockIndexReader()
.loadDataBlockWithScanInfo(firstCell, null, true, true, false, DataBlockEncoding.NONE);
BlockWithScanInfo scanInfo2 = reader.getDataBlockIndexReader()
.loadDataBlockWithScanInfo(secondCell, null, true, true, false, DataBlockEncoding.NONE);
HFileBlock block = scanInfo.getHFileBlock();
HFileBlock block2 = scanInfo2.getHFileBlock();
// One refCnt for blockCache and the other refCnt for RPC path.
Assert.assertEquals(block.refCnt(), 2);
Assert.assertEquals(block2.refCnt(), 2);
Assert.assertFalse(block == block2);
scanner.seekTo(firstCell);
Assert.assertEquals(block.refCnt(), 3);
// Seek to the block again, the curBlock won't change and won't read from BlockCache. so
// refCnt should be unchanged.
scanner.seekTo(firstCell);
Assert.assertEquals(block.refCnt(), 3);
scanner.seekTo(secondCell);
Assert.assertEquals(block.refCnt(), 3);
Assert.assertEquals(block2.refCnt(), 3);
// After shipped, the block will be release, but block2 is still referenced by the curBlock.
scanner.shipped();
Assert.assertEquals(block.refCnt(), 2);
Assert.assertEquals(block2.refCnt(), 3);
// Try to ship again, though with nothing to client.
scanner.shipped();
Assert.assertEquals(block.refCnt(), 2);
Assert.assertEquals(block2.refCnt(), 3);
// The curBlock(block2) will also be released.
scanner.close();
Assert.assertEquals(block2.refCnt(), 2);
// Finish the block & block2 RPC path
block.release();
block2.release();
Assert.assertEquals(block.refCnt(), 1);
Assert.assertEquals(block2.refCnt(), 1);
// Evict the LRUBlockCache
Assert.assertTrue(defaultBC.evictBlocksByHfileName(hfileName) >= 2);
Assert.assertEquals(block.refCnt(), 0);
Assert.assertEquals(block2.refCnt(), 0);
int count = 0;
Assert.assertTrue(scanner.seekTo());
++count;
while (scanner.next()) {
count++;
}
assertEquals(cellCount, count);
} finally {
fs.delete(dir, true);
}
}
/**
* See HBASE-22480
*/
@Test
public void testSeekBefore() throws IOException {
Configuration conf = new Configuration(UTIL.getConfiguration());
Path dir = UTIL.getDataTestDir("testSeekBefore");
FileSystem fs = dir.getFileSystem(conf);
try {
String hfileName = "testSeekBefore_hfile_0_" + System.currentTimeMillis();
Path hfilePath = new Path(dir, hfileName);
int cellCount = 1000;
LOG.info("Start to write {} cells into hfile: {}", cellCount, hfilePath);
writeHFile(conf, fs, hfilePath, Algorithm.NONE, DataBlockEncoding.NONE, cellCount);
BlockCache defaultBC = BlockCacheFactory.createBlockCache(conf);
Assert.assertNotNull(defaultBC);
HFile.Reader reader =
HFile.createReader(fs, hfilePath, new CacheConfig(conf, defaultBC), true, conf);
Assert.assertTrue(reader instanceof HFileReaderImpl);
// We've build a HFile tree with index = 16.
Assert.assertEquals(16, reader.getTrailer().getNumDataIndexLevels());
HFileScanner scanner = reader.getScanner(true, true, false);
HFileBlock block1 = reader.getDataBlockIndexReader()
.loadDataBlockWithScanInfo(firstCell, null, true, true, false, DataBlockEncoding.NONE)
.getHFileBlock();
HFileBlock block2 = reader.getDataBlockIndexReader()
.loadDataBlockWithScanInfo(secondCell, null, true, true, false, DataBlockEncoding.NONE)
.getHFileBlock();
Assert.assertEquals(block1.refCnt(), 2);
Assert.assertEquals(block2.refCnt(), 2);
// Let the curBlock refer to block2.
scanner.seekTo(secondCell);
Assert.assertTrue(((HFileScannerImpl) scanner).curBlock == block2);
Assert.assertEquals(3, block2.refCnt());
// Release the block1, only one reference: blockCache.
Assert.assertFalse(block1.release());
Assert.assertEquals(1, block1.refCnt());
// Release the block2, so the remain references are: 1. scanner; 2. blockCache.
Assert.assertFalse(block2.release());
Assert.assertEquals(2, block2.refCnt());
// Do the seekBefore: the newBlock will be the previous block of curBlock.
Assert.assertTrue(scanner.seekBefore(secondCell));
Assert.assertTrue(((HFileScannerImpl) scanner).curBlock == block1);
// Two reference for block1: 1. scanner; 2. blockCache.
Assert.assertEquals(2, block1.refCnt());
// Reference count of block2 must be unchanged because we haven't shipped.
Assert.assertEquals(2, block2.refCnt());
// Do the shipped
scanner.shipped();
Assert.assertEquals(2, block1.refCnt());
Assert.assertEquals(1, block2.refCnt());
// Do the close
scanner.close();
Assert.assertEquals(1, block1.refCnt());
Assert.assertEquals(1, block2.refCnt());
Assert.assertTrue(defaultBC.evictBlocksByHfileName(hfileName) >= 2);
Assert.assertEquals(0, block1.refCnt());
Assert.assertEquals(0, block2.refCnt());
// Reload the block1 again.
block1 = reader.getDataBlockIndexReader()
.loadDataBlockWithScanInfo(firstCell, null, true, true, false, DataBlockEncoding.NONE)
.getHFileBlock();
Assert.assertFalse(block1.release());
Assert.assertEquals(1, block1.refCnt());
// Re-seek to the begin.
Assert.assertTrue(scanner.seekTo());
Assert.assertTrue(((HFileScannerImpl) scanner).curBlock == block1);
Assert.assertEquals(2, block1.refCnt());
// Return false because firstCell <= c[0]
Assert.assertFalse(scanner.seekBefore(firstCell));
// The block1 shouldn't be released because we still don't do the shipped or close.
Assert.assertEquals(2, block1.refCnt());
scanner.close();
Assert.assertEquals(1, block1.refCnt());
Assert.assertTrue(defaultBC.evictBlocksByHfileName(hfileName) >= 1);
Assert.assertEquals(0, block1.refCnt());
} finally {
fs.delete(dir, true);
}
}
@Test
public void testDefault() throws Exception {
testReleaseBlock(Algorithm.NONE, DataBlockEncoding.NONE);
}
@Test
public void testCompression() throws Exception {
testReleaseBlock(Algorithm.GZ, DataBlockEncoding.NONE);
}
@Test
public void testDataBlockEncoding() throws Exception {
testReleaseBlock(Algorithm.NONE, DataBlockEncoding.ROW_INDEX_V1);
}
@Test
public void testDataBlockEncodingAndCompression() throws Exception {
testReleaseBlock(Algorithm.GZ, DataBlockEncoding.ROW_INDEX_V1);
}
}


@@ -955,12 +955,6 @@ public class TestLruBlockCache {
     public BlockType getBlockType() {
       return BlockType.DATA;
     }
-
-    @Override
-    public MemoryType getMemoryType() {
-      return MemoryType.EXCLUSIVE;
-    }
-
   }
 
   static void testMultiThreadGetAndEvictBlockInternal(BlockCache cache) throws Exception {


@@ -295,11 +295,6 @@ public class TestTinyLfuBlockCache {
       return BlockType.DATA;
     }
 
-    @Override
-    public MemoryType getMemoryType() {
-      return MemoryType.EXCLUSIVE;
-    }
-
     @Override
     public void serialize(ByteBuffer destination, boolean includeNextBlockMetadata) {
     }


@@ -32,7 +32,6 @@ import org.apache.hadoop.hbase.io.ByteBuffAllocator;
 import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
 import org.apache.hadoop.hbase.io.hfile.BlockType;
 import org.apache.hadoop.hbase.io.hfile.Cacheable;
-import org.apache.hadoop.hbase.io.hfile.Cacheable.MemoryType;
 import org.apache.hadoop.hbase.io.hfile.HFileBlock;
 import org.apache.hadoop.hbase.io.hfile.HFileContext;
 import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
@@ -148,7 +147,6 @@ public class TestBucketCacheRefCnt {
       assertEquals(1, blk.refCnt());
 
       Cacheable block = cache.getBlock(key, false, false, false);
-      assertTrue(block.getMemoryType() == MemoryType.SHARED);
       assertTrue(block instanceof HFileBlock);
       assertTrue(((HFileBlock) block).getByteBuffAllocator() == alloc);
       assertEquals(2, block.refCnt());
@@ -157,7 +155,6 @@ public class TestBucketCacheRefCnt {
       assertEquals(3, block.refCnt());
 
       Cacheable newBlock = cache.getBlock(key, false, false, false);
-      assertTrue(newBlock.getMemoryType() == MemoryType.SHARED);
       assertTrue(newBlock instanceof HFileBlock);
       assertTrue(((HFileBlock) newBlock).getByteBuffAllocator() == alloc);
       assertEquals(4, newBlock.refCnt());


@@ -22,7 +22,6 @@ import java.nio.ByteBuffer;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.io.ByteBuffAllocator;
 import org.apache.hadoop.hbase.io.hfile.Cacheable;
-import org.apache.hadoop.hbase.io.hfile.Cacheable.MemoryType;
 import org.apache.hadoop.hbase.io.hfile.CacheableDeserializer;
 import org.apache.hadoop.hbase.io.hfile.CacheableDeserializerIdManager;
 import org.apache.hadoop.hbase.nio.ByteBuff;
@@ -127,7 +126,7 @@ public class TestByteBufferIOEngine {
     private int identifier;
 
     @Override
-    public Cacheable deserialize(final ByteBuff b, ByteBuffAllocator alloc, MemoryType memType)
+    public Cacheable deserialize(final ByteBuff b, ByteBuffAllocator alloc)
         throws IOException {
       this.buf = b;
       return null;