HBASE-22127 Ensure that the block cached in the LRUBlockCache offheap is allocated from heap

This commit is contained in:
huzheng 2019-04-01 22:23:24 +08:00
parent 35b818606f
commit d1eb6171f9
15 changed files with 457 additions and 160 deletions

View File

@ -24,6 +24,7 @@ import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import sun.nio.ch.DirectBuffer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
@ -34,7 +35,6 @@ import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
/**
@ -191,7 +191,7 @@ public class ByteBuffAllocator {
}
// If disabled the reservoir, just allocate it from on-heap.
if (!isReservoirEnabled() || size == 0) {
return new SingleByteBuff(NONE, ByteBuffer.allocate(size));
return allocateOnHeap(size);
}
int reminder = size % bufSize;
int len = size / bufSize + (reminder > 0 ? 1 : 0);
@ -222,6 +222,22 @@ public class ByteBuffAllocator {
return bb;
}
/**
* Free all direct buffers if allocated, mainly used for testing.
*/
@VisibleForTesting
public void clean() {
while (!buffers.isEmpty()) {
ByteBuffer b = buffers.poll();
if (b instanceof DirectBuffer) {
DirectBuffer db = (DirectBuffer) b;
if (db.cleaner() != null) {
db.cleaner().clean();
}
}
}
}
public static ByteBuff wrap(ByteBuffer[] buffers, Recycler recycler) {
if (buffers == null || buffers.length == 0) {
throw new IllegalArgumentException("buffers shouldn't be null or empty");

View File

@ -367,6 +367,10 @@ public class CacheConfig {
return Optional.ofNullable(this.blockCache);
}
public boolean isCombinedBlockCache() {
return blockCache instanceof CombinedBlockCache;
}
public ByteBuffAllocator getByteBuffAllocator() {
return this.byteBuffAllocator;
}

View File

@ -762,6 +762,13 @@ public class HFileBlock implements Cacheable {
return ClassSize.align(size);
}
/**
* @return true to indicate the block is allocated from JVM heap, otherwise from off-heap.
*/
boolean isOnHeap() {
return buf.hasArray();
}
/**
* Unified version 2 {@link HFile} block writer. The intended usage pattern
* is as follows:
@ -1300,16 +1307,29 @@ public class HFileBlock implements Cacheable {
/** An HFile block reader with iteration ability. */
interface FSReader {
/**
* Reads the block at the given offset in the file with the given on-disk
* size and uncompressed size.
*
* @param offset
* @param onDiskSize the on-disk size of the entire block, including all
* applicable headers, or -1 if unknown
* Reads the block at the given offset in the file with the given on-disk size and uncompressed
* size.
* @param offset of the file to read
* @param onDiskSize the on-disk size of the entire block, including all applicable headers, or
* -1 if unknown
* @param pread true to use pread, otherwise use the stream read.
* @param updateMetrics update the metrics or not.
* @param intoHeap allocate the block's ByteBuff by {@link ByteBuffAllocator} or JVM heap. For
* LRUBlockCache, we must ensure that the block to cache is an heap one, because the
* memory occupation is based on heap now, also for {@link CombinedBlockCache}, we use
* the heap LRUBlockCache as L1 cache to cache small blocks such as IndexBlock or
* MetaBlock for faster access. So introduce an flag here to decide whether allocate
* from JVM heap or not so that we can avoid an extra off-heap to heap memory copy when
* using LRUBlockCache. For most cases, we known what's the expected block type we'll
* read, while for some special case (Example: HFileReaderImpl#readNextDataBlock()), we
* cannot pre-decide what's the expected block type, then we can only allocate block's
* ByteBuff from {@link ByteBuffAllocator} firstly, and then when caching it in
* {@link LruBlockCache} we'll check whether the ByteBuff is from heap or not, if not
* then we'll clone it to an heap one and cache it.
* @return the newly read block
*/
HFileBlock readBlockData(long offset, long onDiskSize, boolean pread, boolean updateMetrics)
throws IOException;
HFileBlock readBlockData(long offset, long onDiskSize, boolean pread, boolean updateMetrics,
boolean intoHeap) throws IOException;
/**
* Creates a block iterator over the given portion of the {@link HFile}.
@ -1444,7 +1464,7 @@ public class HFileBlock implements Cacheable {
if (offset >= endOffset) {
return null;
}
HFileBlock b = readBlockData(offset, length, false, false);
HFileBlock b = readBlockData(offset, length, false, false, true);
offset += b.getOnDiskSizeWithHeader();
length = b.getNextBlockOnDiskSize();
HFileBlock uncompressed = b.unpack(fileContext, owner);
@ -1526,16 +1546,18 @@ public class HFileBlock implements Cacheable {
/**
* Reads a version 2 block (version 1 blocks not supported and not expected). Tries to do as
* little memory allocation as possible, using the provided on-disk size.
*
* @param offset the offset in the stream to read at
* @param onDiskSizeWithHeaderL the on-disk size of the block, including
* the header, or -1 if unknown; i.e. when iterating over blocks reading
* in the file metadata info.
* @param onDiskSizeWithHeaderL the on-disk size of the block, including the header, or -1 if
* unknown; i.e. when iterating over blocks reading in the file metadata info.
* @param pread whether to use a positional read
* @param updateMetrics whether to update the metrics
* @param intoHeap allocate ByteBuff of block from heap or off-heap.
* @see FSReader#readBlockData(long, long, boolean, boolean, boolean) for more details about the
* useHeap.
*/
@Override
public HFileBlock readBlockData(long offset, long onDiskSizeWithHeaderL, boolean pread,
boolean updateMetrics) throws IOException {
boolean updateMetrics, boolean intoHeap) throws IOException {
// Get a copy of the current state of whether to validate
// hbase checksums or not for this read call. This is not
// thread-safe but the one constaint is that if we decide
@ -1544,9 +1566,8 @@ public class HFileBlock implements Cacheable {
boolean doVerificationThruHBaseChecksum = streamWrapper.shouldUseHBaseChecksum();
FSDataInputStream is = streamWrapper.getStream(doVerificationThruHBaseChecksum);
HFileBlock blk = readBlockDataInternal(is, offset,
onDiskSizeWithHeaderL, pread,
doVerificationThruHBaseChecksum, updateMetrics);
HFileBlock blk = readBlockDataInternal(is, offset, onDiskSizeWithHeaderL, pread,
doVerificationThruHBaseChecksum, updateMetrics, intoHeap);
if (blk == null) {
HFile.LOG.warn("HBase checksum verification failed for file " +
pathName + " at offset " +
@ -1573,7 +1594,7 @@ public class HFileBlock implements Cacheable {
is = this.streamWrapper.fallbackToFsChecksum(CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD);
doVerificationThruHBaseChecksum = false;
blk = readBlockDataInternal(is, offset, onDiskSizeWithHeaderL, pread,
doVerificationThruHBaseChecksum, updateMetrics);
doVerificationThruHBaseChecksum, updateMetrics, intoHeap);
if (blk != null) {
HFile.LOG.warn("HDFS checksum verification succeeded for file " +
pathName + " at offset " +
@ -1669,24 +1690,29 @@ public class HFileBlock implements Cacheable {
return nextBlockOnDiskSize;
}
private ByteBuff allocate(int size, boolean intoHeap) {
return intoHeap ? ByteBuffAllocator.HEAP.allocate(size) : allocator.allocate(size);
}
/**
* Reads a version 2 block.
*
* @param offset the offset in the stream to read at.
* @param onDiskSizeWithHeaderL the on-disk size of the block, including
* the header and checksums if present or -1 if unknown (as a long). Can be -1
* if we are doing raw iteration of blocks as when loading up file metadata; i.e.
* the first read of a new file. Usually non-null gotten from the file index.
* @param onDiskSizeWithHeaderL the on-disk size of the block, including the header and
* checksums if present or -1 if unknown (as a long). Can be -1 if we are doing raw
* iteration of blocks as when loading up file metadata; i.e. the first read of a new
* file. Usually non-null gotten from the file index.
* @param pread whether to use a positional read
* @param verifyChecksum Whether to use HBase checksums.
* If HBase checksum is switched off, then use HDFS checksum. Can also flip on/off
* reading same file if we hit a troublesome patch in an hfile.
* @param verifyChecksum Whether to use HBase checksums. If HBase checksum is switched off, then
* use HDFS checksum. Can also flip on/off reading same file if we hit a troublesome
* patch in an hfile.
* @param updateMetrics whether need to update the metrics.
* @param intoHeap allocate the ByteBuff of block from heap or off-heap.
* @return the HFileBlock or null if there is a HBase checksum mismatch
*/
@VisibleForTesting
protected HFileBlock readBlockDataInternal(FSDataInputStream is, long offset,
long onDiskSizeWithHeaderL, boolean pread, boolean verifyChecksum, boolean updateMetrics)
throws IOException {
long onDiskSizeWithHeaderL, boolean pread, boolean verifyChecksum, boolean updateMetrics,
boolean intoHeap) throws IOException {
if (offset < 0) {
throw new IOException("Invalid offset=" + offset + " trying to read "
+ "block (onDiskSize=" + onDiskSizeWithHeaderL + ")");
@ -1728,7 +1754,7 @@ public class HFileBlock implements Cacheable {
// says where to start reading. If we have the header cached, then we don't need to read
// it again and we can likely read from last place we left off w/o need to backup and reread
// the header we read last time through here.
ByteBuff onDiskBlock = allocator.allocate(onDiskSizeWithHeader + hdrSize);
ByteBuff onDiskBlock = this.allocate(onDiskSizeWithHeader + hdrSize, intoHeap);
boolean initHFileBlockSuccess = false;
try {
if (headerBuf != null) {
@ -2072,7 +2098,7 @@ public class HFileBlock implements Cacheable {
" onDiskDataSizeWithHeader " + onDiskDataSizeWithHeader;
}
public HFileBlock deepClone() {
public HFileBlock deepCloneOnHeap() {
return new HFileBlock(this, true);
}
}

View File

@ -272,8 +272,8 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
if (LOG.isTraceEnabled()) {
LOG.trace("Prefetch start " + getPathOffsetEndStr(path, offset, end));
}
// TODO: Could we use block iterator in here? Would that get stuff into the cache?
HFileBlock prevBlock = null;
// Don't use BlockIterator here, because it's designed to read load-on-open section.
long onDiskSizeOfNextBlock = -1;
while (offset < end) {
if (Thread.interrupted()) {
break;
@ -282,16 +282,17 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
// the internal-to-hfileblock thread local which holds the overread that gets the
// next header, will not have happened...so, pass in the onDiskSize gotten from the
// cached block. This 'optimization' triggers extremely rarely I'd say.
long onDiskSize = prevBlock != null? prevBlock.getNextBlockOnDiskSize(): -1;
HFileBlock block = readBlock(offset, onDiskSize, /*cacheBlock=*/true,
/*pread=*/true, false, false, null, null);
// Need not update the current block. Ideally here the readBlock won't find the
// block in cache. We call this readBlock so that block data is read from FS and
// cached in BC. So there is no reference count increment that happens here.
// The return will ideally be a noop because the block is not of MemoryType SHARED.
returnBlock(block);
prevBlock = block;
offset += block.getOnDiskSizeWithHeader();
HFileBlock block = readBlock(offset, onDiskSizeOfNextBlock, /* cacheBlock= */true,
/* pread= */true, false, false, null, null);
try {
onDiskSizeOfNextBlock = block.getNextBlockOnDiskSize();
offset += block.getOnDiskSizeWithHeader();
} finally {
// Ideally here the readBlock won't find the block in cache. We call this
// readBlock so that block data is read from FS and cached in BC. we must call
// returnBlock here to decrease the reference count of block.
returnBlock(block);
}
}
} catch (IOException e) {
// IOExceptions are probably due to region closes (relocation, etc.)
@ -1419,7 +1420,7 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
// Cache Miss, please load.
HFileBlock compressedBlock =
fsBlockReader.readBlockData(metaBlockOffset, blockSize, true, false);
fsBlockReader.readBlockData(metaBlockOffset, blockSize, true, false, true);
HFileBlock uncompressedBlock = compressedBlock.unpack(hfileContext, fsBlockReader);
if (compressedBlock != uncompressedBlock) {
compressedBlock.release();
@ -1434,6 +1435,24 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
}
}
/**
* If expected block is data block, we'll allocate the ByteBuff of block from
* {@link org.apache.hadoop.hbase.io.ByteBuffAllocator} and it's usually an off-heap one,
* otherwise it will allocate from heap.
* @see org.apache.hadoop.hbase.io.hfile.HFileBlock.FSReader#readBlockData(long, long, boolean,
* boolean, boolean)
*/
private boolean shouldUseHeap(BlockType expectedBlockType) {
if (cacheConf.getBlockCache() == null) {
return false;
} else if (!cacheConf.isCombinedBlockCache()) {
// Block to cache in LruBlockCache must be an heap one. So just allocate block memory from
// heap for saving an extra off-heap to heap copying.
return true;
}
return expectedBlockType != null && !expectedBlockType.isData();
}
@Override
public HFileBlock readBlock(long dataBlockOffset, long onDiskBlockSize,
final boolean cacheBlock, boolean pread, final boolean isCompaction,
@ -1505,8 +1524,8 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
TraceUtil.addTimelineAnnotation("blockCacheMiss");
// Load block from filesystem.
HFileBlock hfileBlock =
fsBlockReader.readBlockData(dataBlockOffset, onDiskBlockSize, pread, !isCompaction);
HFileBlock hfileBlock = fsBlockReader.readBlockData(dataBlockOffset, onDiskBlockSize, pread,
!isCompaction, shouldUseHeap(expectedBlockType));
validateBlockType(hfileBlock, expectedBlockType);
HFileBlock unpacked = hfileBlock.unpack(hfileContext, fsBlockReader);
BlockType.BlockCategory category = hfileBlock.getBlockType().getCategory();

View File

@ -354,6 +354,32 @@ public class LruBlockCache implements FirstLevelBlockCache {
}
}
/**
* The block cached in LRUBlockCache will always be an heap block: on the one side, the heap
* access will be more faster then off-heap, the small index block or meta block cached in
* CombinedBlockCache will benefit a lot. on other side, the LRUBlockCache size is always
* calculated based on the total heap size, if caching an off-heap block in LRUBlockCache, the
* heap size will be messed up. Here we will clone the block into an heap block if it's an
* off-heap block, otherwise just use the original block. The key point is maintain the refCnt of
* the block (HBASE-22127): <br>
* 1. if cache the cloned heap block, its refCnt is an totally new one, it's easy to handle; <br>
* 2. if cache the original heap block, we're sure that it won't be tracked in ByteBuffAllocator's
* reservoir, if both RPC and LRUBlockCache release the block, then it can be garbage collected by
* JVM, so need a retain here.
* @param buf the original block
* @return an block with an heap memory backend.
*/
private Cacheable asReferencedHeapBlock(Cacheable buf) {
if (buf instanceof HFileBlock) {
HFileBlock blk = ((HFileBlock) buf);
if (!blk.isOnHeap()) {
return blk.deepCloneOnHeap();
}
}
// The block will be referenced by this LRUBlockCache, so should increase its refCnt here.
return buf.retain();
}
// BlockCache implementation
/**
@ -402,8 +428,8 @@ public class LruBlockCache implements FirstLevelBlockCache {
}
return;
}
// The block will be referenced by the LRUBlockCache, so should increase the refCnt here.
buf.retain();
// Ensure that the block is an heap one.
buf = asReferencedHeapBlock(buf);
cb = new LruCachedBlock(cacheKey, buf, count.incrementAndGet(), inMemory);
long newSize = updateSizeMetrics(cb, false);
map.put(cacheKey, cb);
@ -503,7 +529,7 @@ public class LruBlockCache implements FirstLevelBlockCache {
if (caching) {
if (result instanceof HFileBlock && ((HFileBlock) result).usesSharedMemory()) {
Cacheable original = result;
result = ((HFileBlock) original).deepClone();
result = ((HFileBlock) original).deepCloneOnHeap();
// deepClone an new one, so need to put the original one back to free it.
victimHandler.returnBlock(cacheKey, original);
}

View File

@ -50,6 +50,8 @@ import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.io.HeapSize;
@ -557,23 +559,54 @@ public class BucketCache implements BlockCache, HeapSize {
return evictBlock(cacheKey, true);
}
private RAMQueueEntry checkRamCache(BlockCacheKey cacheKey) {
RAMQueueEntry removedBlock = ramCache.remove(cacheKey);
if (removedBlock != null) {
this.blockNumber.decrement();
this.heapSize.add(-1 * removedBlock.getData().heapSize());
// does not check for the ref count. Just tries to evict it if found in the
// bucket map
private boolean forceEvict(BlockCacheKey cacheKey) {
if (!cacheEnabled) {
return false;
}
return removedBlock;
boolean existed = removeFromRamCache(cacheKey);
BucketEntry bucketEntry = backingMap.get(cacheKey);
if (bucketEntry == null) {
if (existed) {
cacheStats.evicted(0, cacheKey.isPrimary());
return true;
} else {
return false;
}
}
ReentrantReadWriteLock lock = offsetLock.getLock(bucketEntry.offset());
try {
lock.writeLock().lock();
if (backingMap.remove(cacheKey, bucketEntry)) {
blockEvicted(cacheKey, bucketEntry, !existed);
} else {
return false;
}
} finally {
lock.writeLock().unlock();
}
cacheStats.evicted(bucketEntry.getCachedTime(), cacheKey.isPrimary());
return true;
}
private boolean removeFromRamCache(BlockCacheKey cacheKey) {
return ramCache.remove(cacheKey, re -> {
if (re != null) {
this.blockNumber.decrement();
this.heapSize.add(-1 * re.getData().heapSize());
}
});
}
public boolean evictBlock(BlockCacheKey cacheKey, boolean deletedBlock) {
if (!cacheEnabled) {
return false;
}
RAMQueueEntry removedBlock = checkRamCache(cacheKey);
boolean existed = removeFromRamCache(cacheKey);
BucketEntry bucketEntry = backingMap.get(cacheKey);
if (bucketEntry == null) {
if (removedBlock != null) {
if (existed) {
cacheStats.evicted(0, cacheKey.isPrimary());
return true;
} else {
@ -586,7 +619,7 @@ public class BucketCache implements BlockCache, HeapSize {
int refCount = bucketEntry.getRefCount();
if (refCount == 0) {
if (backingMap.remove(cacheKey, bucketEntry)) {
blockEvicted(cacheKey, bucketEntry, removedBlock == null);
blockEvicted(cacheKey, bucketEntry, !existed);
} else {
return false;
}
@ -1009,10 +1042,12 @@ public class BucketCache implements BlockCache, HeapSize {
putIntoBackingMap(key, bucketEntries[i]);
}
// Always remove from ramCache even if we failed adding it to the block cache above.
RAMQueueEntry ramCacheEntry = ramCache.remove(key);
if (ramCacheEntry != null) {
heapSize.add(-1 * entries.get(i).getData().heapSize());
} else if (bucketEntries[i] != null){
boolean existed = ramCache.remove(key, re -> {
if (re != null) {
heapSize.add(-1 * re.getData().heapSize());
}
});
if (!existed && bucketEntries[i] != null) {
// Block should have already been evicted. Remove it and free space.
ReentrantReadWriteLock lock = offsetLock.getLock(bucketEntries[i].offset());
try {
@ -1737,12 +1772,23 @@ public class BucketCache implements BlockCache, HeapSize {
return previous;
}
public RAMQueueEntry remove(BlockCacheKey key) {
public boolean remove(BlockCacheKey key) {
return remove(key, re->{});
}
/**
* Defined an {@link Consumer} here, because once the removed entry release its reference count,
* then it's ByteBuffers may be recycled and accessing it outside this method will be thrown an
* exception. the consumer will access entry to remove before release its reference count.
* Notice, don't change its reference count in the {@link Consumer}
*/
public boolean remove(BlockCacheKey key, Consumer<RAMQueueEntry> action) {
RAMQueueEntry previous = delegate.remove(key);
action.accept(previous);
if (previous != null) {
previous.getData().release();
}
return previous;
return previous != null;
}
public boolean isEmpty() {

View File

@ -100,7 +100,8 @@ public class TestChecksum {
meta = new HFileContextBuilder().withHBaseCheckSum(true).build();
HFileBlock.FSReader hbr = new HFileBlock.FSReaderImpl(is, totalSize, (HFileSystem) fs, path,
meta, ByteBuffAllocator.HEAP);
HFileBlock b = hbr.readBlockData(0, -1, false, false);
HFileBlock b = hbr.readBlockData(0, -1, false, false, true);
assertTrue(b.isOnHeap());
assertEquals(b.getChecksumType(), ChecksumType.getDefaultChecksumType().getCode());
}
@ -146,7 +147,8 @@ public class TestChecksum {
meta = new HFileContextBuilder().withHBaseCheckSum(true).build();
HFileBlock.FSReader hbr = new HFileBlock.FSReaderImpl(is, totalSize, (HFileSystem) fs, path,
meta, ByteBuffAllocator.HEAP);
HFileBlock b = hbr.readBlockData(0, -1, false, false);
HFileBlock b = hbr.readBlockData(0, -1, false, false, true);
assertTrue(b.isOnHeap());
// verify SingleByteBuff checksum.
verifySBBCheckSum(b.getBufferReadOnly());
@ -215,7 +217,7 @@ public class TestChecksum {
.withHBaseCheckSum(true)
.build();
HFileBlock.FSReader hbr = new CorruptedFSReaderImpl(is, totalSize, fs, path, meta);
HFileBlock b = hbr.readBlockData(0, -1, pread, false);
HFileBlock b = hbr.readBlockData(0, -1, pread, false, true);
b.sanityCheck();
assertEquals(4936, b.getUncompressedSizeWithoutHeader());
assertEquals(algo == GZ ? 2173 : 4936,
@ -236,19 +238,19 @@ public class TestChecksum {
// requests. Verify that this is correct.
for (int i = 0; i <
HFileBlock.CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD + 1; i++) {
b = hbr.readBlockData(0, -1, pread, false);
b = hbr.readBlockData(0, -1, pread, false, true);
assertTrue(b.getBufferReadOnly() instanceof SingleByteBuff);
assertEquals(0, HFile.getAndResetChecksumFailuresCount());
}
// The next read should have hbase checksum verification reanabled,
// we verify this by assertng that there was a hbase-checksum failure.
b = hbr.readBlockData(0, -1, pread, false);
b = hbr.readBlockData(0, -1, pread, false, true);
assertTrue(b.getBufferReadOnly() instanceof SingleByteBuff);
assertEquals(1, HFile.getAndResetChecksumFailuresCount());
// Since the above encountered a checksum failure, we switch
// back to not checking hbase checksums.
b = hbr.readBlockData(0, -1, pread, false);
b = hbr.readBlockData(0, -1, pread, false, true);
assertTrue(b.getBufferReadOnly() instanceof SingleByteBuff);
assertEquals(0, HFile.getAndResetChecksumFailuresCount());
is.close();
@ -260,7 +262,7 @@ public class TestChecksum {
assertEquals(false, newfs.useHBaseChecksum());
is = new FSDataInputStreamWrapper(newfs, path);
hbr = new CorruptedFSReaderImpl(is, totalSize, newfs, path, meta);
b = hbr.readBlockData(0, -1, pread, false);
b = hbr.readBlockData(0, -1, pread, false, true);
is.close();
b.sanityCheck();
b = b.unpack(meta, hbr);
@ -343,7 +345,7 @@ public class TestChecksum {
HFileBlock.FSReader hbr =
new HFileBlock.FSReaderImpl(new FSDataInputStreamWrapper(is, nochecksum), totalSize,
hfs, path, meta, ByteBuffAllocator.HEAP);
HFileBlock b = hbr.readBlockData(0, -1, pread, false);
HFileBlock b = hbr.readBlockData(0, -1, pread, false, true);
assertTrue(b.getBufferReadOnly() instanceof SingleByteBuff);
is.close();
b.sanityCheck();
@ -389,13 +391,13 @@ public class TestChecksum {
@Override
protected HFileBlock readBlockDataInternal(FSDataInputStream is, long offset,
long onDiskSizeWithHeaderL, boolean pread, boolean verifyChecksum, boolean updateMetrics)
throws IOException {
long onDiskSizeWithHeaderL, boolean pread, boolean verifyChecksum, boolean updateMetrics,
boolean useHeap) throws IOException {
if (verifyChecksum) {
corruptDataStream = true;
}
HFileBlock b = super.readBlockDataInternal(is, offset, onDiskSizeWithHeaderL, pread,
verifyChecksum, updateMetrics);
verifyChecksum, updateMetrics, useHeap);
corruptDataStream = false;
return b;
}

View File

@ -17,8 +17,12 @@
*/
package org.apache.hadoop.hbase.io.hfile;
import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_IOENGINE_KEY;
import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_SIZE_KEY;
import static org.apache.hadoop.hbase.io.ByteBuffAllocator.BUFFER_SIZE_KEY;
import static org.apache.hadoop.hbase.io.ByteBuffAllocator.MAX_BUFFER_COUNT_KEY;
import static org.apache.hadoop.hbase.io.ByteBuffAllocator.MIN_ALLOCATE_SIZE_KEY;
import static org.apache.hadoop.hbase.io.hfile.CacheConfig.EVICT_BLOCKS_ON_CLOSE_KEY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
@ -104,33 +108,129 @@ public class TestHFile {
fs = TEST_UTIL.getTestFileSystem();
}
private ByteBuffAllocator initAllocator(boolean reservoirEnabled, int bufSize, int bufCount,
int minAllocSize) {
Configuration that = HBaseConfiguration.create(conf);
that.setInt(BUFFER_SIZE_KEY, bufSize);
that.setInt(MAX_BUFFER_COUNT_KEY, bufCount);
// All ByteBuffers will be allocated from the buffers.
that.setInt(MIN_ALLOCATE_SIZE_KEY, minAllocSize);
return ByteBuffAllocator.create(that, reservoirEnabled);
}
private void fillByteBuffAllocator(ByteBuffAllocator alloc, int bufCount) {
// Fill the allocator with bufCount ByteBuffer
List<ByteBuff> buffs = new ArrayList<>();
for (int i = 0; i < bufCount; i++) {
buffs.add(alloc.allocateOneBuffer());
Assert.assertEquals(alloc.getQueueSize(), 0);
}
buffs.forEach(ByteBuff::release);
Assert.assertEquals(alloc.getQueueSize(), bufCount);
}
@Test
public void testReaderWithoutBlockCache() throws Exception {
int bufCount = 32;
Configuration that = HBaseConfiguration.create(conf);
that.setInt(MAX_BUFFER_COUNT_KEY, bufCount);
// AllByteBuffers will be allocated from the buffers.
that.setInt(MIN_ALLOCATE_SIZE_KEY, 0);
ByteBuffAllocator alloc = ByteBuffAllocator.create(that, true);
List<ByteBuff> buffs = new ArrayList<>();
// Fill the allocator with bufCount ByteBuffer
for (int i = 0; i < bufCount; i++) {
buffs.add(alloc.allocateOneBuffer());
}
Assert.assertEquals(alloc.getQueueSize(), 0);
for (ByteBuff buf : buffs) {
buf.release();
}
Assert.assertEquals(alloc.getQueueSize(), bufCount);
ByteBuffAllocator alloc = initAllocator(true, 64 * 1024, bufCount, 0);
fillByteBuffAllocator(alloc, bufCount);
// start write to store file.
Path path = writeStoreFile();
try {
readStoreFile(path, that, alloc);
readStoreFile(path, conf, alloc);
} catch (Exception e) {
// fail test
assertTrue(false);
}
Assert.assertEquals(bufCount, alloc.getQueueSize());
alloc.clean();
}
/**
* Test case for HBASE-22127 in LruBlockCache.
*/
@Test
public void testReaderWithLRUBlockCache() throws Exception {
int bufCount = 1024, blockSize = 64 * 1024;
ByteBuffAllocator alloc = initAllocator(true, bufCount, blockSize, 0);
fillByteBuffAllocator(alloc, bufCount);
Path storeFilePath = writeStoreFile();
// Open the file reader with LRUBlockCache
BlockCache lru = new LruBlockCache(1024 * 1024 * 32, blockSize, true, conf);
CacheConfig cacheConfig = new CacheConfig(conf, null, lru, alloc);
HFile.Reader reader = HFile.createReader(fs, storeFilePath, cacheConfig, true, conf);
long offset = 0;
while (offset < reader.getTrailer().getLoadOnOpenDataOffset()) {
BlockCacheKey key = new BlockCacheKey(storeFilePath.getName(), offset);
HFileBlock block = reader.readBlock(offset, -1, true, true, false, true, null, null);
offset += block.getOnDiskSizeWithHeader();
// Ensure the block is an heap one.
Cacheable cachedBlock = lru.getBlock(key, false, false, true);
Assert.assertNotNull(cachedBlock);
Assert.assertTrue(cachedBlock instanceof HFileBlock);
Assert.assertTrue(((HFileBlock) cachedBlock).isOnHeap());
// Should never allocate off-heap block from allocator because ensure that it's LRU.
Assert.assertEquals(bufCount, alloc.getQueueSize());
block.release(); // return back the ByteBuffer back to allocator.
}
reader.close();
Assert.assertEquals(bufCount, alloc.getQueueSize());
alloc.clean();
lru.shutdown();
}
private BlockCache initCombinedBlockCache() {
Configuration that = HBaseConfiguration.create(conf);
that.setFloat(BUCKET_CACHE_SIZE_KEY, 32); // 32MB for bucket cache.
that.set(BUCKET_CACHE_IOENGINE_KEY, "offheap");
BlockCache bc = BlockCacheFactory.createBlockCache(that);
Assert.assertNotNull(bc);
Assert.assertTrue(bc instanceof CombinedBlockCache);
return bc;
}
/**
* Test case for HBASE-22127 in CombinedBlockCache
*/
@Test
public void testReaderWithCombinedBlockCache() throws Exception {
int bufCount = 1024, blockSize = 64 * 1024;
ByteBuffAllocator alloc = initAllocator(true, bufCount, blockSize, 0);
fillByteBuffAllocator(alloc, bufCount);
Path storeFilePath = writeStoreFile();
// Open the file reader with CombinedBlockCache
BlockCache combined = initCombinedBlockCache();
conf.setBoolean(EVICT_BLOCKS_ON_CLOSE_KEY, true);
CacheConfig cacheConfig = new CacheConfig(conf, null, combined, alloc);
HFile.Reader reader = HFile.createReader(fs, storeFilePath, cacheConfig, true, conf);
long offset = 0;
while (offset < reader.getTrailer().getLoadOnOpenDataOffset()) {
BlockCacheKey key = new BlockCacheKey(storeFilePath.getName(), offset);
HFileBlock block = reader.readBlock(offset, -1, true, true, false, true, null, null);
offset += block.getOnDiskSizeWithHeader();
// Read the cached block.
Cacheable cachedBlock = combined.getBlock(key, false, false, true);
try {
Assert.assertNotNull(cachedBlock);
Assert.assertTrue(cachedBlock instanceof HFileBlock);
HFileBlock hfb = (HFileBlock) cachedBlock;
// Data block will be cached in BucketCache, so it should be an off-heap block.
if (hfb.getBlockType().isData()) {
Assert.assertFalse(hfb.isOnHeap());
} else {
// Non-data block will be cached in LRUBlockCache, so it must be an on-heap block.
Assert.assertTrue(hfb.isOnHeap());
}
} finally {
combined.returnBlock(key, cachedBlock);
}
block.release(); // return back the ByteBuffer back to allocator.
}
reader.close();
combined.shutdown();
Assert.assertEquals(bufCount, alloc.getQueueSize());
alloc.clean();
}
private void readStoreFile(Path storeFilePath, Configuration conf, ByteBuffAllocator alloc)

View File

@ -40,6 +40,8 @@ import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
@ -48,6 +50,7 @@ import org.apache.hadoop.hbase.ArrayBackedTag;
import org.apache.hadoop.hbase.CellComparatorImpl;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
@ -68,6 +71,7 @@ import org.apache.hadoop.hbase.util.ChecksumType;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.io.compress.Compressor;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
@ -93,10 +97,12 @@ public class TestHFileBlock {
private static final Logger LOG = LoggerFactory.getLogger(TestHFileBlock.class);
static final Compression.Algorithm[] COMPRESSION_ALGORITHMS = { NONE, GZ };
// TODO let uncomment the GZ algorithm in HBASE-21937, because no support BB unpack yet.
static final Compression.Algorithm[] COMPRESSION_ALGORITHMS = { NONE, /* GZ */ };
private static final int NUM_TEST_BLOCKS = 1000;
private static final int NUM_READER_THREADS = 26;
private static final int MAX_BUFFER_COUNT = 2048;
// Used to generate KeyValues
private static int NUM_KEYVALUES = 50;
@ -108,14 +114,51 @@ public class TestHFileBlock {
private final boolean includesMemstoreTS;
private final boolean includesTag;
public TestHFileBlock(boolean includesMemstoreTS, boolean includesTag) {
private final boolean useHeapAllocator;
private final ByteBuffAllocator alloc;
public TestHFileBlock(boolean includesMemstoreTS, boolean includesTag, boolean useHeapAllocator) {
this.includesMemstoreTS = includesMemstoreTS;
this.includesTag = includesTag;
this.useHeapAllocator = useHeapAllocator;
this.alloc = useHeapAllocator ? ByteBuffAllocator.HEAP : createOffHeapAlloc();
assertAllocator();
}
@Parameters
public static Collection<Object[]> parameters() {
return HBaseTestingUtility.MEMSTORETS_TAGS_PARAMETRIZED;
List<Object[]> params = new ArrayList<>();
// Generate boolean triples from 000 to 111
for (int i = 0; i < (1 << 3); i++) {
Object[] flags = new Boolean[3];
for (int k = 0; k < 3; k++) {
flags[k] = (i & (1 << k)) != 0;
}
params.add(flags);
}
return params;
}
private ByteBuffAllocator createOffHeapAlloc() {
Configuration conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
conf.setInt(ByteBuffAllocator.MAX_BUFFER_COUNT_KEY, MAX_BUFFER_COUNT);
conf.setInt(ByteBuffAllocator.MIN_ALLOCATE_SIZE_KEY, 0);
ByteBuffAllocator alloc = ByteBuffAllocator.create(conf, true);
// Fill the allocator
List<ByteBuff> bufs = new ArrayList<>();
for (int i = 0; i < MAX_BUFFER_COUNT; i++) {
ByteBuff bb = alloc.allocateOneBuffer();
assertTrue(!bb.hasArray());
bufs.add(bb);
}
bufs.forEach(ByteBuff::release);
return alloc;
}
private void assertAllocator() {
if (!useHeapAllocator) {
assertEquals(MAX_BUFFER_COUNT, alloc.getQueueSize());
}
}
@Before
@ -123,6 +166,12 @@ public class TestHFileBlock {
fs = HFileSystem.get(TEST_UTIL.getConfiguration());
}
@After
public void tearDown() throws IOException {
assertAllocator();
alloc.clean();
}
static void writeTestBlockContents(DataOutputStream dos) throws IOException {
// This compresses really well.
for (int i = 0; i < 1000; ++i)
@ -327,9 +376,8 @@ public class TestHFileBlock {
.withIncludesMvcc(includesMemstoreTS)
.withIncludesTags(includesTag)
.withCompression(algo).build();
HFileBlock.FSReader hbr =
new HFileBlock.FSReaderImpl(is, totalSize, meta, ByteBuffAllocator.HEAP);
HFileBlock b = hbr.readBlockData(0, -1, pread, false);
HFileBlock.FSReader hbr = new HFileBlock.FSReaderImpl(is, totalSize, meta, alloc);
HFileBlock b = hbr.readBlockData(0, -1, pread, false, true);
is.close();
assertEquals(0, HFile.getAndResetChecksumFailuresCount());
@ -341,14 +389,14 @@ public class TestHFileBlock {
if (algo == GZ) {
is = fs.open(path);
hbr = new HFileBlock.FSReaderImpl(is, totalSize, meta, ByteBuffAllocator.HEAP);
b = hbr.readBlockData(0, 2173 + HConstants.HFILEBLOCK_HEADER_SIZE +
b.totalChecksumBytes(), pread, false);
hbr = new HFileBlock.FSReaderImpl(is, totalSize, meta, alloc);
b = hbr.readBlockData(0,
2173 + HConstants.HFILEBLOCK_HEADER_SIZE + b.totalChecksumBytes(), pread, false, true);
assertEquals(expected, b);
int wrongCompressedSize = 2172;
try {
b = hbr.readBlockData(0, wrongCompressedSize
+ HConstants.HFILEBLOCK_HEADER_SIZE, pread, false);
hbr.readBlockData(0, wrongCompressedSize + HConstants.HFILEBLOCK_HEADER_SIZE, pread,
false, true);
fail("Exception expected");
} catch (IOException ex) {
String expectedPrefix = "Passed in onDiskSizeWithHeader=";
@ -356,8 +404,10 @@ public class TestHFileBlock {
+ "'.\nMessage is expected to start with: '" + expectedPrefix
+ "'", ex.getMessage().startsWith(expectedPrefix));
}
assertTrue(b.release());
is.close();
}
assertTrue(expected.release());
}
}
}
@ -428,13 +478,13 @@ public class TestHFileBlock {
.withIncludesTags(includesTag)
.build();
HFileBlock.FSReaderImpl hbr =
new HFileBlock.FSReaderImpl(is, totalSize, meta, ByteBuffAllocator.HEAP);
new HFileBlock.FSReaderImpl(is, totalSize, meta, alloc);
hbr.setDataBlockEncoder(dataBlockEncoder);
hbr.setIncludesMemStoreTS(includesMemstoreTS);
HFileBlock blockFromHFile, blockUnpacked;
int pos = 0;
for (int blockId = 0; blockId < numBlocks; ++blockId) {
blockFromHFile = hbr.readBlockData(pos, -1, pread, false);
blockFromHFile = hbr.readBlockData(pos, -1, pread, false, true);
assertEquals(0, HFile.getAndResetChecksumFailuresCount());
blockFromHFile.sanityCheck();
pos += blockFromHFile.getOnDiskSizeWithHeader();
@ -487,6 +537,10 @@ public class TestHFileBlock {
blockUnpacked, deserialized.unpack(meta, hbr));
}
}
assertTrue(blockUnpacked.release());
if (blockFromHFile != blockUnpacked) {
blockFromHFile.release();
}
}
is.close();
}
@ -557,7 +611,7 @@ public class TestHFileBlock {
.withIncludesTags(includesTag)
.withCompression(algo).build();
HFileBlock.FSReader hbr =
new HFileBlock.FSReaderImpl(is, totalSize, meta, ByteBuffAllocator.HEAP);
new HFileBlock.FSReaderImpl(is, totalSize, meta, alloc);
long curOffset = 0;
for (int i = 0; i < NUM_TEST_BLOCKS; ++i) {
if (!pread) {
@ -569,7 +623,7 @@ public class TestHFileBlock {
if (detailedLogging) {
LOG.info("Reading block #" + i + " at offset " + curOffset);
}
HFileBlock b = hbr.readBlockData(curOffset, -1, pread, false);
HFileBlock b = hbr.readBlockData(curOffset, -1, pread, false, true);
if (detailedLogging) {
LOG.info("Block #" + i + ": " + b);
}
@ -583,7 +637,8 @@ public class TestHFileBlock {
// Now re-load this block knowing the on-disk size. This tests a
// different branch in the loader.
HFileBlock b2 = hbr.readBlockData(curOffset, b.getOnDiskSizeWithHeader(), pread, false);
HFileBlock b2 =
hbr.readBlockData(curOffset, b.getOnDiskSizeWithHeader(), pread, false, true);
b2.sanityCheck();
assertEquals(b.getBlockType(), b2.getBlockType());
@ -599,6 +654,7 @@ public class TestHFileBlock {
assertEquals(b.getOnDiskDataSizeWithHeader(),
b2.getOnDiskDataSizeWithHeader());
assertEquals(0, HFile.getAndResetChecksumFailuresCount());
assertTrue(b2.release());
curOffset += b.getOnDiskSizeWithHeader();
@ -606,14 +662,14 @@ public class TestHFileBlock {
// NOTE: cache-on-write testing doesn't actually involve a BlockCache. It simply
// verifies that the unpacked value read back off disk matches the unpacked value
// generated before writing to disk.
b = b.unpack(meta, hbr);
HFileBlock newBlock = b.unpack(meta, hbr);
// b's buffer has header + data + checksum while
// expectedContents have header + data only
ByteBuff bufRead = b.getBufferReadOnly();
ByteBuff bufRead = newBlock.getBufferReadOnly();
ByteBuffer bufExpected = expectedContents.get(i);
boolean bytesAreCorrect = Bytes.compareTo(bufRead.array(),
bufRead.arrayOffset(),
bufRead.limit() - b.totalChecksumBytes(),
bufRead.limit() - newBlock.totalChecksumBytes(),
bufExpected.array(), bufExpected.arrayOffset(),
bufExpected.limit()) == 0;
String wrongBytesMsg = "";
@ -642,9 +698,12 @@ public class TestHFileBlock {
}
}
assertTrue(wrongBytesMsg, bytesAreCorrect);
assertTrue(newBlock.release());
if (newBlock != b) {
assertTrue(b.release());
}
}
}
assertEquals(curOffset, fs.getFileStatus(path).getLen());
is.close();
}
@ -687,29 +746,37 @@ public class TestHFileBlock {
boolean pread = true;
boolean withOnDiskSize = rand.nextBoolean();
long expectedSize =
(blockId == NUM_TEST_BLOCKS - 1 ? fileSize
: offsets.get(blockId + 1)) - offset;
HFileBlock b;
(blockId == NUM_TEST_BLOCKS - 1 ? fileSize : offsets.get(blockId + 1)) - offset;
HFileBlock b = null;
try {
long onDiskSizeArg = withOnDiskSize ? expectedSize : -1;
b = hbr.readBlockData(offset, onDiskSizeArg, pread, false);
b = hbr.readBlockData(offset, onDiskSizeArg, pread, false, false);
if (useHeapAllocator) {
assertTrue(b.isOnHeap());
} else {
assertTrue(!b.getBlockType().isData() || !b.isOnHeap());
}
assertEquals(types.get(blockId), b.getBlockType());
assertEquals(expectedSize, b.getOnDiskSizeWithHeader());
assertEquals(offset, b.getOffset());
} catch (IOException ex) {
LOG.error("Error in client " + clientId + " trying to read block at "
+ offset + ", pread=" + pread + ", withOnDiskSize=" +
withOnDiskSize, ex);
LOG.error("Error in client " + clientId + " trying to read block at " + offset
+ ", pread=" + pread + ", withOnDiskSize=" + withOnDiskSize,
ex);
return false;
} finally {
if (b != null) {
b.release();
}
}
++numBlocksRead;
if (pread) {
++numPositionalRead;
}
assertEquals(types.get(blockId), b.getBlockType());
assertEquals(expectedSize, b.getOnDiskSizeWithHeader());
assertEquals(offset, b.getOffset());
++numBlocksRead;
if (pread)
++numPositionalRead;
if (withOnDiskSize)
if (withOnDiskSize) {
++numWithOnDiskSize;
}
}
LOG.info("Client " + clientId + " successfully read " + numBlocksRead +
" blocks (with pread: " + numPositionalRead + ", with onDiskSize " +
@ -717,7 +784,6 @@ public class TestHFileBlock {
return true;
}
}
@Test
@ -742,7 +808,7 @@ public class TestHFileBlock {
.withCompression(compressAlgo)
.build();
HFileBlock.FSReader hbr =
new HFileBlock.FSReaderImpl(is, fileSize, meta, ByteBuffAllocator.HEAP);
new HFileBlock.FSReaderImpl(is, fileSize, meta, alloc);
Executor exec = Executors.newFixedThreadPool(NUM_READER_THREADS);
ExecutorCompletionService<Boolean> ecs = new ExecutorCompletionService<>(exec);
@ -761,7 +827,6 @@ public class TestHFileBlock {
+ ")");
}
}
is.close();
}
}
@ -874,9 +939,9 @@ public class TestHFileBlock {
ByteBuffer buf = ByteBuffer.wrap(byteArr, 0, size);
HFileContext meta = new HFileContextBuilder().build();
HFileBlock blockWithNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1, buf,
HFileBlock.FILL_HEADER, -1, 52, -1, meta, ByteBuffAllocator.HEAP);
HFileBlock.FILL_HEADER, -1, 52, -1, meta, alloc);
HFileBlock blockWithoutNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1, buf,
HFileBlock.FILL_HEADER, -1, -1, -1, meta, ByteBuffAllocator.HEAP);
HFileBlock.FILL_HEADER, -1, -1, -1, meta, alloc);
ByteBuffer buff1 = ByteBuffer.allocate(length);
ByteBuffer buff2 = ByteBuffer.allocate(length);
blockWithNextBlockMetadata.serialize(buff1, true);

View File

@ -192,7 +192,7 @@ public class TestHFileBlockIndex {
}
missCount += 1;
prevBlock = realReader.readBlockData(offset, onDiskSize, pread, false);
prevBlock = realReader.readBlockData(offset, onDiskSize, pread, false, true);
prevOffset = offset;
prevOnDiskSize = onDiskSize;
prevPread = pread;

View File

@ -109,7 +109,7 @@ public class TestHFileEncryption {
private long readAndVerifyBlock(long pos, HFileContext ctx, HFileBlock.FSReaderImpl hbr, int size)
throws IOException {
HFileBlock b = hbr.readBlockData(pos, -1, false, false);
HFileBlock b = hbr.readBlockData(pos, -1, false, false, true);
assertEquals(0, HFile.getAndResetChecksumFailuresCount());
b.sanityCheck();
assertFalse(b.isUnpacked());

View File

@ -224,8 +224,8 @@ public class TestHFileWriterV3 {
fsdis.seek(0);
long curBlockPos = 0;
while (curBlockPos <= trailer.getLastDataBlockOffset()) {
HFileBlock block = blockReader.readBlockData(curBlockPos, -1, false, false)
.unpack(context, blockReader);
HFileBlock block = blockReader.readBlockData(curBlockPos, -1, false, false, true)
.unpack(context, blockReader);
assertEquals(BlockType.DATA, block.getBlockType());
ByteBuff buf = block.getBufferWithoutHeader();
int keyLen = -1;
@ -285,8 +285,8 @@ public class TestHFileWriterV3 {
while (fsdis.getPos() < trailer.getLoadOnOpenDataOffset()) {
LOG.info("Current offset: " + fsdis.getPos() + ", scanning until " +
trailer.getLoadOnOpenDataOffset());
HFileBlock block = blockReader.readBlockData(curBlockPos, -1, false, false)
.unpack(context, blockReader);
HFileBlock block = blockReader.readBlockData(curBlockPos, -1, false, false, true)
.unpack(context, blockReader);
assertEquals(BlockType.META, block.getBlockType());
Text t = new Text();
ByteBuff buf = block.getBufferWithoutHeader();

View File

@ -160,7 +160,7 @@ public class TestLazyDataBlockDecompression {
CacheConfig cc = new CacheConfig(lazyCompressDisabled,
new LruBlockCache(maxSize, HConstants.DEFAULT_BLOCKSIZE, false, lazyCompressDisabled));
assertFalse(cc.shouldCacheDataCompressed());
assertTrue(cc.getBlockCache().get() instanceof LruBlockCache);
assertFalse(cc.isCombinedBlockCache());
LruBlockCache disabledBlockCache = (LruBlockCache) cc.getBlockCache().get();
LOG.info("disabledBlockCache=" + disabledBlockCache);
assertEquals("test inconsistency detected.", maxSize, disabledBlockCache.getMaxSize());

View File

@ -59,10 +59,10 @@ public class TestBucketWriterThread {
private static class MockBucketCache extends BucketCache {
public MockBucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes,
int writerThreadNum, int writerQLen, String persistencePath, int ioErrorsTolerationDuration)
throws FileNotFoundException, IOException {
int writerThreadNum, int writerQLen, String persistencePath, int ioErrorsTolerationDuration)
throws IOException {
super(ioEngineName, capacity, blockSize, bucketSizes, writerThreadNum, writerQLen,
persistencePath, ioErrorsTolerationDuration, HBaseConfiguration.create());
persistencePath, ioErrorsTolerationDuration, HBaseConfiguration.create());
}
@Override

View File

@ -83,7 +83,6 @@ import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.hadoop.hbase.wal.WALSplitUtil;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
@ -192,9 +191,7 @@ public abstract class AbstractTestDLS {
Path rootdir = FSUtils.getRootDir(conf);
int numRegions = 50;
try (ZKWatcher zkw = new ZKWatcher(conf, "table-creation", null);
Table t = installTable(zkw, numRegions)) {
TableName table = t.getName();
try (Table t = installTable(numRegions)) {
List<RegionInfo> regions = null;
HRegionServer hrs = null;
for (int i = 0; i < NUM_RS; i++) {
@ -224,7 +221,6 @@ public abstract class AbstractTestDLS {
int count = 0;
for (RegionInfo hri : regions) {
Path tdir = FSUtils.getWALTableDir(conf, table);
@SuppressWarnings("deprecation")
Path editsdir = WALSplitUtil
.getRegionDirRecoveredEditsDir(FSUtils.getWALRegionDir(conf,
@ -266,8 +262,7 @@ public abstract class AbstractTestDLS {
// they will consume recovered.edits
master.balanceSwitch(false);
try (ZKWatcher zkw = new ZKWatcher(conf, "table-creation", null);
Table ht = installTable(zkw, numRegionsToCreate)) {
try (Table ht = installTable(numRegionsToCreate)) {
HRegionServer hrs = findRSToKill(false);
List<RegionInfo> regions = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices());
makeWAL(hrs, regions, numLogLines, 100);
@ -329,8 +324,7 @@ public abstract class AbstractTestDLS {
final Path logDir = new Path(rootdir,
AbstractFSWALProvider.getWALDirectoryName(hrs.getServerName().toString()));
try (ZKWatcher zkw = new ZKWatcher(conf, "table-creation", null);
Table t = installTable(zkw, 40)) {
try (Table t = installTable(40)) {
makeWAL(hrs, ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices()), numLogLines, 100);
new Thread() {
@ -380,8 +374,7 @@ public abstract class AbstractTestDLS {
startCluster(NUM_RS); // NUM_RS=6.
try (ZKWatcher zkw = new ZKWatcher(conf, "distributed log splitting test", null);
Table table = installTable(zkw, numRegionsToCreate)) {
try (Table table = installTable(numRegionsToCreate)) {
populateDataInTable(numRowsPerRegion);
List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
@ -482,11 +475,11 @@ public abstract class AbstractTestDLS {
}
}
private Table installTable(ZKWatcher zkw, int nrs) throws Exception {
return installTable(zkw, nrs, 0);
private Table installTable(int nrs) throws Exception {
return installTable(nrs, 0);
}
private Table installTable(ZKWatcher zkw, int nrs, int existingRegions) throws Exception {
private Table installTable(int nrs, int existingRegions) throws Exception {
// Create a table with regions
byte[] family = Bytes.toBytes("family");
LOG.info("Creating table with " + nrs + " regions");
@ -497,14 +490,14 @@ public abstract class AbstractTestDLS {
}
assertEquals(nrs, numRegions);
LOG.info("Waiting for no more RIT\n");
blockUntilNoRIT(zkw, master);
blockUntilNoRIT();
// disable-enable cycle to get rid of table's dead regions left behind
// by createMultiRegions
assertTrue(TEST_UTIL.getAdmin().isTableEnabled(tableName));
LOG.debug("Disabling table\n");
TEST_UTIL.getAdmin().disableTable(tableName);
LOG.debug("Waiting for no more RIT\n");
blockUntilNoRIT(zkw, master);
blockUntilNoRIT();
NavigableSet<String> regions = HBaseTestingUtility.getAllOnlineRegions(cluster);
LOG.debug("Verifying only catalog region is assigned\n");
if (regions.size() != 1) {
@ -515,7 +508,7 @@ public abstract class AbstractTestDLS {
LOG.debug("Enabling table\n");
TEST_UTIL.getAdmin().enableTable(tableName);
LOG.debug("Waiting for no more RIT\n");
blockUntilNoRIT(zkw, master);
blockUntilNoRIT();
LOG.debug("Verifying there are " + numRegions + " assigned on cluster\n");
regions = HBaseTestingUtility.getAllOnlineRegions(cluster);
assertEquals(numRegions + 1 + existingRegions, regions.size());
@ -651,7 +644,7 @@ public abstract class AbstractTestDLS {
return count;
}
private void blockUntilNoRIT(ZKWatcher zkw, HMaster master) throws Exception {
private void blockUntilNoRIT() throws Exception {
TEST_UTIL.waitUntilNoRegionsInTransition(60000);
}