HBASE-22127 Ensure that the block cached in the LRUBlockCache offheap is allocated from heap
commit 081dc03e12 (parent 0d294d23a9)
@@ -24,6 +24,7 @@ import java.util.List;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicInteger;
+import sun.nio.ch.DirectBuffer;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
@@ -34,7 +35,6 @@ import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
 
 /**
@@ -191,7 +191,7 @@ public class ByteBuffAllocator {
     }
     // If disabled the reservoir, just allocate it from on-heap.
     if (!isReservoirEnabled() || size == 0) {
-      return new SingleByteBuff(NONE, ByteBuffer.allocate(size));
+      return allocateOnHeap(size);
     }
     int reminder = size % bufSize;
     int len = size / bufSize + (reminder > 0 ? 1 : 0);
@@ -222,6 +222,22 @@ public class ByteBuffAllocator {
     return bb;
   }
 
+  /**
+   * Free all direct buffers if allocated, mainly used for testing.
+   */
+  @VisibleForTesting
+  public void clean() {
+    while (!buffers.isEmpty()) {
+      ByteBuffer b = buffers.poll();
+      if (b instanceof DirectBuffer) {
+        DirectBuffer db = (DirectBuffer) b;
+        if (db.cleaner() != null) {
+          db.cleaner().clean();
+        }
+      }
+    }
+  }
+
   public static ByteBuff wrap(ByteBuffer[] buffers, Recycler recycler) {
     if (buffers == null || buffers.length == 0) {
       throw new IllegalArgumentException("buffers shouldn't be null or empty");
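The ByteBuffAllocator change above routes reservoir-disabled or zero-sized requests to plain heap allocation and adds a test-only clean() that frees pooled direct buffers. As a rough illustration of that allocate-or-fall-back shape, here is a minimal sketch in plain Java; the class and field names (SimpleBufferPool, reservoirEnabled, bufSize) are invented for the example and are not the HBase ByteBuffAllocator API.

import java.nio.ByteBuffer;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical, simplified pool illustrating the "reservoir or heap" decision above.
public class SimpleBufferPool {
  private final boolean reservoirEnabled;
  private final int bufSize;
  private final Queue<ByteBuffer> buffers = new ConcurrentLinkedQueue<>();

  public SimpleBufferPool(boolean reservoirEnabled, int bufSize) {
    this.reservoirEnabled = reservoirEnabled;
    this.bufSize = bufSize;
  }

  public ByteBuffer allocate(int size) {
    // Same shape as the patched allocate(): tiny or pool-disabled requests go to the JVM heap,
    // everything else is served from pooled (typically direct) buffers.
    if (!reservoirEnabled || size == 0) {
      return ByteBuffer.allocate(size);          // on-heap
    }
    ByteBuffer pooled = buffers.poll();
    return pooled != null ? pooled : ByteBuffer.allocateDirect(Math.max(size, bufSize));
  }

  public void release(ByteBuffer buf) {
    if (buf.isDirect()) {
      buffers.offer(buf);                        // only direct buffers return to the pool
    }
  }
}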
@@ -367,6 +367,10 @@ public class CacheConfig {
     return Optional.ofNullable(this.blockCache);
   }
 
+  public boolean isCombinedBlockCache() {
+    return blockCache instanceof CombinedBlockCache;
+  }
+
   public ByteBuffAllocator getByteBuffAllocator() {
     return this.byteBuffAllocator;
   }

@@ -759,6 +759,13 @@ public class HFileBlock implements Cacheable {
     return ClassSize.align(size);
   }
 
+  /**
+   * @return true to indicate the block is allocated from JVM heap, otherwise from off-heap.
+   */
+  boolean isOnHeap() {
+    return buf.hasArray();
+  }
+
   /**
    * Unified version 2 {@link HFile} block writer. The intended usage pattern
    * is as follows:
@@ -1297,16 +1304,29 @@ public class HFileBlock implements Cacheable {
   /** An HFile block reader with iteration ability. */
   interface FSReader {
     /**
-     * Reads the block at the given offset in the file with the given on-disk
-     * size and uncompressed size.
-     *
-     * @param offset
-     * @param onDiskSize the on-disk size of the entire block, including all
-     *          applicable headers, or -1 if unknown
+     * Reads the block at the given offset in the file with the given on-disk size and uncompressed
+     * size.
+     * @param offset of the file to read
+     * @param onDiskSize the on-disk size of the entire block, including all applicable headers, or
+     *          -1 if unknown
+     * @param pread true to use pread, otherwise use the stream read.
+     * @param updateMetrics update the metrics or not.
+     * @param intoHeap allocate the block's ByteBuff by {@link ByteBuffAllocator} or JVM heap. For
+     *          LRUBlockCache, we must ensure that the block to cache is an heap one, because the
+     *          memory occupation is based on heap now, also for {@link CombinedBlockCache}, we use
+     *          the heap LRUBlockCache as L1 cache to cache small blocks such as IndexBlock or
+     *          MetaBlock for faster access. So introduce an flag here to decide whether allocate
+     *          from JVM heap or not so that we can avoid an extra off-heap to heap memory copy when
+     *          using LRUBlockCache. For most cases, we known what's the expected block type we'll
+     *          read, while for some special case (Example: HFileReaderImpl#readNextDataBlock()), we
+     *          cannot pre-decide what's the expected block type, then we can only allocate block's
+     *          ByteBuff from {@link ByteBuffAllocator} firstly, and then when caching it in
+     *          {@link LruBlockCache} we'll check whether the ByteBuff is from heap or not, if not
+     *          then we'll clone it to an heap one and cache it.
      * @return the newly read block
      */
-    HFileBlock readBlockData(long offset, long onDiskSize, boolean pread, boolean updateMetrics)
-        throws IOException;
+    HFileBlock readBlockData(long offset, long onDiskSize, boolean pread, boolean updateMetrics,
+        boolean intoHeap) throws IOException;
 
     /**
      * Creates a block iterator over the given portion of the {@link HFile}.
@@ -1441,7 +1461,7 @@ public class HFileBlock implements Cacheable {
       if (offset >= endOffset) {
         return null;
       }
-      HFileBlock b = readBlockData(offset, length, false, false);
+      HFileBlock b = readBlockData(offset, length, false, false, true);
       offset += b.getOnDiskSizeWithHeader();
       length = b.getNextBlockOnDiskSize();
       HFileBlock uncompressed = b.unpack(fileContext, owner);
@@ -1523,16 +1543,18 @@ public class HFileBlock implements Cacheable {
     /**
      * Reads a version 2 block (version 1 blocks not supported and not expected). Tries to do as
      * little memory allocation as possible, using the provided on-disk size.
      *
      * @param offset the offset in the stream to read at
-     * @param onDiskSizeWithHeaderL the on-disk size of the block, including
-     *          the header, or -1 if unknown; i.e. when iterating over blocks reading
-     *          in the file metadata info.
+     * @param onDiskSizeWithHeaderL the on-disk size of the block, including the header, or -1 if
+     *          unknown; i.e. when iterating over blocks reading in the file metadata info.
      * @param pread whether to use a positional read
      * @param updateMetrics whether to update the metrics
+     * @param intoHeap allocate ByteBuff of block from heap or off-heap.
+     * @see FSReader#readBlockData(long, long, boolean, boolean, boolean) for more details about the
+     *      useHeap.
      */
     @Override
     public HFileBlock readBlockData(long offset, long onDiskSizeWithHeaderL, boolean pread,
-        boolean updateMetrics) throws IOException {
+        boolean updateMetrics, boolean intoHeap) throws IOException {
       // Get a copy of the current state of whether to validate
       // hbase checksums or not for this read call. This is not
       // thread-safe but the one constaint is that if we decide
@@ -1541,9 +1563,8 @@ public class HFileBlock implements Cacheable {
       boolean doVerificationThruHBaseChecksum = streamWrapper.shouldUseHBaseChecksum();
       FSDataInputStream is = streamWrapper.getStream(doVerificationThruHBaseChecksum);
 
-      HFileBlock blk = readBlockDataInternal(is, offset,
-          onDiskSizeWithHeaderL, pread,
-          doVerificationThruHBaseChecksum, updateMetrics);
+      HFileBlock blk = readBlockDataInternal(is, offset, onDiskSizeWithHeaderL, pread,
+          doVerificationThruHBaseChecksum, updateMetrics, intoHeap);
       if (blk == null) {
         HFile.LOG.warn("HBase checksum verification failed for file " +
             pathName + " at offset " +
@@ -1570,7 +1591,7 @@ public class HFileBlock implements Cacheable {
         is = this.streamWrapper.fallbackToFsChecksum(CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD);
         doVerificationThruHBaseChecksum = false;
         blk = readBlockDataInternal(is, offset, onDiskSizeWithHeaderL, pread,
-            doVerificationThruHBaseChecksum, updateMetrics);
+            doVerificationThruHBaseChecksum, updateMetrics, intoHeap);
         if (blk != null) {
           HFile.LOG.warn("HDFS checksum verification succeeded for file " +
               pathName + " at offset " +
@@ -1666,24 +1687,29 @@ public class HFileBlock implements Cacheable {
       return nextBlockOnDiskSize;
     }
 
+    private ByteBuff allocate(int size, boolean intoHeap) {
+      return intoHeap ? ByteBuffAllocator.HEAP.allocate(size) : allocator.allocate(size);
+    }
+
     /**
      * Reads a version 2 block.
      *
      * @param offset the offset in the stream to read at.
-     * @param onDiskSizeWithHeaderL the on-disk size of the block, including
-     *          the header and checksums if present or -1 if unknown (as a long). Can be -1
-     *          if we are doing raw iteration of blocks as when loading up file metadata; i.e.
-     *          the first read of a new file. Usually non-null gotten from the file index.
+     * @param onDiskSizeWithHeaderL the on-disk size of the block, including the header and
+     *          checksums if present or -1 if unknown (as a long). Can be -1 if we are doing raw
+     *          iteration of blocks as when loading up file metadata; i.e. the first read of a new
+     *          file. Usually non-null gotten from the file index.
      * @param pread whether to use a positional read
-     * @param verifyChecksum Whether to use HBase checksums.
-     *          If HBase checksum is switched off, then use HDFS checksum. Can also flip on/off
-     *          reading same file if we hit a troublesome patch in an hfile.
+     * @param verifyChecksum Whether to use HBase checksums. If HBase checksum is switched off, then
+     *          use HDFS checksum. Can also flip on/off reading same file if we hit a troublesome
+     *          patch in an hfile.
     * @param updateMetrics whether need to update the metrics.
+     * @param intoHeap allocate the ByteBuff of block from heap or off-heap.
     * @return the HFileBlock or null if there is a HBase checksum mismatch
     */
     @VisibleForTesting
     protected HFileBlock readBlockDataInternal(FSDataInputStream is, long offset,
-        long onDiskSizeWithHeaderL, boolean pread, boolean verifyChecksum, boolean updateMetrics)
-        throws IOException {
+        long onDiskSizeWithHeaderL, boolean pread, boolean verifyChecksum, boolean updateMetrics,
+        boolean intoHeap) throws IOException {
       if (offset < 0) {
         throw new IOException("Invalid offset=" + offset + " trying to read "
            + "block (onDiskSize=" + onDiskSizeWithHeaderL + ")");
@@ -1725,7 +1751,7 @@ public class HFileBlock implements Cacheable {
       // says where to start reading. If we have the header cached, then we don't need to read
       // it again and we can likely read from last place we left off w/o need to backup and reread
       // the header we read last time through here.
-      ByteBuff onDiskBlock = allocator.allocate(onDiskSizeWithHeader + hdrSize);
+      ByteBuff onDiskBlock = this.allocate(onDiskSizeWithHeader + hdrSize, intoHeap);
       boolean initHFileBlockSuccess = false;
       try {
         if (headerBuf != null) {
@@ -2069,7 +2095,7 @@ public class HFileBlock implements Cacheable {
       " onDiskDataSizeWithHeader " + onDiskDataSizeWithHeader;
   }
 
-  public HFileBlock deepClone() {
+  public HFileBlock deepCloneOnHeap() {
     return new HFileBlock(this, true);
   }
 }

@@ -272,8 +272,8 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
       if (LOG.isTraceEnabled()) {
         LOG.trace("Prefetch start " + getPathOffsetEndStr(path, offset, end));
       }
-      // TODO: Could we use block iterator in here? Would that get stuff into the cache?
-      HFileBlock prevBlock = null;
+      // Don't use BlockIterator here, because it's designed to read load-on-open section.
+      long onDiskSizeOfNextBlock = -1;
       while (offset < end) {
         if (Thread.interrupted()) {
           break;
@@ -282,16 +282,17 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
         // the internal-to-hfileblock thread local which holds the overread that gets the
         // next header, will not have happened...so, pass in the onDiskSize gotten from the
         // cached block. This 'optimization' triggers extremely rarely I'd say.
-        long onDiskSize = prevBlock != null? prevBlock.getNextBlockOnDiskSize(): -1;
-        HFileBlock block = readBlock(offset, onDiskSize, /*cacheBlock=*/true,
-            /*pread=*/true, false, false, null, null);
-        // Need not update the current block. Ideally here the readBlock won't find the
-        // block in cache. We call this readBlock so that block data is read from FS and
-        // cached in BC. So there is no reference count increment that happens here.
-        // The return will ideally be a noop because the block is not of MemoryType SHARED.
-        returnBlock(block);
-        prevBlock = block;
-        offset += block.getOnDiskSizeWithHeader();
+        HFileBlock block = readBlock(offset, onDiskSizeOfNextBlock, /* cacheBlock= */true,
+            /* pread= */true, false, false, null, null);
+        try {
+          onDiskSizeOfNextBlock = block.getNextBlockOnDiskSize();
+          offset += block.getOnDiskSizeWithHeader();
+        } finally {
+          // Ideally here the readBlock won't find the block in cache. We call this
+          // readBlock so that block data is read from FS and cached in BC. we must call
+          // returnBlock here to decrease the reference count of block.
+          returnBlock(block);
+        }
       }
     } catch (IOException e) {
       // IOExceptions are probably due to region closes (relocation, etc.)
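The rewritten prefetch loop above wraps the per-block work in try/finally so returnBlock() always runs and the block's reference count stays balanced even if an exception is thrown mid-iteration. A minimal sketch of that pattern, using hypothetical RefCounted/PrefetchLoop types rather than the HBase classes:

import java.io.IOException;

// Hypothetical stand-in for a ref-counted block handed out by a reader.
interface RefCounted {
  long sizeOnDisk() throws IOException;
  void release();
}

final class PrefetchLoop {
  static long prefetch(java.util.Iterator<RefCounted> blocks, long end) throws IOException {
    long offset = 0;
    while (offset < end && blocks.hasNext()) {
      RefCounted block = blocks.next();   // ref count was incremented for us
      try {
        offset += block.sizeOnDisk();     // may throw
      } finally {
        block.release();                  // always decrement, mirroring returnBlock(block)
      }
    }
    return offset;
  }
}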
@@ -1419,7 +1420,7 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
     // Cache Miss, please load.
 
     HFileBlock compressedBlock =
-        fsBlockReader.readBlockData(metaBlockOffset, blockSize, true, false);
+        fsBlockReader.readBlockData(metaBlockOffset, blockSize, true, false, true);
     HFileBlock uncompressedBlock = compressedBlock.unpack(hfileContext, fsBlockReader);
     if (compressedBlock != uncompressedBlock) {
       compressedBlock.release();
@@ -1434,6 +1435,24 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
     }
   }
 
+  /**
+   * If expected block is data block, we'll allocate the ByteBuff of block from
+   * {@link org.apache.hadoop.hbase.io.ByteBuffAllocator} and it's usually an off-heap one,
+   * otherwise it will allocate from heap.
+   * @see org.apache.hadoop.hbase.io.hfile.HFileBlock.FSReader#readBlockData(long, long, boolean,
+   *      boolean, boolean)
+   */
+  private boolean shouldUseHeap(BlockType expectedBlockType) {
+    if (cacheConf.getBlockCache() == null) {
+      return false;
+    } else if (!cacheConf.isCombinedBlockCache()) {
+      // Block to cache in LruBlockCache must be an heap one. So just allocate block memory from
+      // heap for saving an extra off-heap to heap copying.
+      return true;
+    }
+    return expectedBlockType != null && !expectedBlockType.isData();
+  }
+
   @Override
   public HFileBlock readBlock(long dataBlockOffset, long onDiskBlockSize,
       final boolean cacheBlock, boolean pread, final boolean isCompaction,
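shouldUseHeap() above encodes a three-way decision: no block cache at all, an LruBlockCache-only deployment (always heap), or a CombinedBlockCache where only non-data blocks land in the heap L1. A standalone restatement of that decision, with made-up CacheKind/BlockKind enums standing in for the real CacheConfig and BlockType:

// Illustrative only; not HBase classes.
enum CacheKind { NONE, LRU_ONLY, COMBINED }
enum BlockKind { DATA, INDEX, META }

final class HeapDecision {
  static boolean shouldUseHeap(CacheKind cache, BlockKind expected) {
    if (cache == CacheKind.NONE) {
      return false;                       // nothing will be cached, keep the pooled buffer
    }
    if (cache == CacheKind.LRU_ONLY) {
      return true;                        // an LRU-style cache only stores heap blocks
    }
    // Combined cache: only non-data blocks go to the heap L1; data blocks stay off-heap
    return expected != null && expected != BlockKind.DATA;
  }

  public static void main(String[] args) {
    System.out.println(shouldUseHeap(CacheKind.COMBINED, BlockKind.DATA));   // false
    System.out.println(shouldUseHeap(CacheKind.COMBINED, BlockKind.INDEX));  // true
    System.out.println(shouldUseHeap(CacheKind.LRU_ONLY, BlockKind.DATA));   // true
  }
}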
@@ -1505,8 +1524,8 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
 
         TraceUtil.addTimelineAnnotation("blockCacheMiss");
         // Load block from filesystem.
-        HFileBlock hfileBlock =
-            fsBlockReader.readBlockData(dataBlockOffset, onDiskBlockSize, pread, !isCompaction);
+        HFileBlock hfileBlock = fsBlockReader.readBlockData(dataBlockOffset, onDiskBlockSize, pread,
+            !isCompaction, shouldUseHeap(expectedBlockType));
         validateBlockType(hfileBlock, expectedBlockType);
         HFileBlock unpacked = hfileBlock.unpack(hfileContext, fsBlockReader);
         BlockType.BlockCategory category = hfileBlock.getBlockType().getCategory();

@@ -354,6 +354,32 @@ public class LruBlockCache implements FirstLevelBlockCache {
     }
   }
 
+  /**
+   * The block cached in LRUBlockCache will always be an heap block: on the one side, the heap
+   * access will be more faster then off-heap, the small index block or meta block cached in
+   * CombinedBlockCache will benefit a lot. on other side, the LRUBlockCache size is always
+   * calculated based on the total heap size, if caching an off-heap block in LRUBlockCache, the
+   * heap size will be messed up. Here we will clone the block into an heap block if it's an
+   * off-heap block, otherwise just use the original block. The key point is maintain the refCnt of
+   * the block (HBASE-22127): <br>
+   * 1. if cache the cloned heap block, its refCnt is an totally new one, it's easy to handle; <br>
+   * 2. if cache the original heap block, we're sure that it won't be tracked in ByteBuffAllocator's
+   * reservoir, if both RPC and LRUBlockCache release the block, then it can be garbage collected by
+   * JVM, so need a retain here.
+   * @param buf the original block
+   * @return an block with an heap memory backend.
+   */
+  private Cacheable asReferencedHeapBlock(Cacheable buf) {
+    if (buf instanceof HFileBlock) {
+      HFileBlock blk = ((HFileBlock) buf);
+      if (!blk.isOnHeap()) {
+        return blk.deepCloneOnHeap();
+      }
+    }
+    // The block will be referenced by this LRUBlockCache, so should increase its refCnt here.
+    return buf.retain();
+  }
+
   // BlockCache implementation
 
   /**
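asReferencedHeapBlock() above keeps the reference count consistent in both cases the javadoc lists: an off-heap block is deep-copied onto the heap (the clone starts with its own count), while an already-on-heap block is retained because the cache becomes a second owner. A minimal sketch of that bookkeeping with hypothetical Block/HeapCachePolicy classes, not the HBase ones:

import java.util.concurrent.atomic.AtomicInteger;

class Block {
  final boolean onHeap;
  final AtomicInteger refCnt = new AtomicInteger(1);   // creator holds one reference

  Block(boolean onHeap) { this.onHeap = onHeap; }

  Block retain() { refCnt.incrementAndGet(); return this; }

  boolean release() { return refCnt.decrementAndGet() == 0; }

  Block deepCloneOnHeap() { return new Block(true); }  // clone starts with its own refCnt = 1
}

final class HeapCachePolicy {
  static Block asReferencedHeapBlock(Block buf) {
    if (!buf.onHeap) {
      // Off-heap: copy to heap; the cache owns the clone, the caller still owns the original.
      return buf.deepCloneOnHeap();
    }
    // Already on heap: the cache becomes a second owner, so bump the reference count.
    return buf.retain();
  }
}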
@@ -402,8 +428,8 @@ public class LruBlockCache implements FirstLevelBlockCache {
       }
       return;
     }
-    // The block will be referenced by the LRUBlockCache, so should increase the refCnt here.
-    buf.retain();
+    // Ensure that the block is an heap one.
+    buf = asReferencedHeapBlock(buf);
     cb = new LruCachedBlock(cacheKey, buf, count.incrementAndGet(), inMemory);
     long newSize = updateSizeMetrics(cb, false);
     map.put(cacheKey, cb);
@@ -503,7 +529,7 @@ public class LruBlockCache implements FirstLevelBlockCache {
       if (caching) {
         if (result instanceof HFileBlock && ((HFileBlock) result).usesSharedMemory()) {
           Cacheable original = result;
-          result = ((HFileBlock) original).deepClone();
+          result = ((HFileBlock) original).deepCloneOnHeap();
           // deepClone an new one, so need to put the original one back to free it.
           victimHandler.returnBlock(cacheKey, original);
         }

@@ -172,7 +172,7 @@ public final class TinyLfuBlockCache implements FirstLevelBlockCache {
       value = victimCache.getBlock(cacheKey, caching, repeat, updateCacheMetrics);
       if ((value != null) && caching) {
         if ((value instanceof HFileBlock) && ((HFileBlock) value).usesSharedMemory()) {
-          value = ((HFileBlock) value).deepClone();
+          value = ((HFileBlock) value).deepCloneOnHeap();
         }
         cacheBlock(cacheKey, value);
       }

@@ -52,6 +52,8 @@ import java.util.concurrent.atomic.LongAdder;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Consumer;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.io.HeapSize;
@@ -562,23 +564,54 @@ public class BucketCache implements BlockCache, HeapSize {
     return evictBlock(cacheKey, true);
   }
 
-  private RAMQueueEntry checkRamCache(BlockCacheKey cacheKey) {
-    RAMQueueEntry removedBlock = ramCache.remove(cacheKey);
-    if (removedBlock != null) {
-      this.blockNumber.decrement();
-      this.heapSize.add(-1 * removedBlock.getData().heapSize());
+  // does not check for the ref count. Just tries to evict it if found in the
+  // bucket map
+  private boolean forceEvict(BlockCacheKey cacheKey) {
+    if (!cacheEnabled) {
+      return false;
     }
-    return removedBlock;
-  }
+    boolean existed = removeFromRamCache(cacheKey);
+    BucketEntry bucketEntry = backingMap.get(cacheKey);
+    if (bucketEntry == null) {
+      if (existed) {
+        cacheStats.evicted(0, cacheKey.isPrimary());
+        return true;
+      } else {
+        return false;
+      }
+    }
+    ReentrantReadWriteLock lock = offsetLock.getLock(bucketEntry.offset());
+    try {
+      lock.writeLock().lock();
+      if (backingMap.remove(cacheKey, bucketEntry)) {
+        blockEvicted(cacheKey, bucketEntry, !existed);
+      } else {
+        return false;
+      }
+    } finally {
+      lock.writeLock().unlock();
+    }
+    cacheStats.evicted(bucketEntry.getCachedTime(), cacheKey.isPrimary());
+    return true;
+  }
+
+  private boolean removeFromRamCache(BlockCacheKey cacheKey) {
+    return ramCache.remove(cacheKey, re -> {
+      if (re != null) {
+        this.blockNumber.decrement();
+        this.heapSize.add(-1 * re.getData().heapSize());
+      }
+    });
+  }
 
   public boolean evictBlock(BlockCacheKey cacheKey, boolean deletedBlock) {
     if (!cacheEnabled) {
       return false;
     }
-    RAMQueueEntry removedBlock = checkRamCache(cacheKey);
+    boolean existed = removeFromRamCache(cacheKey);
     BucketEntry bucketEntry = backingMap.get(cacheKey);
     if (bucketEntry == null) {
-      if (removedBlock != null) {
+      if (existed) {
         cacheStats.evicted(0, cacheKey.isPrimary());
         return true;
       } else {
@@ -591,7 +624,7 @@ public class BucketCache implements BlockCache, HeapSize {
     int refCount = bucketEntry.getRefCount();
     if (refCount == 0) {
       if (backingMap.remove(cacheKey, bucketEntry)) {
-        blockEvicted(cacheKey, bucketEntry, removedBlock == null);
+        blockEvicted(cacheKey, bucketEntry, !existed);
       } else {
         return false;
       }
@@ -1015,10 +1048,12 @@ public class BucketCache implements BlockCache, HeapSize {
         putIntoBackingMap(key, bucketEntries[i]);
       }
       // Always remove from ramCache even if we failed adding it to the block cache above.
-      RAMQueueEntry ramCacheEntry = ramCache.remove(key);
-      if (ramCacheEntry != null) {
-        heapSize.add(-1 * entries.get(i).getData().heapSize());
-      } else if (bucketEntries[i] != null){
+      boolean existed = ramCache.remove(key, re -> {
+        if (re != null) {
+          heapSize.add(-1 * re.getData().heapSize());
+        }
+      });
+      if (!existed && bucketEntries[i] != null) {
         // Block should have already been evicted. Remove it and free space.
         ReentrantReadWriteLock lock = offsetLock.getLock(bucketEntries[i].offset());
         try {
@@ -1719,12 +1754,23 @@ public class BucketCache implements BlockCache, HeapSize {
       return previous;
     }
 
-    public RAMQueueEntry remove(BlockCacheKey key) {
+    public boolean remove(BlockCacheKey key) {
+      return remove(key, re->{});
+    }
+
+    /**
+     * Defined an {@link Consumer} here, because once the removed entry release its reference count,
+     * then it's ByteBuffers may be recycled and accessing it outside this method will be thrown an
+     * exception. the consumer will access entry to remove before release its reference count.
+     * Notice, don't change its reference count in the {@link Consumer}
+     */
+    public boolean remove(BlockCacheKey key, Consumer<RAMQueueEntry> action) {
       RAMQueueEntry previous = delegate.remove(key);
+      action.accept(previous);
       if (previous != null) {
        previous.getData().release();
      }
-      return previous;
+      return previous != null;
    }
 
     public boolean isEmpty() {
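The RAMCache.remove(key, Consumer) variant above exists so callers can act on the removed entry before its buffers are released back to the allocator; touching the entry after release could hit a recycled buffer. A small self-contained sketch of that ordering, with invented RefCountedEntry/SimpleRamCache classes rather than the HBase ones:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

class RefCountedEntry {
  private boolean released;
  long heapSize() {
    if (released) {
      throw new IllegalStateException("buffer already recycled");
    }
    return 64 * 1024;
  }
  void release() { released = true; }
}

final class SimpleRamCache {
  private final Map<String, RefCountedEntry> delegate = new ConcurrentHashMap<>();

  void put(String key, RefCountedEntry e) { delegate.put(key, e); }

  boolean remove(String key, Consumer<RefCountedEntry> action) {
    RefCountedEntry previous = delegate.remove(key);
    action.accept(previous);          // safe: runs before release()
    if (previous != null) {
      previous.release();             // after this, the entry's buffers may be reused
    }
    return previous != null;
  }

  public static void main(String[] args) {
    SimpleRamCache cache = new SimpleRamCache();
    cache.put("k", new RefCountedEntry());
    long[] freed = new long[1];
    cache.remove("k", e -> { if (e != null) freed[0] = e.heapSize(); });
    System.out.println("freed heap bytes: " + freed[0]);
  }
}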
@@ -100,7 +100,8 @@ public class TestChecksum {
     meta = new HFileContextBuilder().withHBaseCheckSum(true).build();
     HFileBlock.FSReader hbr = new HFileBlock.FSReaderImpl(is, totalSize, (HFileSystem) fs, path,
         meta, ByteBuffAllocator.HEAP);
-    HFileBlock b = hbr.readBlockData(0, -1, false, false);
+    HFileBlock b = hbr.readBlockData(0, -1, false, false, true);
+    assertTrue(b.isOnHeap());
     assertEquals(b.getChecksumType(), ChecksumType.getDefaultChecksumType().getCode());
   }
 
@@ -146,7 +147,8 @@ public class TestChecksum {
     meta = new HFileContextBuilder().withHBaseCheckSum(true).build();
     HFileBlock.FSReader hbr = new HFileBlock.FSReaderImpl(is, totalSize, (HFileSystem) fs, path,
         meta, ByteBuffAllocator.HEAP);
-    HFileBlock b = hbr.readBlockData(0, -1, false, false);
+    HFileBlock b = hbr.readBlockData(0, -1, false, false, true);
+    assertTrue(b.isOnHeap());
 
     // verify SingleByteBuff checksum.
     verifySBBCheckSum(b.getBufferReadOnly());
@@ -215,7 +217,7 @@ public class TestChecksum {
         .withHBaseCheckSum(true)
         .build();
     HFileBlock.FSReader hbr = new CorruptedFSReaderImpl(is, totalSize, fs, path, meta);
-    HFileBlock b = hbr.readBlockData(0, -1, pread, false);
+    HFileBlock b = hbr.readBlockData(0, -1, pread, false, true);
     b.sanityCheck();
     assertEquals(4936, b.getUncompressedSizeWithoutHeader());
     assertEquals(algo == GZ ? 2173 : 4936,
@@ -236,19 +238,19 @@ public class TestChecksum {
     // requests. Verify that this is correct.
     for (int i = 0; i <
         HFileBlock.CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD + 1; i++) {
-      b = hbr.readBlockData(0, -1, pread, false);
+      b = hbr.readBlockData(0, -1, pread, false, true);
       assertTrue(b.getBufferReadOnly() instanceof SingleByteBuff);
       assertEquals(0, HFile.getAndResetChecksumFailuresCount());
     }
     // The next read should have hbase checksum verification reanabled,
     // we verify this by assertng that there was a hbase-checksum failure.
-    b = hbr.readBlockData(0, -1, pread, false);
+    b = hbr.readBlockData(0, -1, pread, false, true);
     assertTrue(b.getBufferReadOnly() instanceof SingleByteBuff);
     assertEquals(1, HFile.getAndResetChecksumFailuresCount());
 
     // Since the above encountered a checksum failure, we switch
     // back to not checking hbase checksums.
-    b = hbr.readBlockData(0, -1, pread, false);
+    b = hbr.readBlockData(0, -1, pread, false, true);
     assertTrue(b.getBufferReadOnly() instanceof SingleByteBuff);
     assertEquals(0, HFile.getAndResetChecksumFailuresCount());
     is.close();
@@ -260,7 +262,7 @@ public class TestChecksum {
     assertEquals(false, newfs.useHBaseChecksum());
     is = new FSDataInputStreamWrapper(newfs, path);
     hbr = new CorruptedFSReaderImpl(is, totalSize, newfs, path, meta);
-    b = hbr.readBlockData(0, -1, pread, false);
+    b = hbr.readBlockData(0, -1, pread, false, true);
     is.close();
     b.sanityCheck();
     b = b.unpack(meta, hbr);
@@ -343,7 +345,7 @@ public class TestChecksum {
     HFileBlock.FSReader hbr =
         new HFileBlock.FSReaderImpl(new FSDataInputStreamWrapper(is, nochecksum), totalSize,
             hfs, path, meta, ByteBuffAllocator.HEAP);
-    HFileBlock b = hbr.readBlockData(0, -1, pread, false);
+    HFileBlock b = hbr.readBlockData(0, -1, pread, false, true);
     assertTrue(b.getBufferReadOnly() instanceof SingleByteBuff);
     is.close();
     b.sanityCheck();
@@ -389,13 +391,13 @@ public class TestChecksum {
 
     @Override
     protected HFileBlock readBlockDataInternal(FSDataInputStream is, long offset,
-        long onDiskSizeWithHeaderL, boolean pread, boolean verifyChecksum, boolean updateMetrics)
-        throws IOException {
+        long onDiskSizeWithHeaderL, boolean pread, boolean verifyChecksum, boolean updateMetrics,
+        boolean useHeap) throws IOException {
       if (verifyChecksum) {
         corruptDataStream = true;
       }
       HFileBlock b = super.readBlockDataInternal(is, offset, onDiskSizeWithHeaderL, pread,
-          verifyChecksum, updateMetrics);
+          verifyChecksum, updateMetrics, useHeap);
       corruptDataStream = false;
       return b;
     }

@@ -17,8 +17,12 @@
  */
 package org.apache.hadoop.hbase.io.hfile;
 
+import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_IOENGINE_KEY;
+import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_SIZE_KEY;
+import static org.apache.hadoop.hbase.io.ByteBuffAllocator.BUFFER_SIZE_KEY;
 import static org.apache.hadoop.hbase.io.ByteBuffAllocator.MAX_BUFFER_COUNT_KEY;
 import static org.apache.hadoop.hbase.io.ByteBuffAllocator.MIN_ALLOCATE_SIZE_KEY;
+import static org.apache.hadoop.hbase.io.hfile.CacheConfig.EVICT_BLOCKS_ON_CLOSE_KEY;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
@@ -104,33 +108,129 @@ public class TestHFile {
     fs = TEST_UTIL.getTestFileSystem();
   }
 
+  private ByteBuffAllocator initAllocator(boolean reservoirEnabled, int bufSize, int bufCount,
+      int minAllocSize) {
+    Configuration that = HBaseConfiguration.create(conf);
+    that.setInt(BUFFER_SIZE_KEY, bufSize);
+    that.setInt(MAX_BUFFER_COUNT_KEY, bufCount);
+    // All ByteBuffers will be allocated from the buffers.
+    that.setInt(MIN_ALLOCATE_SIZE_KEY, minAllocSize);
+    return ByteBuffAllocator.create(that, reservoirEnabled);
+  }
+
+  private void fillByteBuffAllocator(ByteBuffAllocator alloc, int bufCount) {
+    // Fill the allocator with bufCount ByteBuffer
+    List<ByteBuff> buffs = new ArrayList<>();
+    for (int i = 0; i < bufCount; i++) {
+      buffs.add(alloc.allocateOneBuffer());
+      Assert.assertEquals(alloc.getQueueSize(), 0);
+    }
+    buffs.forEach(ByteBuff::release);
+    Assert.assertEquals(alloc.getQueueSize(), bufCount);
+  }
+
   @Test
   public void testReaderWithoutBlockCache() throws Exception {
     int bufCount = 32;
-    Configuration that = HBaseConfiguration.create(conf);
-    that.setInt(MAX_BUFFER_COUNT_KEY, bufCount);
-    // AllByteBuffers will be allocated from the buffers.
-    that.setInt(MIN_ALLOCATE_SIZE_KEY, 0);
-    ByteBuffAllocator alloc = ByteBuffAllocator.create(that, true);
-    List<ByteBuff> buffs = new ArrayList<>();
-    // Fill the allocator with bufCount ByteBuffer
-    for (int i = 0; i < bufCount; i++) {
-      buffs.add(alloc.allocateOneBuffer());
-    }
-    Assert.assertEquals(alloc.getQueueSize(), 0);
-    for (ByteBuff buf : buffs) {
-      buf.release();
-    }
-    Assert.assertEquals(alloc.getQueueSize(), bufCount);
+    ByteBuffAllocator alloc = initAllocator(true, 64 * 1024, bufCount, 0);
+    fillByteBuffAllocator(alloc, bufCount);
     // start write to store file.
     Path path = writeStoreFile();
     try {
-      readStoreFile(path, that, alloc);
+      readStoreFile(path, conf, alloc);
     } catch (Exception e) {
       // fail test
       assertTrue(false);
     }
     Assert.assertEquals(bufCount, alloc.getQueueSize());
     alloc.clean();
   }
 
+  /**
+   * Test case for HBASE-22127 in LruBlockCache.
+   */
+  @Test
+  public void testReaderWithLRUBlockCache() throws Exception {
+    int bufCount = 1024, blockSize = 64 * 1024;
+    ByteBuffAllocator alloc = initAllocator(true, bufCount, blockSize, 0);
+    fillByteBuffAllocator(alloc, bufCount);
+    Path storeFilePath = writeStoreFile();
+    // Open the file reader with LRUBlockCache
+    BlockCache lru = new LruBlockCache(1024 * 1024 * 32, blockSize, true, conf);
+    CacheConfig cacheConfig = new CacheConfig(conf, null, lru, alloc);
+    HFile.Reader reader = HFile.createReader(fs, storeFilePath, cacheConfig, true, conf);
+    long offset = 0;
+    while (offset < reader.getTrailer().getLoadOnOpenDataOffset()) {
+      BlockCacheKey key = new BlockCacheKey(storeFilePath.getName(), offset);
+      HFileBlock block = reader.readBlock(offset, -1, true, true, false, true, null, null);
+      offset += block.getOnDiskSizeWithHeader();
+      // Ensure the block is an heap one.
+      Cacheable cachedBlock = lru.getBlock(key, false, false, true);
+      Assert.assertNotNull(cachedBlock);
+      Assert.assertTrue(cachedBlock instanceof HFileBlock);
+      Assert.assertTrue(((HFileBlock) cachedBlock).isOnHeap());
+      // Should never allocate off-heap block from allocator because ensure that it's LRU.
+      Assert.assertEquals(bufCount, alloc.getQueueSize());
+      block.release(); // return back the ByteBuffer back to allocator.
+    }
+    reader.close();
+    Assert.assertEquals(bufCount, alloc.getQueueSize());
+    alloc.clean();
+    lru.shutdown();
+  }
+
+  private BlockCache initCombinedBlockCache() {
+    Configuration that = HBaseConfiguration.create(conf);
+    that.setFloat(BUCKET_CACHE_SIZE_KEY, 32); // 32MB for bucket cache.
+    that.set(BUCKET_CACHE_IOENGINE_KEY, "offheap");
+    BlockCache bc = BlockCacheFactory.createBlockCache(that);
+    Assert.assertNotNull(bc);
+    Assert.assertTrue(bc instanceof CombinedBlockCache);
+    return bc;
+  }
+
+  /**
+   * Test case for HBASE-22127 in CombinedBlockCache
+   */
+  @Test
+  public void testReaderWithCombinedBlockCache() throws Exception {
+    int bufCount = 1024, blockSize = 64 * 1024;
+    ByteBuffAllocator alloc = initAllocator(true, bufCount, blockSize, 0);
+    fillByteBuffAllocator(alloc, bufCount);
+    Path storeFilePath = writeStoreFile();
+    // Open the file reader with CombinedBlockCache
+    BlockCache combined = initCombinedBlockCache();
+    conf.setBoolean(EVICT_BLOCKS_ON_CLOSE_KEY, true);
+    CacheConfig cacheConfig = new CacheConfig(conf, null, combined, alloc);
+    HFile.Reader reader = HFile.createReader(fs, storeFilePath, cacheConfig, true, conf);
+    long offset = 0;
+    while (offset < reader.getTrailer().getLoadOnOpenDataOffset()) {
+      BlockCacheKey key = new BlockCacheKey(storeFilePath.getName(), offset);
+      HFileBlock block = reader.readBlock(offset, -1, true, true, false, true, null, null);
+      offset += block.getOnDiskSizeWithHeader();
+      // Read the cached block.
+      Cacheable cachedBlock = combined.getBlock(key, false, false, true);
+      try {
+        Assert.assertNotNull(cachedBlock);
+        Assert.assertTrue(cachedBlock instanceof HFileBlock);
+        HFileBlock hfb = (HFileBlock) cachedBlock;
+        // Data block will be cached in BucketCache, so it should be an off-heap block.
+        if (hfb.getBlockType().isData()) {
+          Assert.assertFalse(hfb.isOnHeap());
+        } else {
+          // Non-data block will be cached in LRUBlockCache, so it must be an on-heap block.
+          Assert.assertTrue(hfb.isOnHeap());
+        }
+      } finally {
+        combined.returnBlock(key, cachedBlock);
+      }
      block.release(); // return back the ByteBuffer back to allocator.
+    }
+    reader.close();
+    combined.shutdown();
+    Assert.assertEquals(bufCount, alloc.getQueueSize());
+    alloc.clean();
+  }
+
   private void readStoreFile(Path storeFilePath, Configuration conf, ByteBuffAllocator alloc)

@@ -40,6 +40,8 @@ import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorCompletionService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -48,6 +50,7 @@ import org.apache.hadoop.hbase.ArrayBackedTag;
 import org.apache.hadoop.hbase.CellComparatorImpl;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
@@ -68,6 +71,7 @@ import org.apache.hadoop.hbase.util.ChecksumType;
 import org.apache.hadoop.hbase.util.ClassSize;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.io.compress.Compressor;
+import org.junit.After;
 import org.junit.Before;
 import org.junit.ClassRule;
 import org.junit.Test;
@@ -93,10 +97,12 @@ public class TestHFileBlock {
 
   private static final Logger LOG = LoggerFactory.getLogger(TestHFileBlock.class);
 
-  static final Compression.Algorithm[] COMPRESSION_ALGORITHMS = { NONE, GZ };
+  // TODO let uncomment the GZ algorithm in HBASE-21937, because no support BB unpack yet.
+  static final Compression.Algorithm[] COMPRESSION_ALGORITHMS = { NONE, /* GZ */ };
 
   private static final int NUM_TEST_BLOCKS = 1000;
   private static final int NUM_READER_THREADS = 26;
+  private static final int MAX_BUFFER_COUNT = 2048;
 
   // Used to generate KeyValues
   private static int NUM_KEYVALUES = 50;
@@ -108,14 +114,51 @@ public class TestHFileBlock {
 
   private final boolean includesMemstoreTS;
   private final boolean includesTag;
-  public TestHFileBlock(boolean includesMemstoreTS, boolean includesTag) {
+  private final boolean useHeapAllocator;
+  private final ByteBuffAllocator alloc;
+
+  public TestHFileBlock(boolean includesMemstoreTS, boolean includesTag, boolean useHeapAllocator) {
     this.includesMemstoreTS = includesMemstoreTS;
     this.includesTag = includesTag;
+    this.useHeapAllocator = useHeapAllocator;
+    this.alloc = useHeapAllocator ? ByteBuffAllocator.HEAP : createOffHeapAlloc();
+    assertAllocator();
   }
 
   @Parameters
   public static Collection<Object[]> parameters() {
-    return HBaseTestingUtility.MEMSTORETS_TAGS_PARAMETRIZED;
+    List<Object[]> params = new ArrayList<>();
+    // Generate boolean triples from 000 to 111
+    for (int i = 0; i < (1 << 3); i++) {
+      Object[] flags = new Boolean[3];
+      for (int k = 0; k < 3; k++) {
+        flags[k] = (i & (1 << k)) != 0;
+      }
+      params.add(flags);
+    }
+    return params;
   }
 
+  private ByteBuffAllocator createOffHeapAlloc() {
+    Configuration conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
+    conf.setInt(ByteBuffAllocator.MAX_BUFFER_COUNT_KEY, MAX_BUFFER_COUNT);
+    conf.setInt(ByteBuffAllocator.MIN_ALLOCATE_SIZE_KEY, 0);
+    ByteBuffAllocator alloc = ByteBuffAllocator.create(conf, true);
+    // Fill the allocator
+    List<ByteBuff> bufs = new ArrayList<>();
+    for (int i = 0; i < MAX_BUFFER_COUNT; i++) {
+      ByteBuff bb = alloc.allocateOneBuffer();
+      assertTrue(!bb.hasArray());
+      bufs.add(bb);
+    }
+    bufs.forEach(ByteBuff::release);
+    return alloc;
+  }
+
+  private void assertAllocator() {
+    if (!useHeapAllocator) {
+      assertEquals(MAX_BUFFER_COUNT, alloc.getQueueSize());
+    }
+  }
+
   @Before
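The parameters() rewrite above enumerates every combination of the three boolean flags (includesMemstoreTS, includesTag, useHeapAllocator) by reading the bits of a counter from 0 to 7. A tiny standalone illustration of the same bit-mask enumeration:

// Standalone illustration only; prints the eight boolean triples 000 through 111.
public class TripleEnumeration {
  public static void main(String[] args) {
    for (int i = 0; i < (1 << 3); i++) {
      boolean[] flags = new boolean[3];
      for (int k = 0; k < 3; k++) {
        flags[k] = (i & (1 << k)) != 0;   // bit k of i selects the k-th flag
      }
      System.out.printf("%b %b %b%n", flags[0], flags[1], flags[2]);
    }
  }
}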
@@ -123,6 +166,12 @@ public class TestHFileBlock {
     fs = HFileSystem.get(TEST_UTIL.getConfiguration());
   }
 
+  @After
+  public void tearDown() throws IOException {
+    assertAllocator();
+    alloc.clean();
+  }
+
   static void writeTestBlockContents(DataOutputStream dos) throws IOException {
     // This compresses really well.
     for (int i = 0; i < 1000; ++i)
@@ -327,9 +376,8 @@ public class TestHFileBlock {
           .withIncludesMvcc(includesMemstoreTS)
           .withIncludesTags(includesTag)
           .withCompression(algo).build();
-      HFileBlock.FSReader hbr =
-          new HFileBlock.FSReaderImpl(is, totalSize, meta, ByteBuffAllocator.HEAP);
-      HFileBlock b = hbr.readBlockData(0, -1, pread, false);
+      HFileBlock.FSReader hbr = new HFileBlock.FSReaderImpl(is, totalSize, meta, alloc);
+      HFileBlock b = hbr.readBlockData(0, -1, pread, false, true);
       is.close();
       assertEquals(0, HFile.getAndResetChecksumFailuresCount());
 
@@ -341,14 +389,14 @@ public class TestHFileBlock {
 
       if (algo == GZ) {
         is = fs.open(path);
-        hbr = new HFileBlock.FSReaderImpl(is, totalSize, meta, ByteBuffAllocator.HEAP);
-        b = hbr.readBlockData(0, 2173 + HConstants.HFILEBLOCK_HEADER_SIZE +
-            b.totalChecksumBytes(), pread, false);
+        hbr = new HFileBlock.FSReaderImpl(is, totalSize, meta, alloc);
+        b = hbr.readBlockData(0,
+          2173 + HConstants.HFILEBLOCK_HEADER_SIZE + b.totalChecksumBytes(), pread, false, true);
         assertEquals(expected, b);
         int wrongCompressedSize = 2172;
         try {
-          b = hbr.readBlockData(0, wrongCompressedSize
-              + HConstants.HFILEBLOCK_HEADER_SIZE, pread, false);
+          hbr.readBlockData(0, wrongCompressedSize + HConstants.HFILEBLOCK_HEADER_SIZE, pread,
+            false, true);
           fail("Exception expected");
         } catch (IOException ex) {
           String expectedPrefix = "Passed in onDiskSizeWithHeader=";
@@ -356,8 +404,10 @@ public class TestHFileBlock {
               + "'.\nMessage is expected to start with: '" + expectedPrefix
              + "'", ex.getMessage().startsWith(expectedPrefix));
         }
+        assertTrue(b.release());
         is.close();
       }
+      assertTrue(expected.release());
     }
   }
 }
@@ -428,13 +478,13 @@ public class TestHFileBlock {
           .withIncludesTags(includesTag)
           .build();
       HFileBlock.FSReaderImpl hbr =
-          new HFileBlock.FSReaderImpl(is, totalSize, meta, ByteBuffAllocator.HEAP);
+          new HFileBlock.FSReaderImpl(is, totalSize, meta, alloc);
       hbr.setDataBlockEncoder(dataBlockEncoder);
       hbr.setIncludesMemStoreTS(includesMemstoreTS);
       HFileBlock blockFromHFile, blockUnpacked;
       int pos = 0;
       for (int blockId = 0; blockId < numBlocks; ++blockId) {
-        blockFromHFile = hbr.readBlockData(pos, -1, pread, false);
+        blockFromHFile = hbr.readBlockData(pos, -1, pread, false, true);
         assertEquals(0, HFile.getAndResetChecksumFailuresCount());
         blockFromHFile.sanityCheck();
         pos += blockFromHFile.getOnDiskSizeWithHeader();
@@ -487,6 +537,10 @@ public class TestHFileBlock {
               blockUnpacked, deserialized.unpack(meta, hbr));
           }
         }
+        assertTrue(blockUnpacked.release());
+        if (blockFromHFile != blockUnpacked) {
+          blockFromHFile.release();
+        }
       }
       is.close();
     }
@@ -557,7 +611,7 @@ public class TestHFileBlock {
           .withIncludesTags(includesTag)
           .withCompression(algo).build();
       HFileBlock.FSReader hbr =
-          new HFileBlock.FSReaderImpl(is, totalSize, meta, ByteBuffAllocator.HEAP);
+          new HFileBlock.FSReaderImpl(is, totalSize, meta, alloc);
       long curOffset = 0;
       for (int i = 0; i < NUM_TEST_BLOCKS; ++i) {
         if (!pread) {
@@ -569,7 +623,7 @@ public class TestHFileBlock {
         if (detailedLogging) {
           LOG.info("Reading block #" + i + " at offset " + curOffset);
         }
-        HFileBlock b = hbr.readBlockData(curOffset, -1, pread, false);
+        HFileBlock b = hbr.readBlockData(curOffset, -1, pread, false, true);
         if (detailedLogging) {
           LOG.info("Block #" + i + ": " + b);
         }
@@ -583,7 +637,8 @@ public class TestHFileBlock {
 
           // Now re-load this block knowing the on-disk size. This tests a
           // different branch in the loader.
-          HFileBlock b2 = hbr.readBlockData(curOffset, b.getOnDiskSizeWithHeader(), pread, false);
+          HFileBlock b2 =
+              hbr.readBlockData(curOffset, b.getOnDiskSizeWithHeader(), pread, false, true);
           b2.sanityCheck();
 
           assertEquals(b.getBlockType(), b2.getBlockType());
@@ -599,6 +654,7 @@ public class TestHFileBlock {
           assertEquals(b.getOnDiskDataSizeWithHeader(),
               b2.getOnDiskDataSizeWithHeader());
           assertEquals(0, HFile.getAndResetChecksumFailuresCount());
+          assertTrue(b2.release());
 
           curOffset += b.getOnDiskSizeWithHeader();
 
@@ -606,14 +662,14 @@ public class TestHFileBlock {
         // NOTE: cache-on-write testing doesn't actually involve a BlockCache. It simply
         // verifies that the unpacked value read back off disk matches the unpacked value
         // generated before writing to disk.
-        b = b.unpack(meta, hbr);
+        HFileBlock newBlock = b.unpack(meta, hbr);
         // b's buffer has header + data + checksum while
         // expectedContents have header + data only
-        ByteBuff bufRead = b.getBufferReadOnly();
+        ByteBuff bufRead = newBlock.getBufferReadOnly();
         ByteBuffer bufExpected = expectedContents.get(i);
         boolean bytesAreCorrect = Bytes.compareTo(bufRead.array(),
             bufRead.arrayOffset(),
-            bufRead.limit() - b.totalChecksumBytes(),
+            bufRead.limit() - newBlock.totalChecksumBytes(),
             bufExpected.array(), bufExpected.arrayOffset(),
             bufExpected.limit()) == 0;
         String wrongBytesMsg = "";
@@ -642,9 +698,12 @@ public class TestHFileBlock {
           }
         }
         assertTrue(wrongBytesMsg, bytesAreCorrect);
+        assertTrue(newBlock.release());
+        if (newBlock != b) {
+          assertTrue(b.release());
+        }
       }
     }
 
     assertEquals(curOffset, fs.getFileStatus(path).getLen());
     is.close();
   }
@@ -687,29 +746,37 @@ public class TestHFileBlock {
         boolean pread = true;
         boolean withOnDiskSize = rand.nextBoolean();
         long expectedSize =
-            (blockId == NUM_TEST_BLOCKS - 1 ? fileSize
-                : offsets.get(blockId + 1)) - offset;
-
-        HFileBlock b;
+            (blockId == NUM_TEST_BLOCKS - 1 ? fileSize : offsets.get(blockId + 1)) - offset;
+        HFileBlock b = null;
         try {
           long onDiskSizeArg = withOnDiskSize ? expectedSize : -1;
-          b = hbr.readBlockData(offset, onDiskSizeArg, pread, false);
+          b = hbr.readBlockData(offset, onDiskSizeArg, pread, false, false);
+          if (useHeapAllocator) {
+            assertTrue(b.isOnHeap());
+          } else {
+            assertTrue(!b.getBlockType().isData() || !b.isOnHeap());
+          }
+          assertEquals(types.get(blockId), b.getBlockType());
+          assertEquals(expectedSize, b.getOnDiskSizeWithHeader());
+          assertEquals(offset, b.getOffset());
         } catch (IOException ex) {
-          LOG.error("Error in client " + clientId + " trying to read block at "
-              + offset + ", pread=" + pread + ", withOnDiskSize=" +
-              withOnDiskSize, ex);
+          LOG.error("Error in client " + clientId + " trying to read block at " + offset
+              + ", pread=" + pread + ", withOnDiskSize=" + withOnDiskSize,
+            ex);
           return false;
+        } finally {
+          if (b != null) {
+            b.release();
+          }
         }
+        ++numBlocksRead;
+        if (pread) {
+          ++numPositionalRead;
+        }
 
-        assertEquals(types.get(blockId), b.getBlockType());
-        assertEquals(expectedSize, b.getOnDiskSizeWithHeader());
-        assertEquals(offset, b.getOffset());
-
-        ++numBlocksRead;
-        if (pread)
-          ++numPositionalRead;
-        if (withOnDiskSize)
+        if (withOnDiskSize) {
           ++numWithOnDiskSize;
+        }
       }
       LOG.info("Client " + clientId + " successfully read " + numBlocksRead +
           " blocks (with pread: " + numPositionalRead + ", with onDiskSize " +
@@ -717,7 +784,6 @@ public class TestHFileBlock {
 
       return true;
     }
-
   }
 
   @Test
@@ -742,7 +808,7 @@ public class TestHFileBlock {
           .withCompression(compressAlgo)
          .build();
       HFileBlock.FSReader hbr =
-          new HFileBlock.FSReaderImpl(is, fileSize, meta, ByteBuffAllocator.HEAP);
+          new HFileBlock.FSReaderImpl(is, fileSize, meta, alloc);
 
       Executor exec = Executors.newFixedThreadPool(NUM_READER_THREADS);
       ExecutorCompletionService<Boolean> ecs = new ExecutorCompletionService<>(exec);
@@ -761,7 +827,6 @@ public class TestHFileBlock {
             + ")");
       }
     }
 
-    is.close();
   }
 }
@@ -874,9 +939,9 @@ public class TestHFileBlock {
     ByteBuffer buf = ByteBuffer.wrap(byteArr, 0, size);
     HFileContext meta = new HFileContextBuilder().build();
     HFileBlock blockWithNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1, buf,
-        HFileBlock.FILL_HEADER, -1, 52, -1, meta, ByteBuffAllocator.HEAP);
+        HFileBlock.FILL_HEADER, -1, 52, -1, meta, alloc);
     HFileBlock blockWithoutNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1, buf,
-        HFileBlock.FILL_HEADER, -1, -1, -1, meta, ByteBuffAllocator.HEAP);
+        HFileBlock.FILL_HEADER, -1, -1, -1, meta, alloc);
     ByteBuffer buff1 = ByteBuffer.allocate(length);
     ByteBuffer buff2 = ByteBuffer.allocate(length);
     blockWithNextBlockMetadata.serialize(buff1, true);

@@ -192,7 +192,7 @@ public class TestHFileBlockIndex {
       }
 
       missCount += 1;
-      prevBlock = realReader.readBlockData(offset, onDiskSize, pread, false);
+      prevBlock = realReader.readBlockData(offset, onDiskSize, pread, false, true);
       prevOffset = offset;
       prevOnDiskSize = onDiskSize;
       prevPread = pread;

@@ -109,7 +109,7 @@ public class TestHFileEncryption {
 
   private long readAndVerifyBlock(long pos, HFileContext ctx, HFileBlock.FSReaderImpl hbr, int size)
       throws IOException {
-    HFileBlock b = hbr.readBlockData(pos, -1, false, false);
+    HFileBlock b = hbr.readBlockData(pos, -1, false, false, true);
     assertEquals(0, HFile.getAndResetChecksumFailuresCount());
     b.sanityCheck();
     assertFalse(b.isUnpacked());

@@ -224,8 +224,8 @@ public class TestHFileWriterV3 {
     fsdis.seek(0);
     long curBlockPos = 0;
     while (curBlockPos <= trailer.getLastDataBlockOffset()) {
-      HFileBlock block = blockReader.readBlockData(curBlockPos, -1, false, false)
-          .unpack(context, blockReader);
+      HFileBlock block = blockReader.readBlockData(curBlockPos, -1, false, false, true)
+          .unpack(context, blockReader);
       assertEquals(BlockType.DATA, block.getBlockType());
       ByteBuff buf = block.getBufferWithoutHeader();
       int keyLen = -1;
@@ -285,8 +285,8 @@ public class TestHFileWriterV3 {
     while (fsdis.getPos() < trailer.getLoadOnOpenDataOffset()) {
       LOG.info("Current offset: " + fsdis.getPos() + ", scanning until " +
           trailer.getLoadOnOpenDataOffset());
-      HFileBlock block = blockReader.readBlockData(curBlockPos, -1, false, false)
-          .unpack(context, blockReader);
+      HFileBlock block = blockReader.readBlockData(curBlockPos, -1, false, false, true)
+          .unpack(context, blockReader);
       assertEquals(BlockType.META, block.getBlockType());
       Text t = new Text();
       ByteBuff buf = block.getBufferWithoutHeader();

@@ -160,7 +160,7 @@ public class TestLazyDataBlockDecompression {
     CacheConfig cc = new CacheConfig(lazyCompressDisabled,
         new LruBlockCache(maxSize, HConstants.DEFAULT_BLOCKSIZE, false, lazyCompressDisabled));
     assertFalse(cc.shouldCacheDataCompressed());
-    assertTrue(cc.getBlockCache().get() instanceof LruBlockCache);
+    assertFalse(cc.isCombinedBlockCache());
     LruBlockCache disabledBlockCache = (LruBlockCache) cc.getBlockCache().get();
     LOG.info("disabledBlockCache=" + disabledBlockCache);
     assertEquals("test inconsistency detected.", maxSize, disabledBlockCache.getMaxSize());

@@ -60,10 +60,10 @@ public class TestBucketWriterThread {
   private static class MockBucketCache extends BucketCache {
 
     public MockBucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes,
-        int writerThreadNum, int writerQLen, String persistencePath, int ioErrorsTolerationDuration)
-        throws FileNotFoundException, IOException {
+      int writerThreadNum, int writerQLen, String persistencePath, int ioErrorsTolerationDuration)
+      throws IOException {
       super(ioEngineName, capacity, blockSize, bucketSizes, writerThreadNum, writerQLen,
-          persistencePath, ioErrorsTolerationDuration, HBaseConfiguration.create());
+        persistencePath, ioErrorsTolerationDuration, HBaseConfiguration.create());
     }
 
     @Override

@@ -83,7 +83,6 @@ import org.apache.hadoop.hbase.wal.WALFactory;
 import org.apache.hadoop.hbase.wal.WALKeyImpl;
 import org.apache.hadoop.hbase.wal.WALSplitUtil;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
-import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
@@ -192,9 +191,7 @@ public abstract class AbstractTestDLS {
     Path rootdir = FSUtils.getRootDir(conf);
 
     int numRegions = 50;
-    try (ZKWatcher zkw = new ZKWatcher(conf, "table-creation", null);
-        Table t = installTable(zkw, numRegions)) {
-      TableName table = t.getName();
+    try (Table t = installTable(numRegions)) {
       List<RegionInfo> regions = null;
       HRegionServer hrs = null;
       for (int i = 0; i < NUM_RS; i++) {
@@ -224,7 +221,6 @@ public abstract class AbstractTestDLS {
 
       int count = 0;
       for (RegionInfo hri : regions) {
-        Path tdir = FSUtils.getWALTableDir(conf, table);
         @SuppressWarnings("deprecation")
         Path editsdir = WALSplitUtil
             .getRegionDirRecoveredEditsDir(FSUtils.getWALRegionDir(conf,
@@ -266,8 +262,7 @@ public abstract class AbstractTestDLS {
     // they will consume recovered.edits
     master.balanceSwitch(false);
 
-    try (ZKWatcher zkw = new ZKWatcher(conf, "table-creation", null);
-        Table ht = installTable(zkw, numRegionsToCreate);) {
+    try (Table ht = installTable(numRegionsToCreate)) {
       HRegionServer hrs = findRSToKill(false);
       List<RegionInfo> regions = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices());
       makeWAL(hrs, regions, numLogLines, 100);
@@ -329,8 +324,7 @@ public abstract class AbstractTestDLS {
     final Path logDir = new Path(rootdir,
         AbstractFSWALProvider.getWALDirectoryName(hrs.getServerName().toString()));
 
-    try (ZKWatcher zkw = new ZKWatcher(conf, "table-creation", null);
-        Table t = installTable(zkw, 40)) {
+    try (Table t = installTable(40)) {
       makeWAL(hrs, ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices()), numLogLines, 100);
 
       new Thread() {
@@ -380,8 +374,7 @@ public abstract class AbstractTestDLS {
 
     startCluster(NUM_RS); // NUM_RS=6.
 
-    try (ZKWatcher zkw = new ZKWatcher(conf, "distributed log splitting test", null);
-        Table table = installTable(zkw, numRegionsToCreate)) {
+    try (Table table = installTable(numRegionsToCreate)) {
       populateDataInTable(numRowsPerRegion);
 
       List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
@@ -482,11 +475,11 @@ public abstract class AbstractTestDLS {
     }
   }
 
-  private Table installTable(ZKWatcher zkw, int nrs) throws Exception {
-    return installTable(zkw, nrs, 0);
+  private Table installTable(int nrs) throws Exception {
+    return installTable(nrs, 0);
   }
 
-  private Table installTable(ZKWatcher zkw, int nrs, int existingRegions) throws Exception {
+  private Table installTable(int nrs, int existingRegions) throws Exception {
     // Create a table with regions
     byte[] family = Bytes.toBytes("family");
     LOG.info("Creating table with " + nrs + " regions");
@@ -497,14 +490,14 @@ public abstract class AbstractTestDLS {
     }
     assertEquals(nrs, numRegions);
     LOG.info("Waiting for no more RIT\n");
-    blockUntilNoRIT(zkw, master);
+    blockUntilNoRIT();
     // disable-enable cycle to get rid of table's dead regions left behind
     // by createMultiRegions
     assertTrue(TEST_UTIL.getAdmin().isTableEnabled(tableName));
     LOG.debug("Disabling table\n");
     TEST_UTIL.getAdmin().disableTable(tableName);
     LOG.debug("Waiting for no more RIT\n");
-    blockUntilNoRIT(zkw, master);
+    blockUntilNoRIT();
     NavigableSet<String> regions = HBaseTestingUtility.getAllOnlineRegions(cluster);
     LOG.debug("Verifying only catalog and namespace regions are assigned\n");
     if (regions.size() != 2) {
@@ -515,7 +508,7 @@ public abstract class AbstractTestDLS {
     LOG.debug("Enabling table\n");
     TEST_UTIL.getAdmin().enableTable(tableName);
     LOG.debug("Waiting for no more RIT\n");
-    blockUntilNoRIT(zkw, master);
+    blockUntilNoRIT();
     LOG.debug("Verifying there are " + numRegions + " assigned on cluster\n");
     regions = HBaseTestingUtility.getAllOnlineRegions(cluster);
     assertEquals(numRegions + 2 + existingRegions, regions.size());
@@ -651,7 +644,7 @@ public abstract class AbstractTestDLS {
     return count;
   }
 
-  private void blockUntilNoRIT(ZKWatcher zkw, HMaster master) throws Exception {
+  private void blockUntilNoRIT() throws Exception {
     TEST_UTIL.waitUntilNoRegionsInTransition(60000);
   }
 