HBASE-22005 Use ByteBuff's refcnt to track the life cycle of data block
This commit is contained in:
parent
f97f6e3e6a
commit
0d294d23a9
|
@ -62,6 +62,11 @@ public class ByteBuffAllocator {
|
|||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(ByteBuffAllocator.class);
|
||||
|
||||
// The on-heap allocator is mostly used for testing, but also some non-test usage, such as
|
||||
// scanning snapshot, we won't have an RpcServer to initialize the allocator, so just use the
|
||||
// default heap allocator, it will just allocate ByteBuffers from heap but wrapped by an ByteBuff.
|
||||
public static final ByteBuffAllocator HEAP = ByteBuffAllocator.createOnHeap();
|
||||
|
||||
public static final String MAX_BUFFER_COUNT_KEY = "hbase.ipc.server.allocator.max.buffer.count";
|
||||
|
||||
public static final String BUFFER_SIZE_KEY = "hbase.ipc.server.allocator.buffer.size";
|
||||
|
@ -131,7 +136,7 @@ public class ByteBuffAllocator {
|
|||
* designed for testing purpose or disabled reservoir case.
|
||||
* @return allocator to allocate on-heap ByteBuffer.
|
||||
*/
|
||||
public static ByteBuffAllocator createOnHeap() {
|
||||
private static ByteBuffAllocator createOnHeap() {
|
||||
return new ByteBuffAllocator(false, 0, DEFAULT_BUFFER_SIZE, Integer.MAX_VALUE);
|
||||
}
|
||||
|
||||
|
@ -167,7 +172,11 @@ public class ByteBuffAllocator {
|
|||
}
|
||||
}
|
||||
// Allocated from heap, let the JVM free its memory.
|
||||
return new SingleByteBuff(NONE, ByteBuffer.allocate(this.bufSize));
|
||||
return allocateOnHeap(this.bufSize);
|
||||
}
|
||||
|
||||
private SingleByteBuff allocateOnHeap(int size) {
|
||||
return new SingleByteBuff(NONE, ByteBuffer.allocate(size));
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -128,7 +128,7 @@ public class TestByteBuffAllocator {
|
|||
@Test
|
||||
public void testAllocateOneBuffer() {
|
||||
// Allocate from on-heap
|
||||
ByteBuffAllocator allocator = ByteBuffAllocator.createOnHeap();
|
||||
ByteBuffAllocator allocator = ByteBuffAllocator.HEAP;
|
||||
ByteBuff buf = allocator.allocateOneBuffer();
|
||||
assertTrue(buf.hasArray());
|
||||
assertEquals(ByteBuffAllocator.DEFAULT_BUFFER_SIZE, buf.remaining());
|
||||
|
|
|
@ -135,14 +135,25 @@ public interface BlockCache extends Iterable<CachedBlock> {
|
|||
BlockCache [] getBlockCaches();
|
||||
|
||||
/**
|
||||
* Called when the scanner using the block decides to return the block once its usage
|
||||
* is over.
|
||||
* This API should be called after the block is used, failing to do so may have adverse effects
|
||||
* by preventing the blocks from being evicted because of which it will prevent new hot blocks
|
||||
* from getting added to the block cache. The implementation of the BlockCache will decide
|
||||
* on what to be done with the block based on the memory type of the block's {@link MemoryType}.
|
||||
* Called when the scanner using the block decides to decrease refCnt of block and return the
|
||||
* block once its usage is over. This API should be called after the block is used, failing to do
|
||||
* so may have adverse effects by preventing the blocks from being evicted because of which it
|
||||
* will prevent new hot blocks from getting added to the block cache. The implementation of the
|
||||
* BlockCache will decide on what to be done with the block based on the memory type of the
|
||||
* block's {@link MemoryType}. <br>
|
||||
* <br>
|
||||
* Note that if two handlers read from backingMap in off-heap BucketCache at the same time, BC
|
||||
* will return two ByteBuff, which reference to the same memory area in buckets, but wrapped by
|
||||
* two different ByteBuff, and each of them has its own independent refCnt(=1). so here, if
|
||||
* returnBlock with different blocks in two handlers, it has no problem. but if both the two
|
||||
* handlers returnBlock with the same block, then the refCnt exception will happen here. <br>
|
||||
* TODO let's unify the ByteBuff's refCnt and BucketEntry's refCnt in HBASE-21957, after that
|
||||
* we'll just call the Cacheable#release instead of calling release in some path and calling
|
||||
* returnBlock in other paths in current version.
|
||||
* @param cacheKey the cache key of the block
|
||||
* @param block the hfileblock to be returned
|
||||
*/
|
||||
default void returnBlock(BlockCacheKey cacheKey, Cacheable block){};
|
||||
default void returnBlock(BlockCacheKey cacheKey, Cacheable block) {
|
||||
block.release();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -229,10 +229,7 @@ public class BlockCacheUtil {
|
|||
public static boolean shouldReplaceExistingCacheBlock(BlockCache blockCache,
|
||||
BlockCacheKey cacheKey, Cacheable newBlock) {
|
||||
Cacheable existingBlock = blockCache.getBlock(cacheKey, false, false, false);
|
||||
if (null == existingBlock) {
|
||||
// Not exist now.
|
||||
return true;
|
||||
}
|
||||
existingBlock.retain();
|
||||
try {
|
||||
int comparison = BlockCacheUtil.validateBlockAddition(existingBlock, newBlock, cacheKey);
|
||||
if (comparison < 0) {
|
||||
|
|
|
@ -21,6 +21,7 @@ import java.util.Optional;
|
|||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
|
||||
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
|
||||
import org.apache.hadoop.hbase.io.hfile.BlockType.BlockCategory;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.slf4j.Logger;
|
||||
|
@ -128,6 +129,8 @@ public class CacheConfig {
|
|||
// Local reference to the block cache
|
||||
private final BlockCache blockCache;
|
||||
|
||||
private final ByteBuffAllocator byteBuffAllocator;
|
||||
|
||||
/**
|
||||
* Create a cache configuration using the specified configuration object and
|
||||
* defaults for family level settings. Only use if no column family context.
|
||||
|
@ -138,7 +141,7 @@ public class CacheConfig {
|
|||
}
|
||||
|
||||
public CacheConfig(Configuration conf, BlockCache blockCache) {
|
||||
this(conf, null, blockCache);
|
||||
this(conf, null, blockCache, ByteBuffAllocator.HEAP);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -147,7 +150,8 @@ public class CacheConfig {
|
|||
* @param conf hbase configuration
|
||||
* @param family column family configuration
|
||||
*/
|
||||
public CacheConfig(Configuration conf, ColumnFamilyDescriptor family, BlockCache blockCache) {
|
||||
public CacheConfig(Configuration conf, ColumnFamilyDescriptor family, BlockCache blockCache,
|
||||
ByteBuffAllocator byteBuffAllocator) {
|
||||
this.cacheDataOnRead = conf.getBoolean(CACHE_DATA_ON_READ_KEY, DEFAULT_CACHE_DATA_ON_READ) &&
|
||||
(family == null ? true : family.isBlockCacheEnabled());
|
||||
this.inMemory = family == null ? DEFAULT_IN_MEMORY : family.isInMemory();
|
||||
|
@ -171,6 +175,7 @@ public class CacheConfig {
|
|||
this.prefetchOnOpen = conf.getBoolean(PREFETCH_BLOCKS_ON_OPEN_KEY, DEFAULT_PREFETCH_ON_OPEN) ||
|
||||
(family == null ? false : family.isPrefetchBlocksOnOpen());
|
||||
this.blockCache = blockCache;
|
||||
this.byteBuffAllocator = byteBuffAllocator;
|
||||
LOG.info("Created cacheConfig: " + this + (family == null ? "" : " for family " + family) +
|
||||
" with blockCache=" + blockCache);
|
||||
}
|
||||
|
@ -190,6 +195,7 @@ public class CacheConfig {
|
|||
this.prefetchOnOpen = cacheConf.prefetchOnOpen;
|
||||
this.dropBehindCompaction = cacheConf.dropBehindCompaction;
|
||||
this.blockCache = cacheConf.blockCache;
|
||||
this.byteBuffAllocator = cacheConf.byteBuffAllocator;
|
||||
}
|
||||
|
||||
private CacheConfig() {
|
||||
|
@ -203,6 +209,7 @@ public class CacheConfig {
|
|||
this.prefetchOnOpen = false;
|
||||
this.dropBehindCompaction = false;
|
||||
this.blockCache = null;
|
||||
this.byteBuffAllocator = ByteBuffAllocator.HEAP;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -360,6 +367,10 @@ public class CacheConfig {
|
|||
return Optional.ofNullable(this.blockCache);
|
||||
}
|
||||
|
||||
public ByteBuffAllocator getByteBuffAllocator() {
|
||||
return this.byteBuffAllocator;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "cacheDataOnRead=" + shouldCacheDataOnRead() + ", cacheDataOnWrite="
|
||||
|
@ -368,4 +379,4 @@ public class CacheConfig {
|
|||
+ shouldEvictOnClose() + ", cacheDataCompressed=" + shouldCacheDataCompressed()
|
||||
+ ", prefetchOnOpen=" + shouldPrefetchOnOpen();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -24,6 +24,8 @@ import java.nio.ByteBuffer;
|
|||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.io.HeapSize;
|
||||
|
||||
import org.apache.hbase.thirdparty.io.netty.util.ReferenceCounted;
|
||||
|
||||
/**
|
||||
* Cacheable is an interface that allows for an object to be cached. If using an
|
||||
* on heap cache, just use heapsize. If using an off heap cache, Cacheable
|
||||
|
@ -34,7 +36,7 @@ import org.apache.hadoop.hbase.io.HeapSize;
|
|||
*
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public interface Cacheable extends HeapSize {
|
||||
public interface Cacheable extends HeapSize, ReferenceCounted {
|
||||
/**
|
||||
* Returns the length of the ByteBuffer required to serialized the object. If the
|
||||
* object cannot be serialized, it should return 0.
|
||||
|
@ -75,4 +77,45 @@ public interface Cacheable extends HeapSize {
|
|||
enum MemoryType {
|
||||
SHARED, EXCLUSIVE
|
||||
}
|
||||
|
||||
/******************************* ReferenceCounted Interfaces ***********************************/
|
||||
|
||||
/**
|
||||
* Increase its reference count, and only when no reference we can free the object's memory.
|
||||
*/
|
||||
default Cacheable retain() {
|
||||
return this;
|
||||
}
|
||||
|
||||
default Cacheable retain(int increment) {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
/**
|
||||
* Reference count of this Cacheable.
|
||||
*/
|
||||
default int refCnt() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* Decrease its reference count, and if no reference then free the memory of this object, its
|
||||
* backend is usually a {@link org.apache.hadoop.hbase.nio.ByteBuff}, and we will put its NIO
|
||||
* ByteBuffers back to {@link org.apache.hadoop.hbase.io.ByteBuffAllocator}
|
||||
*/
|
||||
default boolean release() {
|
||||
return false;
|
||||
}
|
||||
|
||||
default boolean release(int increment) {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
default ReferenceCounted touch() {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
default ReferenceCounted touch(Object hint) {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -22,6 +22,8 @@ import java.io.DataOutput;
|
|||
import java.io.DataOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.concurrent.locks.Lock;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
|
@ -31,6 +33,7 @@ import org.apache.hadoop.fs.FSDataOutputStream;
|
|||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
@ -201,6 +204,8 @@ public class HFileBlock implements Cacheable {
|
|||
*/
|
||||
private int nextBlockOnDiskSize = UNSET;
|
||||
|
||||
private ByteBuffAllocator allocator;
|
||||
|
||||
/**
|
||||
* On a checksum failure, do these many succeeding read requests using hdfs checksums before
|
||||
* auto-reenabling hbase checksum verification.
|
||||
|
@ -274,7 +279,10 @@ public class HFileBlock implements Cacheable {
|
|||
boolean usesChecksum = buf.get() == (byte) 1;
|
||||
long offset = buf.getLong();
|
||||
int nextBlockOnDiskSize = buf.getInt();
|
||||
return new HFileBlock(newByteBuff, usesChecksum, memType, offset, nextBlockOnDiskSize, null);
|
||||
// TODO make the newly created HFileBlock use the off-heap allocator, Need change the
|
||||
// deserializer or change the deserialize interface.
|
||||
return new HFileBlock(newByteBuff, usesChecksum, memType, offset, nextBlockOnDiskSize, null,
|
||||
ByteBuffAllocator.HEAP);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -309,7 +317,7 @@ public class HFileBlock implements Cacheable {
|
|||
private HFileBlock(HFileBlock that, boolean bufCopy) {
|
||||
init(that.blockType, that.onDiskSizeWithoutHeader, that.uncompressedSizeWithoutHeader,
|
||||
that.prevBlockOffset, that.offset, that.onDiskDataSizeWithHeader, that.nextBlockOnDiskSize,
|
||||
that.fileContext);
|
||||
that.fileContext, that.allocator);
|
||||
if (bufCopy) {
|
||||
this.buf = new SingleByteBuff(ByteBuffer.wrap(that.buf.toBytes(0, that.buf.limit())));
|
||||
} else {
|
||||
|
@ -342,9 +350,9 @@ public class HFileBlock implements Cacheable {
|
|||
public HFileBlock(BlockType blockType, int onDiskSizeWithoutHeader,
|
||||
int uncompressedSizeWithoutHeader, long prevBlockOffset, ByteBuffer b, boolean fillHeader,
|
||||
long offset, final int nextBlockOnDiskSize, int onDiskDataSizeWithHeader,
|
||||
HFileContext fileContext) {
|
||||
HFileContext fileContext, ByteBuffAllocator allocator) {
|
||||
init(blockType, onDiskSizeWithoutHeader, uncompressedSizeWithoutHeader, prevBlockOffset, offset,
|
||||
onDiskDataSizeWithHeader, nextBlockOnDiskSize, fileContext);
|
||||
onDiskDataSizeWithHeader, nextBlockOnDiskSize, fileContext, allocator);
|
||||
this.buf = new SingleByteBuff(b);
|
||||
if (fillHeader) {
|
||||
overwriteHeader();
|
||||
|
@ -360,7 +368,7 @@ public class HFileBlock implements Cacheable {
|
|||
* @param buf Has header, content, and trailing checksums if present.
|
||||
*/
|
||||
HFileBlock(ByteBuff buf, boolean usesHBaseChecksum, MemoryType memType, final long offset,
|
||||
final int nextBlockOnDiskSize, HFileContext fileContext)
|
||||
final int nextBlockOnDiskSize, HFileContext fileContext, ByteBuffAllocator allocator)
|
||||
throws IOException {
|
||||
buf.rewind();
|
||||
final BlockType blockType = BlockType.read(buf);
|
||||
|
@ -390,7 +398,7 @@ public class HFileBlock implements Cacheable {
|
|||
fileContext = fileContextBuilder.build();
|
||||
assert usesHBaseChecksum == fileContext.isUseHBaseChecksum();
|
||||
init(blockType, onDiskSizeWithoutHeader, uncompressedSizeWithoutHeader, prevBlockOffset, offset,
|
||||
onDiskDataSizeWithHeader, nextBlockOnDiskSize, fileContext);
|
||||
onDiskDataSizeWithHeader, nextBlockOnDiskSize, fileContext, allocator);
|
||||
this.memType = memType;
|
||||
this.offset = offset;
|
||||
this.buf = buf;
|
||||
|
@ -402,7 +410,8 @@ public class HFileBlock implements Cacheable {
|
|||
*/
|
||||
private void init(BlockType blockType, int onDiskSizeWithoutHeader,
|
||||
int uncompressedSizeWithoutHeader, long prevBlockOffset, long offset,
|
||||
int onDiskDataSizeWithHeader, final int nextBlockOnDiskSize, HFileContext fileContext) {
|
||||
int onDiskDataSizeWithHeader, final int nextBlockOnDiskSize, HFileContext fileContext,
|
||||
ByteBuffAllocator allocator) {
|
||||
this.blockType = blockType;
|
||||
this.onDiskSizeWithoutHeader = onDiskSizeWithoutHeader;
|
||||
this.uncompressedSizeWithoutHeader = uncompressedSizeWithoutHeader;
|
||||
|
@ -411,6 +420,7 @@ public class HFileBlock implements Cacheable {
|
|||
this.onDiskDataSizeWithHeader = onDiskDataSizeWithHeader;
|
||||
this.nextBlockOnDiskSize = nextBlockOnDiskSize;
|
||||
this.fileContext = fileContext;
|
||||
this.allocator = allocator;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -438,6 +448,26 @@ public class HFileBlock implements Cacheable {
|
|||
return blockType;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int refCnt() {
|
||||
return buf.refCnt();
|
||||
}
|
||||
|
||||
@Override
|
||||
public HFileBlock retain() {
|
||||
buf.retain();
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Call {@link ByteBuff#release()} to decrease the reference count, if no other reference, it will
|
||||
* return back the {@link ByteBuffer} to {@link org.apache.hadoop.hbase.io.ByteBuffAllocator}
|
||||
*/
|
||||
@Override
|
||||
public boolean release() {
|
||||
return buf.release();
|
||||
}
|
||||
|
||||
/** @return get data block encoding id that was used to encode this block */
|
||||
short getDataBlockEncodingId() {
|
||||
if (blockType != BlockType.ENCODED_DATA) {
|
||||
|
@ -661,7 +691,7 @@ public class HFileBlock implements Cacheable {
|
|||
int headerSize = headerSize();
|
||||
int capacityNeeded = headerSize + uncompressedSizeWithoutHeader + cksumBytes;
|
||||
|
||||
ByteBuff newBuf = new SingleByteBuff(ByteBuffer.allocate(capacityNeeded));
|
||||
ByteBuff newBuf = allocator.allocate(capacityNeeded);
|
||||
|
||||
// Copy header bytes into newBuf.
|
||||
// newBuf is HBB so no issue in calling array()
|
||||
|
@ -681,7 +711,7 @@ public class HFileBlock implements Cacheable {
|
|||
final int cksumBytes = totalChecksumBytes();
|
||||
final int headerSize = headerSize();
|
||||
final int expectedCapacity = headerSize + uncompressedSizeWithoutHeader + cksumBytes;
|
||||
final int bufCapacity = buf.capacity();
|
||||
final int bufCapacity = buf.remaining();
|
||||
return bufCapacity == expectedCapacity || bufCapacity == expectedCapacity + headerSize;
|
||||
}
|
||||
|
||||
|
@ -1218,7 +1248,8 @@ public class HFileBlock implements Cacheable {
|
|||
cacheConf.shouldCacheCompressed(blockType.getCategory()) ? cloneOnDiskBufferWithHeader()
|
||||
: cloneUncompressedBufferWithHeader(),
|
||||
FILL_HEADER, startOffset, UNSET,
|
||||
onDiskBlockBytesWithHeader.size() + onDiskChecksum.length, newContext);
|
||||
onDiskBlockBytesWithHeader.size() + onDiskChecksum.length, newContext,
|
||||
cacheConf.getByteBuffAllocator());
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1236,7 +1267,10 @@ public class HFileBlock implements Cacheable {
|
|||
void writeToBlock(DataOutput out) throws IOException;
|
||||
}
|
||||
|
||||
/** Iterator for {@link HFileBlock}s. */
|
||||
/**
|
||||
* Iterator for reading {@link HFileBlock}s in load-on-open-section, such as root data index
|
||||
* block, meta index block, file info block etc.
|
||||
*/
|
||||
interface BlockIterator {
|
||||
/**
|
||||
* Get the next block, or null if there are no more blocks to iterate.
|
||||
|
@ -1244,10 +1278,20 @@ public class HFileBlock implements Cacheable {
|
|||
HFileBlock nextBlock() throws IOException;
|
||||
|
||||
/**
|
||||
* Similar to {@link #nextBlock()} but checks block type, throws an
|
||||
* exception if incorrect, and returns the HFile block
|
||||
* Similar to {@link #nextBlock()} but checks block type, throws an exception if incorrect, and
|
||||
* returns the HFile block
|
||||
*/
|
||||
HFileBlock nextBlockWithBlockType(BlockType blockType) throws IOException;
|
||||
|
||||
/**
|
||||
* Now we use the {@link ByteBuffAllocator} to manage the nio ByteBuffers for HFileBlocks, so we
|
||||
* must deallocate all of the ByteBuffers in the end life. the BlockIterator's life cycle is
|
||||
* starting from opening an HFileReader and stopped when the HFileReader#close, so we will keep
|
||||
* track all the read blocks until we call {@link BlockIterator#freeBlocks()} when closing the
|
||||
* HFileReader. Sum bytes of those blocks in load-on-open section should be quite small, so
|
||||
* tracking them should be OK.
|
||||
*/
|
||||
void freeBlocks();
|
||||
}
|
||||
|
||||
/** An HFile block reader with iteration ability. */
|
||||
|
@ -1350,10 +1394,12 @@ public class HFileBlock implements Cacheable {
|
|||
// Cache the fileName
|
||||
private String pathName;
|
||||
|
||||
private final ByteBuffAllocator allocator;
|
||||
|
||||
private final Lock streamLock = new ReentrantLock();
|
||||
|
||||
FSReaderImpl(FSDataInputStreamWrapper stream, long fileSize, HFileSystem hfs, Path path,
|
||||
HFileContext fileContext) throws IOException {
|
||||
HFileContext fileContext, ByteBuffAllocator allocator) throws IOException {
|
||||
this.fileSize = fileSize;
|
||||
this.hfs = hfs;
|
||||
if (path != null) {
|
||||
|
@ -1361,6 +1407,7 @@ public class HFileBlock implements Cacheable {
|
|||
}
|
||||
this.fileContext = fileContext;
|
||||
this.hdrSize = headerSize(fileContext.isUseHBaseChecksum());
|
||||
this.allocator = allocator;
|
||||
|
||||
this.streamWrapper = stream;
|
||||
// Older versions of HBase didn't support checksum.
|
||||
|
@ -1373,15 +1420,18 @@ public class HFileBlock implements Cacheable {
|
|||
* A constructor that reads files with the latest minor version. This is used by unit tests
|
||||
* only.
|
||||
*/
|
||||
FSReaderImpl(FSDataInputStream istream, long fileSize, HFileContext fileContext)
|
||||
throws IOException {
|
||||
this(new FSDataInputStreamWrapper(istream), fileSize, null, null, fileContext);
|
||||
FSReaderImpl(FSDataInputStream istream, long fileSize, HFileContext fileContext,
|
||||
ByteBuffAllocator allocator) throws IOException {
|
||||
this(new FSDataInputStreamWrapper(istream), fileSize, null, null, fileContext, allocator);
|
||||
}
|
||||
|
||||
@Override
|
||||
public BlockIterator blockRange(final long startOffset, final long endOffset) {
|
||||
final FSReader owner = this; // handle for inner class
|
||||
return new BlockIterator() {
|
||||
private volatile boolean freed = false;
|
||||
// Tracking all read blocks until we call freeBlocks.
|
||||
private List<HFileBlock> blockTracker = new ArrayList<>();
|
||||
private long offset = startOffset;
|
||||
// Cache length of next block. Current block has the length of next block in it.
|
||||
private long length = -1;
|
||||
|
@ -1394,19 +1444,33 @@ public class HFileBlock implements Cacheable {
|
|||
HFileBlock b = readBlockData(offset, length, false, false);
|
||||
offset += b.getOnDiskSizeWithHeader();
|
||||
length = b.getNextBlockOnDiskSize();
|
||||
return b.unpack(fileContext, owner);
|
||||
HFileBlock uncompressed = b.unpack(fileContext, owner);
|
||||
if (uncompressed != b) {
|
||||
b.release(); // Need to release the compressed Block now.
|
||||
}
|
||||
blockTracker.add(uncompressed);
|
||||
return uncompressed;
|
||||
}
|
||||
|
||||
@Override
|
||||
public HFileBlock nextBlockWithBlockType(BlockType blockType)
|
||||
throws IOException {
|
||||
public HFileBlock nextBlockWithBlockType(BlockType blockType) throws IOException {
|
||||
HFileBlock blk = nextBlock();
|
||||
if (blk.getBlockType() != blockType) {
|
||||
throw new IOException("Expected block of type " + blockType
|
||||
+ " but found " + blk.getBlockType());
|
||||
throw new IOException(
|
||||
"Expected block of type " + blockType + " but found " + blk.getBlockType());
|
||||
}
|
||||
return blk;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void freeBlocks() {
|
||||
if (freed) {
|
||||
return;
|
||||
}
|
||||
blockTracker.forEach(HFileBlock::release);
|
||||
blockTracker = null;
|
||||
freed = true;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
|
@ -1661,8 +1725,7 @@ public class HFileBlock implements Cacheable {
|
|||
// says where to start reading. If we have the header cached, then we don't need to read
|
||||
// it again and we can likely read from last place we left off w/o need to backup and reread
|
||||
// the header we read last time through here.
|
||||
ByteBuff onDiskBlock =
|
||||
new SingleByteBuff(ByteBuffer.allocate(onDiskSizeWithHeader + hdrSize));
|
||||
ByteBuff onDiskBlock = allocator.allocate(onDiskSizeWithHeader + hdrSize);
|
||||
boolean initHFileBlockSuccess = false;
|
||||
try {
|
||||
if (headerBuf != null) {
|
||||
|
@ -1679,7 +1742,7 @@ public class HFileBlock implements Cacheable {
|
|||
// Do a few checks before we go instantiate HFileBlock.
|
||||
assert onDiskSizeWithHeader > this.hdrSize;
|
||||
verifyOnDiskSizeMatchesHeader(onDiskSizeWithHeader, headerBuf, offset, checksumSupport);
|
||||
ByteBuff curBlock = onDiskBlock.duplicate().limit(onDiskSizeWithHeader);
|
||||
ByteBuff curBlock = onDiskBlock.duplicate().position(0).limit(onDiskSizeWithHeader);
|
||||
// Verify checksum of the data before using it for building HFileBlock.
|
||||
if (verifyChecksum && !validateChecksum(offset, curBlock, hdrSize)) {
|
||||
return null;
|
||||
|
@ -1692,7 +1755,7 @@ public class HFileBlock implements Cacheable {
|
|||
// If nextBlockOnDiskSizeWithHeader is not zero, the onDiskBlock already
|
||||
// contains the header of next block, so no need to set next block's header in it.
|
||||
HFileBlock hFileBlock = new HFileBlock(curBlock, checksumSupport, MemoryType.EXCLUSIVE,
|
||||
offset, nextBlockOnDiskSize, fileContext);
|
||||
offset, nextBlockOnDiskSize, fileContext, allocator);
|
||||
// Run check on uncompressed sizings.
|
||||
if (!fileContext.isCompressedOrEncrypted()) {
|
||||
hFileBlock.sanityCheckUncompressed();
|
||||
|
|
|
@ -24,6 +24,7 @@ import java.security.Key;
|
|||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.apache.hadoop.conf.Configurable;
|
||||
|
@ -137,6 +138,13 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
|
|||
*/
|
||||
private IdLock offsetLock = new IdLock();
|
||||
|
||||
/**
|
||||
* The iterator will track all blocks in load-on-open section, since we use the
|
||||
* {@link org.apache.hadoop.hbase.io.ByteBuffAllocator} to manage the ByteBuffers in block now, so
|
||||
* we must ensure that deallocate all ByteBuffers in the end.
|
||||
*/
|
||||
private final HFileBlock.BlockIterator blockIter;
|
||||
|
||||
/**
|
||||
* Blocks read from the load-on-open section, excluding data root index, meta
|
||||
* index, and file info.
|
||||
|
@ -199,7 +207,8 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
|
|||
this.primaryReplicaReader = primaryReplicaReader;
|
||||
checkFileVersion();
|
||||
this.hfileContext = createHFileContext(fsdis, fileSize, hfs, path, trailer);
|
||||
this.fsBlockReader = new HFileBlock.FSReaderImpl(fsdis, fileSize, hfs, path, hfileContext);
|
||||
this.fsBlockReader = new HFileBlock.FSReaderImpl(fsdis, fileSize, hfs, path, hfileContext,
|
||||
cacheConf.getByteBuffAllocator());
|
||||
|
||||
// Comparator class name is stored in the trailer in version 2.
|
||||
comparator = trailer.createComparator();
|
||||
|
@ -207,11 +216,9 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
|
|||
trailer.getNumDataIndexLevels(), this);
|
||||
metaBlockIndexReader = new HFileBlockIndex.ByteArrayKeyBlockIndexReader(1);
|
||||
|
||||
// Parse load-on-open data.
|
||||
|
||||
HFileBlock.BlockIterator blockIter = fsBlockReader.blockRange(
|
||||
trailer.getLoadOnOpenDataOffset(),
|
||||
fileSize - trailer.getTrailerSize());
|
||||
// Initialize an block iterator, and parse load-on-open blocks in the following.
|
||||
blockIter = fsBlockReader.blockRange(trailer.getLoadOnOpenDataOffset(),
|
||||
fileSize - trailer.getTrailerSize());
|
||||
|
||||
// Data index. We also read statistics about the block index written after
|
||||
// the root level.
|
||||
|
@ -372,12 +379,14 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
|
|||
@Override
|
||||
public void returnBlock(HFileBlock block) {
|
||||
if (block != null) {
|
||||
this.cacheConf.getBlockCache().ifPresent(blockCache -> {
|
||||
BlockCacheKey cacheKey =
|
||||
new BlockCacheKey(this.getFileContext().getHFileName(), block.getOffset(),
|
||||
this.isPrimaryReplicaReader(), block.getBlockType());
|
||||
blockCache.returnBlock(cacheKey, block);
|
||||
});
|
||||
if (this.cacheConf.getBlockCache().isPresent()) {
|
||||
BlockCacheKey cacheKey = new BlockCacheKey(this.getFileContext().getHFileName(),
|
||||
block.getOffset(), this.isPrimaryReplicaReader(), block.getBlockType());
|
||||
cacheConf.getBlockCache().get().returnBlock(cacheKey, block);
|
||||
} else {
|
||||
// Release the block here, it means the RPC path didn't ref to this block any more.
|
||||
block.release();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -543,7 +552,7 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
|
|||
this.curBlock = null;
|
||||
}
|
||||
|
||||
private void returnBlockToCache(HFileBlock block) {
|
||||
private void returnBlock(HFileBlock block) {
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("Returning the block : " + block);
|
||||
}
|
||||
|
@ -552,11 +561,11 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
|
|||
|
||||
private void returnBlocks(boolean returnAll) {
|
||||
for (int i = 0; i < this.prevBlocks.size(); i++) {
|
||||
returnBlockToCache(this.prevBlocks.get(i));
|
||||
returnBlock(this.prevBlocks.get(i));
|
||||
}
|
||||
this.prevBlocks.clear();
|
||||
if (returnAll && this.curBlock != null) {
|
||||
returnBlockToCache(this.curBlock);
|
||||
returnBlock(this.curBlock);
|
||||
this.curBlock = null;
|
||||
}
|
||||
}
|
||||
|
@ -1136,10 +1145,9 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
|
|||
return true;
|
||||
}
|
||||
|
||||
protected void readAndUpdateNewBlock(long firstDataBlockOffset) throws IOException,
|
||||
CorruptHFileException {
|
||||
protected void readAndUpdateNewBlock(long firstDataBlockOffset) throws IOException {
|
||||
HFileBlock newBlock = reader.readBlock(firstDataBlockOffset, -1, cacheBlocks, pread,
|
||||
isCompaction, true, BlockType.DATA, getEffectiveDataBlockEncoding());
|
||||
isCompaction, true, BlockType.DATA, getEffectiveDataBlockEncoding());
|
||||
if (newBlock.getOffset() < 0) {
|
||||
throw new IOException(
|
||||
"Invalid block offset: " + newBlock.getOffset() + ", path=" + reader.getPath());
|
||||
|
@ -1393,12 +1401,11 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
|
|||
// Per meta key from any given file, synchronize reads for said block. This
|
||||
// is OK to do for meta blocks because the meta block index is always
|
||||
// single-level.
|
||||
synchronized (metaBlockIndexReader
|
||||
.getRootBlockKey(block)) {
|
||||
synchronized (metaBlockIndexReader.getRootBlockKey(block)) {
|
||||
// Check cache for block. If found return.
|
||||
long metaBlockOffset = metaBlockIndexReader.getRootBlockOffset(block);
|
||||
BlockCacheKey cacheKey = new BlockCacheKey(name, metaBlockOffset,
|
||||
this.isPrimaryReplicaReader(), BlockType.META);
|
||||
BlockCacheKey cacheKey =
|
||||
new BlockCacheKey(name, metaBlockOffset, this.isPrimaryReplicaReader(), BlockType.META);
|
||||
|
||||
cacheBlock &= cacheConf.shouldCacheBlockOnRead(BlockType.META.getCategory());
|
||||
HFileBlock cachedBlock =
|
||||
|
@ -1411,15 +1418,19 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
|
|||
}
|
||||
// Cache Miss, please load.
|
||||
|
||||
HFileBlock metaBlock = fsBlockReader.readBlockData(metaBlockOffset, blockSize, true, false).
|
||||
unpack(hfileContext, fsBlockReader);
|
||||
HFileBlock compressedBlock =
|
||||
fsBlockReader.readBlockData(metaBlockOffset, blockSize, true, false);
|
||||
HFileBlock uncompressedBlock = compressedBlock.unpack(hfileContext, fsBlockReader);
|
||||
if (compressedBlock != uncompressedBlock) {
|
||||
compressedBlock.release();
|
||||
}
|
||||
|
||||
// Cache the block
|
||||
if (cacheBlock) {
|
||||
cacheConf.getBlockCache()
|
||||
.ifPresent(cache -> cache.cacheBlock(cacheKey, metaBlock, cacheConf.isInMemory()));
|
||||
cacheConf.getBlockCache().ifPresent(
|
||||
cache -> cache.cacheBlock(cacheKey, uncompressedBlock, cacheConf.isInMemory()));
|
||||
}
|
||||
return metaBlock;
|
||||
return uncompressedBlock;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1501,14 +1512,18 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
|
|||
BlockType.BlockCategory category = hfileBlock.getBlockType().getCategory();
|
||||
|
||||
// Cache the block if necessary
|
||||
AtomicBoolean cachedRaw = new AtomicBoolean(false);
|
||||
cacheConf.getBlockCache().ifPresent(cache -> {
|
||||
if (cacheBlock && cacheConf.shouldCacheBlockOnRead(category)) {
|
||||
cache.cacheBlock(cacheKey,
|
||||
cacheConf.shouldCacheCompressed(category) ? hfileBlock : unpacked,
|
||||
cacheConf.isInMemory());
|
||||
cachedRaw.set(cacheConf.shouldCacheCompressed(category));
|
||||
cache.cacheBlock(cacheKey, cachedRaw.get() ? hfileBlock : unpacked,
|
||||
cacheConf.isInMemory());
|
||||
}
|
||||
});
|
||||
|
||||
if (unpacked != hfileBlock && !cachedRaw.get()) {
|
||||
// End of life here if hfileBlock is an independent block.
|
||||
hfileBlock.release();
|
||||
}
|
||||
if (updateCacheMetrics && hfileBlock.getBlockType().isData()) {
|
||||
HFile.DATABLOCK_READ_COUNT.increment();
|
||||
}
|
||||
|
@ -1581,6 +1596,9 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
|
|||
@Override
|
||||
public void close(boolean evictOnClose) throws IOException {
|
||||
PrefetchExecutor.cancel(path);
|
||||
// Deallocate blocks in load-on-open section
|
||||
blockIter.freeBlocks();
|
||||
// Deallocate data blocks
|
||||
cacheConf.getBlockCache().ifPresent(cache -> {
|
||||
if (evictOnClose) {
|
||||
int numEvicted = cache.evictBlocksByHfileName(name);
|
||||
|
|
|
@ -402,6 +402,8 @@ public class LruBlockCache implements FirstLevelBlockCache {
|
|||
}
|
||||
return;
|
||||
}
|
||||
// The block will be referenced by the LRUBlockCache, so should increase the refCnt here.
|
||||
buf.retain();
|
||||
cb = new LruCachedBlock(cacheKey, buf, count.incrementAndGet(), inMemory);
|
||||
long newSize = updateSizeMetrics(cb, false);
|
||||
map.put(cacheKey, cb);
|
||||
|
@ -440,9 +442,12 @@ public class LruBlockCache implements FirstLevelBlockCache {
|
|||
/**
|
||||
* Cache the block with the specified name and buffer.
|
||||
* <p>
|
||||
*
|
||||
* TODO after HBASE-22005, we may cache an block which allocated from off-heap, but our LRU cache
|
||||
* sizing is based on heap size, so we should handle this in HBASE-22127. It will introduce an
|
||||
* switch whether make the LRU on-heap or not, if so we may need copy the memory to on-heap,
|
||||
* otherwise the caching size is based on off-heap.
|
||||
* @param cacheKey block's cache key
|
||||
* @param buf block buffer
|
||||
* @param buf block buffer
|
||||
*/
|
||||
@Override
|
||||
public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) {
|
||||
|
@ -490,14 +495,20 @@ public class LruBlockCache implements FirstLevelBlockCache {
|
|||
// However if this is a retry ( second time in double checked locking )
|
||||
// And it's already a miss then the l2 will also be a miss.
|
||||
if (victimHandler != null && !repeat) {
|
||||
// The handler will increase result's refCnt for RPC, so need no extra retain.
|
||||
Cacheable result = victimHandler.getBlock(cacheKey, caching, repeat, updateCacheMetrics);
|
||||
|
||||
// Promote this to L1.
|
||||
if (result != null && caching) {
|
||||
if (result instanceof HFileBlock && ((HFileBlock) result).usesSharedMemory()) {
|
||||
result = ((HFileBlock) result).deepClone();
|
||||
if (result != null) {
|
||||
if (caching) {
|
||||
if (result instanceof HFileBlock && ((HFileBlock) result).usesSharedMemory()) {
|
||||
Cacheable original = result;
|
||||
result = ((HFileBlock) original).deepClone();
|
||||
// deepClone an new one, so need to put the original one back to free it.
|
||||
victimHandler.returnBlock(cacheKey, original);
|
||||
}
|
||||
cacheBlock(cacheKey, result, /* inMemory = */ false);
|
||||
}
|
||||
cacheBlock(cacheKey, result, /* inMemory = */ false);
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
@ -505,6 +516,8 @@ public class LruBlockCache implements FirstLevelBlockCache {
|
|||
}
|
||||
if (updateCacheMetrics) stats.hit(caching, cacheKey.isPrimary(), cacheKey.getBlockType());
|
||||
cb.access(count.incrementAndGet());
|
||||
// It will be referenced by RPC path, so increase here.
|
||||
cb.getBuffer().retain();
|
||||
return cb.getBuffer();
|
||||
}
|
||||
|
||||
|
@ -558,10 +571,12 @@ public class LruBlockCache implements FirstLevelBlockCache {
|
|||
* @return the heap size of evicted block
|
||||
*/
|
||||
protected long evictBlock(LruCachedBlock block, boolean evictedByEvictionProcess) {
|
||||
boolean found = map.remove(block.getCacheKey()) != null;
|
||||
if (!found) {
|
||||
LruCachedBlock previous = map.remove(block.getCacheKey());
|
||||
if (previous == null) {
|
||||
return 0;
|
||||
}
|
||||
// Decrease the block's reference count, and if refCount is 0, then it'll auto-deallocate.
|
||||
previous.getBuffer().release();
|
||||
updateSizeMetrics(block, true);
|
||||
long val = elements.decrementAndGet();
|
||||
if (LOG.isTraceEnabled()) {
|
||||
|
@ -1142,17 +1157,6 @@ public class LruBlockCache implements FirstLevelBlockCache {
|
|||
return fileNames;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
Map<BlockType, Integer> getBlockTypeCountsForTest() {
|
||||
Map<BlockType, Integer> counts = new EnumMap<>(BlockType.class);
|
||||
for (LruCachedBlock cb : map.values()) {
|
||||
BlockType blockType = cb.getBuffer().getBlockType();
|
||||
Integer count = counts.get(blockType);
|
||||
counts.put(blockType, (count == null ? 0 : count) + 1);
|
||||
}
|
||||
return counts;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public Map<DataBlockEncoding, Integer> getEncodingCountsForTest() {
|
||||
Map<DataBlockEncoding, Integer> counts = new EnumMap<>(DataBlockEncoding.class);
|
||||
|
|
|
@ -135,7 +135,7 @@ public class BucketCache implements BlockCache, HeapSize {
|
|||
|
||||
// Store the block in this map before writing it to cache
|
||||
@VisibleForTesting
|
||||
transient final ConcurrentMap<BlockCacheKey, RAMQueueEntry> ramCache;
|
||||
transient final RAMCache ramCache;
|
||||
// In this map, store the block's meta data like offset, length
|
||||
@VisibleForTesting
|
||||
transient ConcurrentMap<BlockCacheKey, BucketEntry> backingMap;
|
||||
|
@ -291,7 +291,7 @@ public class BucketCache implements BlockCache, HeapSize {
|
|||
}
|
||||
|
||||
assert writerQueues.size() == writerThreads.length;
|
||||
this.ramCache = new ConcurrentHashMap<>();
|
||||
this.ramCache = new RAMCache();
|
||||
|
||||
this.backingMap = new ConcurrentHashMap<>((int) blockNumCapacity);
|
||||
|
||||
|
@ -965,8 +965,8 @@ public class BucketCache implements BlockCache, HeapSize {
|
|||
continue;
|
||||
}
|
||||
BucketEntry bucketEntry =
|
||||
re.writeToCache(ioEngine, bucketAllocator, deserialiserMap, realCacheSize);
|
||||
// Successfully added. Up index and add bucketEntry. Clear io exceptions.
|
||||
re.writeToCache(ioEngine, bucketAllocator, deserialiserMap, realCacheSize);
|
||||
// Successfully added. Up index and add bucketEntry. Clear io exceptions.
|
||||
bucketEntries[index] = bucketEntry;
|
||||
if (ioErrorStartTime > 0) {
|
||||
ioErrorStartTime = -1;
|
||||
|
@ -1520,6 +1520,7 @@ public class BucketCache implements BlockCache, HeapSize {
|
|||
ioEngine.write(sliceBuf, offset);
|
||||
ioEngine.write(metadata, offset + len - metadata.limit());
|
||||
} else {
|
||||
// Only used for testing.
|
||||
ByteBuffer bb = ByteBuffer.allocate(len);
|
||||
data.serialize(bb, true);
|
||||
ioEngine.write(bb, offset);
|
||||
|
@ -1645,6 +1646,7 @@ public class BucketCache implements BlockCache, HeapSize {
|
|||
|
||||
@Override
|
||||
public void returnBlock(BlockCacheKey cacheKey, Cacheable block) {
|
||||
block.release();
|
||||
if (block.getMemoryType() == MemoryType.SHARED) {
|
||||
BucketEntry bucketEntry = backingMap.get(cacheKey);
|
||||
if (bucketEntry != null) {
|
||||
|
@ -1688,4 +1690,53 @@ public class BucketCache implements BlockCache, HeapSize {
|
|||
float getMemoryFactor() {
|
||||
return memoryFactor;
|
||||
}
|
||||
|
||||
/**
|
||||
* Wrapped the delegate ConcurrentMap with maintaining its block's reference count.
|
||||
*/
|
||||
static class RAMCache {
|
||||
final ConcurrentMap<BlockCacheKey, RAMQueueEntry> delegate = new ConcurrentHashMap<>();
|
||||
|
||||
public boolean containsKey(BlockCacheKey key) {
|
||||
return delegate.containsKey(key);
|
||||
}
|
||||
|
||||
public RAMQueueEntry get(BlockCacheKey key) {
|
||||
RAMQueueEntry re = delegate.get(key);
|
||||
if (re != null) {
|
||||
// It'll be referenced by RPC, so retain here.
|
||||
re.getData().retain();
|
||||
}
|
||||
return re;
|
||||
}
|
||||
|
||||
public RAMQueueEntry putIfAbsent(BlockCacheKey key, RAMQueueEntry entry) {
|
||||
RAMQueueEntry previous = delegate.putIfAbsent(key, entry);
|
||||
if (previous == null) {
|
||||
// The RAMCache reference to this entry, so reference count should be increment.
|
||||
entry.getData().retain();
|
||||
}
|
||||
return previous;
|
||||
}
|
||||
|
||||
public RAMQueueEntry remove(BlockCacheKey key) {
|
||||
RAMQueueEntry previous = delegate.remove(key);
|
||||
if (previous != null) {
|
||||
previous.getData().release();
|
||||
}
|
||||
return previous;
|
||||
}
|
||||
|
||||
public boolean isEmpty() {
|
||||
return delegate.isEmpty();
|
||||
}
|
||||
|
||||
public void clear() {
|
||||
Iterator<Map.Entry<BlockCacheKey, RAMQueueEntry>> it = delegate.entrySet().iterator();
|
||||
while (it.hasNext()) {
|
||||
it.next().getValue().getData().release();
|
||||
it.remove();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -749,6 +749,11 @@ public abstract class RpcServer implements RpcServerInterface,
|
|||
return scheduler;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ByteBuffAllocator getByteBuffAllocator() {
|
||||
return this.bbAllocator;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setRsRpcServices(RSRpcServices rsRpcServices) {
|
||||
this.rsRpcServices = rsRpcServices;
|
||||
|
|
|
@ -22,6 +22,7 @@ package org.apache.hadoop.hbase.ipc;
|
|||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
|
||||
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.CellScanner;
|
||||
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
|
||||
|
@ -88,5 +89,11 @@ public interface RpcServerInterface {
|
|||
|
||||
RpcScheduler getScheduler();
|
||||
|
||||
/**
|
||||
* Allocator to allocate/free the ByteBuffers, those ByteBuffers can be on-heap or off-heap.
|
||||
* @return byte buffer allocator
|
||||
*/
|
||||
ByteBuffAllocator getByteBuffAllocator();
|
||||
|
||||
void setRsRpcServices(RSRpcServices rsRpcServices);
|
||||
}
|
||||
|
|
|
@ -38,6 +38,7 @@ import org.apache.hadoop.hbase.CellComparator;
|
|||
import org.apache.hadoop.hbase.DoNotRetryIOException;
|
||||
import org.apache.hadoop.hbase.ExtendedCellBuilderFactory;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.KeyValueUtil;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.Tag;
|
||||
import org.apache.hadoop.hbase.TagType;
|
||||
|
@ -391,8 +392,12 @@ public class HMobStore extends HStore {
|
|||
Path path = new Path(location, fileName);
|
||||
try {
|
||||
file = mobFileCache.openFile(fs, path, cacheConf);
|
||||
return readPt != -1 ? file.readCell(search, cacheMobBlocks, readPt) : file.readCell(search,
|
||||
cacheMobBlocks);
|
||||
Cell cell = readPt != -1 ? file.readCell(search, cacheMobBlocks, readPt)
|
||||
: file.readCell(search, cacheMobBlocks);
|
||||
// Now we will return blocks to allocator for mob cells before shipping to rpc client.
|
||||
// it will be memory leak. so just copy cell as an on-heap KV here. will remove this in
|
||||
// HBASE-22122 (TODO)
|
||||
return KeyValueUtil.copyToNewKeyValue(cell);
|
||||
} catch (IOException e) {
|
||||
mobFileCache.evictFile(fileName);
|
||||
throwable = e;
|
||||
|
|
|
@ -374,7 +374,8 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
|
|||
* @param family The current column family.
|
||||
*/
|
||||
protected void createCacheConf(final ColumnFamilyDescriptor family) {
|
||||
this.cacheConf = new CacheConfig(conf, family, region.getBlockCache());
|
||||
this.cacheConf = new CacheConfig(conf, family, region.getBlockCache(),
|
||||
region.getRegionServicesForStores().getByteBuffAllocator());
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -23,6 +23,7 @@ import java.util.concurrent.ThreadPoolExecutor;
|
|||
import java.util.concurrent.TimeUnit;
|
||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||
import org.apache.hadoop.hbase.executor.ExecutorType;
|
||||
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
|
||||
import org.apache.hadoop.hbase.wal.WAL;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
|
@ -66,6 +67,23 @@ public class RegionServicesForStores {
|
|||
return region.getWAL();
|
||||
}
|
||||
|
||||
private static ByteBuffAllocator ALLOCATOR_FOR_TEST;
|
||||
|
||||
private static synchronized ByteBuffAllocator getAllocatorForTest() {
|
||||
if (ALLOCATOR_FOR_TEST == null) {
|
||||
ALLOCATOR_FOR_TEST = ByteBuffAllocator.HEAP;
|
||||
}
|
||||
return ALLOCATOR_FOR_TEST;
|
||||
}
|
||||
|
||||
public ByteBuffAllocator getByteBuffAllocator() {
|
||||
if (rsServices != null && rsServices.getRpcServer() != null) {
|
||||
return rsServices.getRpcServer().getByteBuffAllocator();
|
||||
} else {
|
||||
return getAllocatorForTest();
|
||||
}
|
||||
}
|
||||
|
||||
private static ThreadPoolExecutor INMEMORY_COMPACTION_POOL_FOR_TEST;
|
||||
|
||||
private static synchronized ThreadPoolExecutor getInMemoryCompactionPoolForTest() {
|
||||
|
|
|
@ -38,6 +38,7 @@ import org.apache.hadoop.hbase.testclassification.MediumTests;
|
|||
import org.apache.hadoop.hbase.util.TestMiniClusterLoadSequential;
|
||||
import org.apache.hadoop.hbase.util.Threads;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Ignore;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
import org.junit.runners.Parameterized.Parameters;
|
||||
|
@ -45,9 +46,8 @@ import org.junit.runners.Parameterized.Parameters;
|
|||
/**
|
||||
* Uses the load tester
|
||||
*/
|
||||
@Category({IOTests.class, MediumTests.class})
|
||||
public class TestLoadAndSwitchEncodeOnDisk extends
|
||||
TestMiniClusterLoadSequential {
|
||||
@Category({ IOTests.class, MediumTests.class })
|
||||
public class TestLoadAndSwitchEncodeOnDisk extends TestMiniClusterLoadSequential {
|
||||
|
||||
@ClassRule
|
||||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
|
@ -74,6 +74,7 @@ public class TestLoadAndSwitchEncodeOnDisk extends
|
|||
|
||||
@Override
|
||||
@Test
|
||||
@Ignore("TODO Ignore this UT temporarily, will fix this in the critical HBASE-21937.")
|
||||
public void loadTest() throws Exception {
|
||||
Admin admin = TEST_UTIL.getAdmin();
|
||||
|
||||
|
|
|
@ -36,6 +36,7 @@ import org.apache.hadoop.conf.Configuration;
|
|||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.MultithreadedTestUtil;
|
||||
import org.apache.hadoop.hbase.MultithreadedTestUtil.TestThread;
|
||||
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
|
||||
import org.apache.hadoop.hbase.io.HeapSize;
|
||||
import org.apache.hadoop.hbase.io.compress.Compression;
|
||||
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
|
||||
|
@ -365,11 +366,10 @@ public class CacheTestUtils {
|
|||
.withBytesPerCheckSum(0)
|
||||
.withChecksumType(ChecksumType.NULL)
|
||||
.build();
|
||||
HFileBlock generated = new HFileBlock(BlockType.DATA,
|
||||
onDiskSizeWithoutHeader, uncompressedSizeWithoutHeader,
|
||||
prevBlockOffset, cachedBuffer, HFileBlock.DONT_FILL_HEADER,
|
||||
blockSize,
|
||||
onDiskSizeWithoutHeader + HConstants.HFILEBLOCK_HEADER_SIZE, -1, meta);
|
||||
HFileBlock generated = new HFileBlock(BlockType.DATA, onDiskSizeWithoutHeader,
|
||||
uncompressedSizeWithoutHeader, prevBlockOffset, cachedBuffer, HFileBlock.DONT_FILL_HEADER,
|
||||
blockSize, onDiskSizeWithoutHeader + HConstants.HFILEBLOCK_HEADER_SIZE, -1, meta,
|
||||
ByteBuffAllocator.HEAP);
|
||||
|
||||
String strKey;
|
||||
/* No conflicting keys */
|
||||
|
@ -400,8 +400,7 @@ public class CacheTestUtils {
|
|||
}
|
||||
|
||||
public static void getBlockAndAssertEquals(BlockCache cache, BlockCacheKey key,
|
||||
Cacheable blockToCache, ByteBuffer destBuffer,
|
||||
ByteBuffer expectedBuffer) {
|
||||
Cacheable blockToCache, ByteBuffer destBuffer, ByteBuffer expectedBuffer) {
|
||||
destBuffer.clear();
|
||||
cache.cacheBlock(key, blockToCache);
|
||||
Cacheable actualBlock = cache.getBlock(key, false, false, false);
|
||||
|
|
|
@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
|
|||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.HColumnDescriptor;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
|
||||
import org.apache.hadoop.hbase.io.hfile.BlockType.BlockCategory;
|
||||
import org.apache.hadoop.hbase.io.hfile.Cacheable.MemoryType;
|
||||
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
|
||||
|
@ -250,7 +251,7 @@ public class TestCacheConfig {
|
|||
HColumnDescriptor family = new HColumnDescriptor("testDisableCacheDataBlock");
|
||||
family.setBlockCacheEnabled(false);
|
||||
|
||||
cacheConfig = new CacheConfig(conf, family, null);
|
||||
cacheConfig = new CacheConfig(conf, family, null, ByteBuffAllocator.HEAP);
|
||||
assertFalse(cacheConfig.shouldCacheBlockOnRead(BlockCategory.DATA));
|
||||
assertFalse(cacheConfig.shouldCacheCompressed(BlockCategory.DATA));
|
||||
assertFalse(cacheConfig.shouldCacheDataCompressed());
|
||||
|
|
|
@ -60,6 +60,7 @@ import org.apache.hadoop.hbase.util.BloomFilterFactory;
|
|||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.ChecksumType;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||
import org.apache.hadoop.hbase.util.Pair;
|
||||
import org.junit.After;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Before;
|
||||
|
@ -112,7 +113,7 @@ public class TestCacheOnWrite {
|
|||
private static final int NUM_VALID_KEY_TYPES =
|
||||
KeyValue.Type.values().length - 2;
|
||||
|
||||
private static enum CacheOnWriteType {
|
||||
private enum CacheOnWriteType {
|
||||
DATA_BLOCKS(CacheConfig.CACHE_BLOCKS_ON_WRITE_KEY,
|
||||
BlockType.DATA, BlockType.ENCODED_DATA),
|
||||
BLOOM_BLOCKS(CacheConfig.CACHE_BLOOM_BLOCKS_ON_WRITE_KEY,
|
||||
|
@ -124,12 +125,11 @@ public class TestCacheOnWrite {
|
|||
private final BlockType blockType1;
|
||||
private final BlockType blockType2;
|
||||
|
||||
private CacheOnWriteType(String confKey, BlockType blockType) {
|
||||
CacheOnWriteType(String confKey, BlockType blockType) {
|
||||
this(confKey, blockType, blockType);
|
||||
}
|
||||
|
||||
private CacheOnWriteType(String confKey, BlockType blockType1,
|
||||
BlockType blockType2) {
|
||||
CacheOnWriteType(String confKey, BlockType blockType1, BlockType blockType2) {
|
||||
this.blockType1 = blockType1;
|
||||
this.blockType2 = blockType2;
|
||||
this.confKey = confKey;
|
||||
|
@ -269,18 +269,17 @@ public class TestCacheOnWrite {
|
|||
|
||||
DataBlockEncoding encodingInCache = NoOpDataBlockEncoder.INSTANCE.getDataBlockEncoding();
|
||||
List<Long> cachedBlocksOffset = new ArrayList<>();
|
||||
Map<Long, HFileBlock> cachedBlocks = new HashMap<>();
|
||||
Map<Long, Pair<HFileBlock, HFileBlock>> cachedBlocks = new HashMap<>();
|
||||
while (offset < reader.getTrailer().getLoadOnOpenDataOffset()) {
|
||||
// Flags: don't cache the block, use pread, this is not a compaction.
|
||||
// Also, pass null for expected block type to avoid checking it.
|
||||
HFileBlock block = reader.readBlock(offset, -1, false, true, false, true, null,
|
||||
encodingInCache);
|
||||
BlockCacheKey blockCacheKey = new BlockCacheKey(reader.getName(),
|
||||
offset);
|
||||
BlockCacheKey blockCacheKey = new BlockCacheKey(reader.getName(), offset);
|
||||
HFileBlock fromCache = (HFileBlock) blockCache.getBlock(blockCacheKey, true, false, true);
|
||||
boolean isCached = fromCache != null;
|
||||
cachedBlocksOffset.add(offset);
|
||||
cachedBlocks.put(offset, fromCache);
|
||||
cachedBlocks.put(offset, fromCache == null ? null : Pair.newPair(block, fromCache));
|
||||
boolean shouldBeCached = cowType.shouldBeCached(block.getBlockType());
|
||||
assertTrue("shouldBeCached: " + shouldBeCached+ "\n" +
|
||||
"isCached: " + isCached + "\n" +
|
||||
|
@ -332,19 +331,20 @@ public class TestCacheOnWrite {
|
|||
Long entry = iterator.next();
|
||||
BlockCacheKey blockCacheKey = new BlockCacheKey(reader.getName(),
|
||||
entry);
|
||||
HFileBlock hFileBlock = cachedBlocks.get(entry);
|
||||
if (hFileBlock != null) {
|
||||
// call return twice because for the isCache cased the counter would have got incremented
|
||||
// twice
|
||||
blockCache.returnBlock(blockCacheKey, hFileBlock);
|
||||
if(cacheCompressedData) {
|
||||
Pair<HFileBlock, HFileBlock> blockPair = cachedBlocks.get(entry);
|
||||
if (blockPair != null) {
|
||||
// Call return twice because for the isCache cased the counter would have got incremented
|
||||
// twice. Notice that here we need to returnBlock with different blocks. see comments in
|
||||
// BucketCache#returnBlock.
|
||||
blockCache.returnBlock(blockCacheKey, blockPair.getSecond());
|
||||
if (cacheCompressedData) {
|
||||
if (this.compress == Compression.Algorithm.NONE
|
||||
|| cowType == CacheOnWriteType.INDEX_BLOCKS
|
||||
|| cowType == CacheOnWriteType.BLOOM_BLOCKS) {
|
||||
blockCache.returnBlock(blockCacheKey, hFileBlock);
|
||||
blockCache.returnBlock(blockCacheKey, blockPair.getFirst());
|
||||
}
|
||||
} else {
|
||||
blockCache.returnBlock(blockCacheKey, hFileBlock);
|
||||
blockCache.returnBlock(blockCacheKey, blockPair.getFirst());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -457,7 +457,7 @@ public class TestCacheOnWrite {
|
|||
assertNotEquals(BlockType.ENCODED_DATA, block.getBlockType());
|
||||
assertNotEquals(BlockType.DATA, block.getBlockType());
|
||||
}
|
||||
((HRegion)region).close();
|
||||
region.close();
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
@ -38,6 +38,7 @@ import org.apache.hadoop.hbase.HBaseClassTestRule;
|
|||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.fs.HFileSystem;
|
||||
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
|
||||
import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
|
||||
import org.apache.hadoop.hbase.io.compress.Compression;
|
||||
import org.apache.hadoop.hbase.nio.ByteBuff;
|
||||
|
@ -97,8 +98,8 @@ public class TestChecksum {
|
|||
|
||||
FSDataInputStreamWrapper is = new FSDataInputStreamWrapper(fs, path);
|
||||
meta = new HFileContextBuilder().withHBaseCheckSum(true).build();
|
||||
HFileBlock.FSReader hbr = new HFileBlock.FSReaderImpl(
|
||||
is, totalSize, (HFileSystem) fs, path, meta);
|
||||
HFileBlock.FSReader hbr = new HFileBlock.FSReaderImpl(is, totalSize, (HFileSystem) fs, path,
|
||||
meta, ByteBuffAllocator.HEAP);
|
||||
HFileBlock b = hbr.readBlockData(0, -1, false, false);
|
||||
assertEquals(b.getChecksumType(), ChecksumType.getDefaultChecksumType().getCode());
|
||||
}
|
||||
|
@ -143,8 +144,8 @@ public class TestChecksum {
|
|||
|
||||
FSDataInputStreamWrapper is = new FSDataInputStreamWrapper(fs, path);
|
||||
meta = new HFileContextBuilder().withHBaseCheckSum(true).build();
|
||||
HFileBlock.FSReader hbr = new HFileBlock.FSReaderImpl(
|
||||
is, totalSize, (HFileSystem) fs, path, meta);
|
||||
HFileBlock.FSReader hbr = new HFileBlock.FSReaderImpl(is, totalSize, (HFileSystem) fs, path,
|
||||
meta, ByteBuffAllocator.HEAP);
|
||||
HFileBlock b = hbr.readBlockData(0, -1, false, false);
|
||||
|
||||
// verify SingleByteBuff checksum.
|
||||
|
@ -339,8 +340,9 @@ public class TestChecksum {
|
|||
.withHBaseCheckSum(true)
|
||||
.withBytesPerCheckSum(bytesPerChecksum)
|
||||
.build();
|
||||
HFileBlock.FSReader hbr = new HFileBlock.FSReaderImpl(new FSDataInputStreamWrapper(
|
||||
is, nochecksum), totalSize, hfs, path, meta);
|
||||
HFileBlock.FSReader hbr =
|
||||
new HFileBlock.FSReaderImpl(new FSDataInputStreamWrapper(is, nochecksum), totalSize,
|
||||
hfs, path, meta, ByteBuffAllocator.HEAP);
|
||||
HFileBlock b = hbr.readBlockData(0, -1, pread, false);
|
||||
assertTrue(b.getBufferReadOnly() instanceof SingleByteBuff);
|
||||
is.close();
|
||||
|
@ -382,7 +384,7 @@ public class TestChecksum {
|
|||
|
||||
public CorruptedFSReaderImpl(FSDataInputStreamWrapper istream, long fileSize, FileSystem fs,
|
||||
Path path, HFileContext meta) throws IOException {
|
||||
super(istream, fileSize, (HFileSystem) fs, path, meta);
|
||||
super(istream, fileSize, (HFileSystem) fs, path, meta, ByteBuffAllocator.HEAP);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -17,6 +17,8 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.io.hfile;
|
||||
|
||||
import static org.apache.hadoop.hbase.io.ByteBuffAllocator.MAX_BUFFER_COUNT_KEY;
|
||||
import static org.apache.hadoop.hbase.io.ByteBuffAllocator.MIN_ALLOCATE_SIZE_KEY;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertNull;
|
||||
|
@ -27,7 +29,9 @@ import java.io.DataInput;
|
|||
import java.io.DataOutput;
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.Random;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
|
@ -42,6 +46,7 @@ import org.apache.hadoop.hbase.CellComparatorImpl;
|
|||
import org.apache.hadoop.hbase.CellUtil;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
|
||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.KeyValue;
|
||||
|
@ -49,6 +54,7 @@ import org.apache.hadoop.hbase.KeyValue.Type;
|
|||
import org.apache.hadoop.hbase.KeyValueUtil;
|
||||
import org.apache.hadoop.hbase.PrivateCellUtil;
|
||||
import org.apache.hadoop.hbase.Tag;
|
||||
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
|
||||
import org.apache.hadoop.hbase.io.compress.Compression;
|
||||
import org.apache.hadoop.hbase.io.hfile.HFile.Reader;
|
||||
import org.apache.hadoop.hbase.io.hfile.HFile.Writer;
|
||||
|
@ -58,6 +64,7 @@ import org.apache.hadoop.hbase.testclassification.IOTests;
|
|||
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.io.Writable;
|
||||
import org.junit.Assert;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Rule;
|
||||
|
@ -99,24 +106,45 @@ public class TestHFile {
|
|||
|
||||
@Test
|
||||
public void testReaderWithoutBlockCache() throws Exception {
|
||||
Path path = writeStoreFile();
|
||||
try{
|
||||
readStoreFile(path);
|
||||
} catch (Exception e) {
|
||||
// fail test
|
||||
assertTrue(false);
|
||||
}
|
||||
int bufCount = 32;
|
||||
Configuration that = HBaseConfiguration.create(conf);
|
||||
that.setInt(MAX_BUFFER_COUNT_KEY, bufCount);
|
||||
// AllByteBuffers will be allocated from the buffers.
|
||||
that.setInt(MIN_ALLOCATE_SIZE_KEY, 0);
|
||||
ByteBuffAllocator alloc = ByteBuffAllocator.create(that, true);
|
||||
List<ByteBuff> buffs = new ArrayList<>();
|
||||
// Fill the allocator with bufCount ByteBuffer
|
||||
for (int i = 0; i < bufCount; i++) {
|
||||
buffs.add(alloc.allocateOneBuffer());
|
||||
}
|
||||
Assert.assertEquals(alloc.getQueueSize(), 0);
|
||||
for (ByteBuff buf : buffs) {
|
||||
buf.release();
|
||||
}
|
||||
Assert.assertEquals(alloc.getQueueSize(), bufCount);
|
||||
// start write to store file.
|
||||
Path path = writeStoreFile();
|
||||
try {
|
||||
readStoreFile(path, that, alloc);
|
||||
} catch (Exception e) {
|
||||
// fail test
|
||||
assertTrue(false);
|
||||
}
|
||||
Assert.assertEquals(bufCount, alloc.getQueueSize());
|
||||
}
|
||||
|
||||
|
||||
private void readStoreFile(Path storeFilePath) throws Exception {
|
||||
private void readStoreFile(Path storeFilePath, Configuration conf, ByteBuffAllocator alloc)
|
||||
throws Exception {
|
||||
// Open the file reader with block cache disabled.
|
||||
HFile.Reader reader = HFile.createReader(fs, storeFilePath, conf);
|
||||
CacheConfig cache = new CacheConfig(conf, null, null, alloc);
|
||||
HFile.Reader reader = HFile.createReader(fs, storeFilePath, cache, true, conf);
|
||||
long offset = 0;
|
||||
while (offset < reader.getTrailer().getLoadOnOpenDataOffset()) {
|
||||
HFileBlock block = reader.readBlock(offset, -1, false, true, false, true, null, null);
|
||||
offset += block.getOnDiskSizeWithHeader();
|
||||
block.release(); // return back the ByteBuffer back to allocator.
|
||||
}
|
||||
reader.close();
|
||||
}
|
||||
|
||||
private Path writeStoreFile() throws IOException {
|
||||
|
|
|
@ -53,6 +53,7 @@ import org.apache.hadoop.hbase.HConstants;
|
|||
import org.apache.hadoop.hbase.KeyValue;
|
||||
import org.apache.hadoop.hbase.Tag;
|
||||
import org.apache.hadoop.hbase.fs.HFileSystem;
|
||||
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
|
||||
import org.apache.hadoop.hbase.io.compress.Compression;
|
||||
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
|
||||
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
|
||||
|
@ -326,7 +327,8 @@ public class TestHFileBlock {
|
|||
.withIncludesMvcc(includesMemstoreTS)
|
||||
.withIncludesTags(includesTag)
|
||||
.withCompression(algo).build();
|
||||
HFileBlock.FSReader hbr = new HFileBlock.FSReaderImpl(is, totalSize, meta);
|
||||
HFileBlock.FSReader hbr =
|
||||
new HFileBlock.FSReaderImpl(is, totalSize, meta, ByteBuffAllocator.HEAP);
|
||||
HFileBlock b = hbr.readBlockData(0, -1, pread, false);
|
||||
is.close();
|
||||
assertEquals(0, HFile.getAndResetChecksumFailuresCount());
|
||||
|
@ -339,7 +341,7 @@ public class TestHFileBlock {
|
|||
|
||||
if (algo == GZ) {
|
||||
is = fs.open(path);
|
||||
hbr = new HFileBlock.FSReaderImpl(is, totalSize, meta);
|
||||
hbr = new HFileBlock.FSReaderImpl(is, totalSize, meta, ByteBuffAllocator.HEAP);
|
||||
b = hbr.readBlockData(0, 2173 + HConstants.HFILEBLOCK_HEADER_SIZE +
|
||||
b.totalChecksumBytes(), pread, false);
|
||||
assertEquals(expected, b);
|
||||
|
@ -425,7 +427,8 @@ public class TestHFileBlock {
|
|||
.withIncludesMvcc(includesMemstoreTS)
|
||||
.withIncludesTags(includesTag)
|
||||
.build();
|
||||
HFileBlock.FSReaderImpl hbr = new HFileBlock.FSReaderImpl(is, totalSize, meta);
|
||||
HFileBlock.FSReaderImpl hbr =
|
||||
new HFileBlock.FSReaderImpl(is, totalSize, meta, ByteBuffAllocator.HEAP);
|
||||
hbr.setDataBlockEncoder(dataBlockEncoder);
|
||||
hbr.setIncludesMemStoreTS(includesMemstoreTS);
|
||||
HFileBlock blockFromHFile, blockUnpacked;
|
||||
|
@ -553,7 +556,8 @@ public class TestHFileBlock {
|
|||
.withIncludesMvcc(includesMemstoreTS)
|
||||
.withIncludesTags(includesTag)
|
||||
.withCompression(algo).build();
|
||||
HFileBlock.FSReader hbr = new HFileBlock.FSReaderImpl(is, totalSize, meta);
|
||||
HFileBlock.FSReader hbr =
|
||||
new HFileBlock.FSReaderImpl(is, totalSize, meta, ByteBuffAllocator.HEAP);
|
||||
long curOffset = 0;
|
||||
for (int i = 0; i < NUM_TEST_BLOCKS; ++i) {
|
||||
if (!pread) {
|
||||
|
@ -737,7 +741,8 @@ public class TestHFileBlock {
|
|||
.withIncludesTags(includesTag)
|
||||
.withCompression(compressAlgo)
|
||||
.build();
|
||||
HFileBlock.FSReader hbr = new HFileBlock.FSReaderImpl(is, fileSize, meta);
|
||||
HFileBlock.FSReader hbr =
|
||||
new HFileBlock.FSReaderImpl(is, fileSize, meta, ByteBuffAllocator.HEAP);
|
||||
|
||||
Executor exec = Executors.newFixedThreadPool(NUM_READER_THREADS);
|
||||
ExecutorCompletionService<Boolean> ecs = new ExecutorCompletionService<>(exec);
|
||||
|
@ -845,8 +850,8 @@ public class TestHFileBlock {
|
|||
.withCompression(Algorithm.NONE)
|
||||
.withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM)
|
||||
.withChecksumType(ChecksumType.NULL).build();
|
||||
HFileBlock block = new HFileBlock(BlockType.DATA, size, size, -1, buf,
|
||||
HFileBlock.FILL_HEADER, -1, 0, -1, meta);
|
||||
HFileBlock block = new HFileBlock(BlockType.DATA, size, size, -1, buf, HFileBlock.FILL_HEADER,
|
||||
-1, 0, -1, meta, ByteBuffAllocator.HEAP);
|
||||
long byteBufferExpectedSize = ClassSize.align(ClassSize.estimateBase(
|
||||
new MultiByteBuff(buf).getClass(), true)
|
||||
+ HConstants.HFILEBLOCK_HEADER_SIZE + size);
|
||||
|
@ -869,9 +874,9 @@ public class TestHFileBlock {
|
|||
ByteBuffer buf = ByteBuffer.wrap(byteArr, 0, size);
|
||||
HFileContext meta = new HFileContextBuilder().build();
|
||||
HFileBlock blockWithNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1, buf,
|
||||
HFileBlock.FILL_HEADER, -1, 52, -1, meta);
|
||||
HFileBlock.FILL_HEADER, -1, 52, -1, meta, ByteBuffAllocator.HEAP);
|
||||
HFileBlock blockWithoutNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1, buf,
|
||||
HFileBlock.FILL_HEADER, -1, -1, -1, meta);
|
||||
HFileBlock.FILL_HEADER, -1, -1, -1, meta, ByteBuffAllocator.HEAP);
|
||||
ByteBuffer buff1 = ByteBuffer.allocate(length);
|
||||
ByteBuffer buff2 = ByteBuffer.allocate(length);
|
||||
blockWithNextBlockMetadata.serialize(buff1, true);
|
||||
|
|
|
@ -46,6 +46,7 @@ import org.apache.hadoop.hbase.KeyValue;
|
|||
import org.apache.hadoop.hbase.KeyValueUtil;
|
||||
import org.apache.hadoop.hbase.PrivateCellUtil;
|
||||
import org.apache.hadoop.hbase.fs.HFileSystem;
|
||||
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
|
||||
import org.apache.hadoop.hbase.io.compress.Compression;
|
||||
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
|
||||
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
|
||||
|
@ -211,8 +212,8 @@ public class TestHFileBlockIndex {
|
|||
.withIncludesTags(useTags)
|
||||
.withCompression(compr)
|
||||
.build();
|
||||
HFileBlock.FSReader blockReader = new HFileBlock.FSReaderImpl(istream, fs.getFileStatus(path)
|
||||
.getLen(), meta);
|
||||
HFileBlock.FSReader blockReader = new HFileBlock.FSReaderImpl(istream,
|
||||
fs.getFileStatus(path).getLen(), meta, ByteBuffAllocator.HEAP);
|
||||
|
||||
BlockReaderWrapper brw = new BlockReaderWrapper(blockReader);
|
||||
HFileBlockIndex.BlockIndexReader indexReader =
|
||||
|
|
|
@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.HBaseClassTestRule;
|
|||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.KeyValue;
|
||||
import org.apache.hadoop.hbase.io.ByteArrayOutputStream;
|
||||
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
|
||||
import org.apache.hadoop.hbase.io.HeapSize;
|
||||
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
|
||||
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
|
||||
|
@ -131,9 +132,8 @@ public class TestHFileDataBlockEncoder {
|
|||
.withBlockSize(0)
|
||||
.withChecksumType(ChecksumType.NULL)
|
||||
.build();
|
||||
HFileBlock block = new HFileBlock(BlockType.DATA, size, size, -1, buf,
|
||||
HFileBlock.FILL_HEADER, 0,
|
||||
0, -1, hfileContext);
|
||||
HFileBlock block = new HFileBlock(BlockType.DATA, size, size, -1, buf, HFileBlock.FILL_HEADER,
|
||||
0, 0, -1, hfileContext, ByteBuffAllocator.HEAP);
|
||||
HFileBlock cacheBlock = createBlockOnDisk(kvs, block, useTags);
|
||||
assertEquals(headerSize, cacheBlock.getDummyHeaderForVersion().length);
|
||||
}
|
||||
|
@ -200,7 +200,7 @@ public class TestHFileDataBlockEncoder {
|
|||
.build();
|
||||
HFileBlock b = new HFileBlock(BlockType.DATA, size, size, -1, buf,
|
||||
HFileBlock.FILL_HEADER, 0,
|
||||
0, -1, meta);
|
||||
0, -1, meta, ByteBuffAllocator.HEAP);
|
||||
return b;
|
||||
}
|
||||
|
||||
|
@ -223,7 +223,7 @@ public class TestHFileDataBlockEncoder {
|
|||
size = encodedBytes.length - block.getDummyHeaderForVersion().length;
|
||||
return new HFileBlock(context.getBlockType(), size, size, -1, ByteBuffer.wrap(encodedBytes),
|
||||
HFileBlock.FILL_HEADER, 0, block.getOnDiskDataSizeWithHeader(), -1,
|
||||
block.getHFileContext());
|
||||
block.getHFileContext(), ByteBuffAllocator.HEAP);
|
||||
}
|
||||
|
||||
private void writeBlock(List<Cell> kvs, HFileContext fileContext, boolean useTags)
|
||||
|
|
|
@ -39,6 +39,7 @@ import org.apache.hadoop.hbase.HBaseTestingUtility;
|
|||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.KeyValue;
|
||||
import org.apache.hadoop.hbase.KeyValueUtil;
|
||||
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
|
||||
import org.apache.hadoop.hbase.io.compress.Compression;
|
||||
import org.apache.hadoop.hbase.io.crypto.Cipher;
|
||||
import org.apache.hadoop.hbase.io.crypto.Encryption;
|
||||
|
@ -152,7 +153,8 @@ public class TestHFileEncryption {
|
|||
}
|
||||
FSDataInputStream is = fs.open(path);
|
||||
try {
|
||||
HFileBlock.FSReaderImpl hbr = new HFileBlock.FSReaderImpl(is, totalSize, fileContext);
|
||||
HFileBlock.FSReaderImpl hbr = new HFileBlock.FSReaderImpl(is, totalSize, fileContext,
|
||||
ByteBuffAllocator.HEAP);
|
||||
long pos = 0;
|
||||
for (int i = 0; i < blocks; i++) {
|
||||
pos += readAndVerifyBlock(pos, fileContext, hbr, blockSizes[i]);
|
||||
|
|
|
@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.HBaseTestingUtility;
|
|||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.KeyValue;
|
||||
import org.apache.hadoop.hbase.Tag;
|
||||
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
|
||||
import org.apache.hadoop.hbase.io.compress.Compression;
|
||||
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
|
||||
import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
|
||||
|
@ -181,7 +182,7 @@ public class TestHFileWriterV3 {
|
|||
.withIncludesTags(useTags)
|
||||
.withHBaseCheckSum(true).build();
|
||||
HFileBlock.FSReader blockReader =
|
||||
new HFileBlock.FSReaderImpl(fsdis, fileSize, meta);
|
||||
new HFileBlock.FSReaderImpl(fsdis, fileSize, meta, ByteBuffAllocator.HEAP);
|
||||
// Comparator class name is stored in the trailer in version 3.
|
||||
CellComparator comparator = trailer.createComparator();
|
||||
HFileBlockIndex.BlockIndexReader dataBlockIndexReader =
|
||||
|
|
|
@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
|
|||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.Waiter;
|
||||
import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
|
||||
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
|
||||
import org.apache.hadoop.hbase.io.HeapSize;
|
||||
import org.apache.hadoop.hbase.io.hfile.LruBlockCache.EvictionThread;
|
||||
import org.apache.hadoop.hbase.testclassification.IOTests;
|
||||
|
@ -813,10 +814,11 @@ public class TestLruBlockCache {
|
|||
byte[] byteArr = new byte[length];
|
||||
ByteBuffer buf = ByteBuffer.wrap(byteArr, 0, size);
|
||||
HFileContext meta = new HFileContextBuilder().build();
|
||||
ByteBuffAllocator alloc = ByteBuffAllocator.HEAP;
|
||||
HFileBlock blockWithNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1, buf,
|
||||
HFileBlock.FILL_HEADER, -1, 52, -1, meta);
|
||||
HFileBlock.FILL_HEADER, -1, 52, -1, meta, alloc);
|
||||
HFileBlock blockWithoutNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1, buf,
|
||||
HFileBlock.FILL_HEADER, -1, -1, -1, meta);
|
||||
HFileBlock.FILL_HEADER, -1, -1, -1, meta, alloc);
|
||||
|
||||
LruBlockCache cache = new LruBlockCache(maxSize, blockSize, false,
|
||||
(int)Math.ceil(1.2*maxSize/blockSize),
|
||||
|
|
|
@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.KeyValue;
|
|||
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
|
||||
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
|
||||
import org.apache.hadoop.hbase.fs.HFileSystem;
|
||||
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
|
||||
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
|
||||
import org.apache.hadoop.hbase.testclassification.IOTests;
|
||||
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||
|
@ -73,12 +74,12 @@ public class TestPrefetch {
|
|||
|
||||
@Test
|
||||
public void testPrefetchSetInHCDWorks() {
|
||||
ColumnFamilyDescriptor columnFamilyDescriptor =
|
||||
ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("f")).setPrefetchBlocksOnOpen(true)
|
||||
.build();
|
||||
ColumnFamilyDescriptor columnFamilyDescriptor = ColumnFamilyDescriptorBuilder
|
||||
.newBuilder(Bytes.toBytes("f")).setPrefetchBlocksOnOpen(true).build();
|
||||
Configuration c = HBaseConfiguration.create();
|
||||
assertFalse(c.getBoolean(CacheConfig.PREFETCH_BLOCKS_ON_OPEN_KEY, false));
|
||||
CacheConfig cc = new CacheConfig(c, columnFamilyDescriptor, blockCache);
|
||||
CacheConfig cc =
|
||||
new CacheConfig(c, columnFamilyDescriptor, blockCache, ByteBuffAllocator.HEAP);
|
||||
assertTrue(cc.shouldPrefetchOnOpen());
|
||||
}
|
||||
|
||||
|
@ -129,9 +130,8 @@ public class TestPrefetch {
|
|||
HFileBlock block = reader.readBlock(offset, -1, false, true, false, true, null, null);
|
||||
BlockCacheKey blockCacheKey = new BlockCacheKey(reader.getName(), offset);
|
||||
boolean isCached = blockCache.getBlock(blockCacheKey, true, false, true) != null;
|
||||
if (block.getBlockType() == BlockType.DATA ||
|
||||
block.getBlockType() == BlockType.ROOT_INDEX ||
|
||||
block.getBlockType() == BlockType.INTERMEDIATE_INDEX) {
|
||||
if (block.getBlockType() == BlockType.DATA || block.getBlockType() == BlockType.ROOT_INDEX
|
||||
|| block.getBlockType() == BlockType.INTERMEDIATE_INDEX) {
|
||||
assertTrue(isCached);
|
||||
}
|
||||
offset += block.getOnDiskSizeWithHeader();
|
||||
|
|
|
@ -19,10 +19,10 @@ package org.apache.hadoop.hbase.io.hfile.bucket;
|
|||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.ArrayList;
|
||||
|
@ -38,6 +38,7 @@ import org.apache.hadoop.hbase.HBaseClassTestRule;
|
|||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
|
||||
import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
|
||||
import org.apache.hadoop.hbase.io.hfile.BlockType;
|
||||
import org.apache.hadoop.hbase.io.hfile.CacheTestUtils;
|
||||
|
@ -48,8 +49,9 @@ import org.apache.hadoop.hbase.io.hfile.HFileContext;
|
|||
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
|
||||
import org.apache.hadoop.hbase.io.hfile.bucket.BucketAllocator.BucketSizeInfo;
|
||||
import org.apache.hadoop.hbase.io.hfile.bucket.BucketAllocator.IndexStatistics;
|
||||
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.RAMQueueEntry;
|
||||
import org.apache.hadoop.hbase.nio.ByteBuff;
|
||||
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.RAMCache;
|
||||
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.RAMQueueEntry;
|
||||
import org.apache.hadoop.hbase.testclassification.IOTests;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.junit.After;
|
||||
|
@ -112,8 +114,7 @@ public class TestBucketCache {
|
|||
private static class MockedBucketCache extends BucketCache {
|
||||
|
||||
public MockedBucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes,
|
||||
int writerThreads, int writerQLen, String persistencePath) throws FileNotFoundException,
|
||||
IOException {
|
||||
int writerThreads, int writerQLen, String persistencePath) throws IOException {
|
||||
super(ioEngineName, capacity, blockSize, bucketSizes, writerThreads, writerQLen,
|
||||
persistencePath);
|
||||
super.wait_when_cache = true;
|
||||
|
@ -131,10 +132,9 @@ public class TestBucketCache {
|
|||
}
|
||||
|
||||
@Before
|
||||
public void setup() throws FileNotFoundException, IOException {
|
||||
cache =
|
||||
new MockedBucketCache(ioEngineName, capacitySize, constructedBlockSize,
|
||||
constructedBlockSizes, writeThreads, writerQLen, persistencePath);
|
||||
public void setup() throws IOException {
|
||||
cache = new MockedBucketCache(ioEngineName, capacitySize, constructedBlockSize,
|
||||
constructedBlockSizes, writeThreads, writerQLen, persistencePath);
|
||||
}
|
||||
|
||||
@After
|
||||
|
@ -424,10 +424,11 @@ public class TestBucketCache {
|
|||
byte[] byteArr = new byte[length];
|
||||
ByteBuffer buf = ByteBuffer.wrap(byteArr, 0, size);
|
||||
HFileContext meta = new HFileContextBuilder().build();
|
||||
ByteBuffAllocator allocator = ByteBuffAllocator.HEAP;
|
||||
HFileBlock blockWithNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1, buf,
|
||||
HFileBlock.FILL_HEADER, -1, 52, -1, meta);
|
||||
HFileBlock.FILL_HEADER, -1, 52, -1, meta, allocator);
|
||||
HFileBlock blockWithoutNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1, buf,
|
||||
HFileBlock.FILL_HEADER, -1, -1, -1, meta);
|
||||
HFileBlock.FILL_HEADER, -1, -1, -1, meta, allocator);
|
||||
|
||||
BlockCacheKey key = new BlockCacheKey("key1", 0);
|
||||
ByteBuffer actualBuffer = ByteBuffer.allocate(length);
|
||||
|
@ -441,22 +442,74 @@ public class TestBucketCache {
|
|||
block1Buffer);
|
||||
|
||||
waitUntilFlushedToBucket(cache, key);
|
||||
assertEquals(1, blockWithNextBlockMetadata.getBufferReadOnly().refCnt());
|
||||
assertEquals(1, blockWithoutNextBlockMetadata.getBufferReadOnly().refCnt());
|
||||
|
||||
// Add blockWithoutNextBlockMetada, expect blockWithNextBlockMetadata back.
|
||||
CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithoutNextBlockMetadata, actualBuffer,
|
||||
block1Buffer);
|
||||
assertEquals(1, blockWithNextBlockMetadata.getBufferReadOnly().refCnt());
|
||||
assertEquals(1, blockWithoutNextBlockMetadata.getBufferReadOnly().refCnt());
|
||||
|
||||
// Clear and add blockWithoutNextBlockMetadata
|
||||
cache.evictBlock(key);
|
||||
assertEquals(1, blockWithNextBlockMetadata.getBufferReadOnly().refCnt());
|
||||
assertEquals(1, blockWithoutNextBlockMetadata.getBufferReadOnly().refCnt());
|
||||
|
||||
assertNull(cache.getBlock(key, false, false, false));
|
||||
CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithoutNextBlockMetadata, actualBuffer,
|
||||
block2Buffer);
|
||||
|
||||
waitUntilFlushedToBucket(cache, key);
|
||||
assertEquals(1, blockWithNextBlockMetadata.getBufferReadOnly().refCnt());
|
||||
assertEquals(1, blockWithoutNextBlockMetadata.getBufferReadOnly().refCnt());
|
||||
|
||||
// Add blockWithNextBlockMetadata, expect blockWithNextBlockMetadata to replace.
|
||||
CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithNextBlockMetadata, actualBuffer,
|
||||
block1Buffer);
|
||||
|
||||
waitUntilFlushedToBucket(cache, key);
|
||||
assertEquals(1, blockWithNextBlockMetadata.getBufferReadOnly().refCnt());
|
||||
assertEquals(1, blockWithoutNextBlockMetadata.getBufferReadOnly().refCnt());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRAMCache() {
|
||||
int size = 100;
|
||||
int length = HConstants.HFILEBLOCK_HEADER_SIZE + size;
|
||||
byte[] byteArr = new byte[length];
|
||||
ByteBuffer buf = ByteBuffer.wrap(byteArr, 0, size);
|
||||
HFileContext meta = new HFileContextBuilder().build();
|
||||
|
||||
RAMCache cache = new RAMCache();
|
||||
BlockCacheKey key1 = new BlockCacheKey("file-1", 1);
|
||||
BlockCacheKey key2 = new BlockCacheKey("file-2", 2);
|
||||
HFileBlock blk1 = new HFileBlock(BlockType.DATA, size, size, -1, buf, HFileBlock.FILL_HEADER,
|
||||
-1, 52, -1, meta, ByteBuffAllocator.HEAP);
|
||||
HFileBlock blk2 = new HFileBlock(BlockType.DATA, size, size, -1, buf, HFileBlock.FILL_HEADER,
|
||||
-1, -1, -1, meta, ByteBuffAllocator.HEAP);
|
||||
RAMQueueEntry re1 = new RAMQueueEntry(key1, blk1, 1, false);
|
||||
RAMQueueEntry re2 = new RAMQueueEntry(key1, blk2, 1, false);
|
||||
|
||||
assertFalse(cache.containsKey(key1));
|
||||
assertNull(cache.putIfAbsent(key1, re1));
|
||||
assertEquals(2, ((HFileBlock) re1.getData()).getBufferReadOnly().refCnt());
|
||||
|
||||
assertNotNull(cache.putIfAbsent(key1, re2));
|
||||
assertEquals(2, ((HFileBlock) re1.getData()).getBufferReadOnly().refCnt());
|
||||
assertEquals(1, ((HFileBlock) re2.getData()).getBufferReadOnly().refCnt());
|
||||
|
||||
assertNull(cache.putIfAbsent(key2, re2));
|
||||
assertEquals(2, ((HFileBlock) re1.getData()).getBufferReadOnly().refCnt());
|
||||
assertEquals(2, ((HFileBlock) re2.getData()).getBufferReadOnly().refCnt());
|
||||
|
||||
cache.remove(key1);
|
||||
assertEquals(1, ((HFileBlock) re1.getData()).getBufferReadOnly().refCnt());
|
||||
assertEquals(2, ((HFileBlock) re2.getData()).getBufferReadOnly().refCnt());
|
||||
|
||||
cache.clear();
|
||||
assertEquals(1, ((HFileBlock) re1.getData()).getBufferReadOnly().refCnt());
|
||||
assertEquals(1, ((HFileBlock) re2.getData()).getBufferReadOnly().refCnt());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -467,7 +520,7 @@ public class TestBucketCache {
|
|||
ByteBuffer buf = ByteBuffer.allocate(length);
|
||||
HFileContext meta = new HFileContextBuilder().build();
|
||||
HFileBlock block = new HFileBlock(BlockType.DATA, size, size, -1, buf, HFileBlock.FILL_HEADER,
|
||||
offset, 52, -1, meta);
|
||||
offset, 52, -1, meta, ByteBuffAllocator.HEAP);
|
||||
|
||||
// initialize an mocked ioengine.
|
||||
IOEngine ioEngine = Mockito.mock(IOEngine.class);
|
||||
|
|
|
@ -39,6 +39,7 @@ import org.apache.hadoop.hbase.client.Put;
|
|||
import org.apache.hadoop.hbase.client.Result;
|
||||
import org.apache.hadoop.hbase.client.Table;
|
||||
import org.apache.hadoop.hbase.client.TableDescriptor;
|
||||
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
|
||||
import org.apache.hadoop.hbase.io.compress.Compression;
|
||||
import org.apache.hadoop.hbase.io.crypto.Encryption;
|
||||
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
|
||||
|
@ -214,7 +215,7 @@ public class TestSecureBulkLoadManager {
|
|||
ColumnFamilyDescriptor family = desc.getColumnFamily(FAMILY);
|
||||
Compression.Algorithm compression = HFile.DEFAULT_COMPRESSION_ALGORITHM;
|
||||
|
||||
CacheConfig writerCacheConf = new CacheConfig(conf, family, null);
|
||||
CacheConfig writerCacheConf = new CacheConfig(conf, family, null, ByteBuffAllocator.HEAP);
|
||||
writerCacheConf.setCacheDataOnWrite(false);
|
||||
HFileContext hFileContext = new HFileContextBuilder()
|
||||
.withIncludesMvcc(false)
|
||||
|
|
Loading…
Reference in New Issue