HBASE-22491 Separate the heap HFileBlock and offheap HFileBlock because the heap block won't need refCnt and save into prevBlocks list before shipping (#268)
This commit is contained in:
parent
5ff317fe7b
commit
3199175932
|
@ -297,6 +297,12 @@ public class ByteBuffAllocator {
|
|||
}
|
||||
}
|
||||
}
|
||||
this.usedBufCount.set(0);
|
||||
this.maxPoolSizeInfoLevelLogged = false;
|
||||
this.poolAllocationBytes.reset();
|
||||
this.heapAllocationBytes.reset();
|
||||
this.lastPoolAllocationBytes = 0;
|
||||
this.lastHeapAllocationBytes = 0;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -0,0 +1,69 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.io.hfile;
|
||||
|
||||
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
|
||||
import org.apache.hadoop.hbase.nio.ByteBuff;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
/**
|
||||
* The {@link ByteBuffAllocator} won't allocate pooled heap {@link ByteBuff} now; at the same time,
|
||||
* if allocate an off-heap {@link ByteBuff} from allocator, then it must be a pooled one. That's to
|
||||
* say, an exclusive memory HFileBlock would must be an heap block and a shared memory HFileBlock
|
||||
* would must be an off-heap block.
|
||||
* <p>
|
||||
* The exclusive memory HFileBlock will do nothing when calling retain or release methods, because
|
||||
* its memory will be garbage collected by JVM, even if its reference count decrease to zero, we can
|
||||
* do nothing for the de-allocating.
|
||||
* <p>
|
||||
* @see org.apache.hadoop.hbase.io.hfile.SharedMemHFileBlock
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class ExclusiveMemHFileBlock extends HFileBlock {
|
||||
|
||||
ExclusiveMemHFileBlock(BlockType blockType, int onDiskSizeWithoutHeader,
|
||||
int uncompressedSizeWithoutHeader, long prevBlockOffset, ByteBuff buf, boolean fillHeader,
|
||||
long offset, int nextBlockOnDiskSize, int onDiskDataSizeWithHeader,
|
||||
HFileContext fileContext) {
|
||||
super(blockType, onDiskSizeWithoutHeader, uncompressedSizeWithoutHeader, prevBlockOffset, buf,
|
||||
fillHeader, offset, nextBlockOnDiskSize, onDiskDataSizeWithHeader, fileContext,
|
||||
ByteBuffAllocator.HEAP);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int refCnt() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ExclusiveMemHFileBlock retain() {
|
||||
// do nothing
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean release() {
|
||||
// do nothing
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isSharedMem() {
|
||||
return false;
|
||||
}
|
||||
}
|
|
@ -285,7 +285,7 @@ public class HFileBlock implements Cacheable {
|
|||
boolean usesChecksum = buf.get() == (byte) 1;
|
||||
long offset = buf.getLong();
|
||||
int nextBlockOnDiskSize = buf.getInt();
|
||||
return new HFileBlock(newByteBuff, usesChecksum, offset, nextBlockOnDiskSize, null, alloc);
|
||||
return createFromBuff(newByteBuff, usesChecksum, offset, nextBlockOnDiskSize, null, alloc);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -300,28 +300,6 @@ public class HFileBlock implements Cacheable {
|
|||
CacheableDeserializerIdManager.registerDeserializer(BLOCK_DESERIALIZER);
|
||||
}
|
||||
|
||||
/**
|
||||
* Copy constructor. Creates a shallow copy of {@code that}'s buffer.
|
||||
*/
|
||||
private HFileBlock(HFileBlock that) {
|
||||
this(that, false);
|
||||
}
|
||||
|
||||
/**
|
||||
* Copy constructor. Creates a shallow/deep copy of {@code that}'s buffer as per the boolean
|
||||
* param.
|
||||
*/
|
||||
private HFileBlock(HFileBlock that, boolean bufCopy) {
|
||||
init(that.blockType, that.onDiskSizeWithoutHeader, that.uncompressedSizeWithoutHeader,
|
||||
that.prevBlockOffset, that.offset, that.onDiskDataSizeWithHeader, that.nextBlockOnDiskSize,
|
||||
that.fileContext, that.allocator);
|
||||
if (bufCopy) {
|
||||
this.buf = ByteBuff.wrap(ByteBuffer.wrap(that.buf.toBytes(0, that.buf.limit())));
|
||||
} else {
|
||||
this.buf = that.buf.duplicate();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a new {@link HFile} block from the given fields. This constructor
|
||||
* is used only while writing blocks and caching,
|
||||
|
@ -337,7 +315,7 @@ public class HFileBlock implements Cacheable {
|
|||
* @param onDiskSizeWithoutHeader see {@link #onDiskSizeWithoutHeader}
|
||||
* @param uncompressedSizeWithoutHeader see {@link #uncompressedSizeWithoutHeader}
|
||||
* @param prevBlockOffset see {@link #prevBlockOffset}
|
||||
* @param b block header ({@link HConstants#HFILEBLOCK_HEADER_SIZE} bytes)
|
||||
* @param buf block buffer with header ({@link HConstants#HFILEBLOCK_HEADER_SIZE} bytes)
|
||||
* @param fillHeader when true, write the first 4 header fields into passed buffer.
|
||||
* @param offset the file offset the block was read from
|
||||
* @param onDiskDataSizeWithHeader see {@link #onDiskDataSizeWithHeader}
|
||||
|
@ -345,12 +323,19 @@ public class HFileBlock implements Cacheable {
|
|||
*/
|
||||
@VisibleForTesting
|
||||
public HFileBlock(BlockType blockType, int onDiskSizeWithoutHeader,
|
||||
int uncompressedSizeWithoutHeader, long prevBlockOffset, ByteBuffer b, boolean fillHeader,
|
||||
long offset, final int nextBlockOnDiskSize, int onDiskDataSizeWithHeader,
|
||||
HFileContext fileContext, ByteBuffAllocator allocator) {
|
||||
init(blockType, onDiskSizeWithoutHeader, uncompressedSizeWithoutHeader, prevBlockOffset, offset,
|
||||
onDiskDataSizeWithHeader, nextBlockOnDiskSize, fileContext, allocator);
|
||||
this.buf = new SingleByteBuff(b);
|
||||
int uncompressedSizeWithoutHeader, long prevBlockOffset, ByteBuff buf, boolean fillHeader,
|
||||
long offset, int nextBlockOnDiskSize, int onDiskDataSizeWithHeader, HFileContext fileContext,
|
||||
ByteBuffAllocator allocator) {
|
||||
this.blockType = blockType;
|
||||
this.onDiskSizeWithoutHeader = onDiskSizeWithoutHeader;
|
||||
this.uncompressedSizeWithoutHeader = uncompressedSizeWithoutHeader;
|
||||
this.prevBlockOffset = prevBlockOffset;
|
||||
this.offset = offset;
|
||||
this.onDiskDataSizeWithHeader = onDiskDataSizeWithHeader;
|
||||
this.nextBlockOnDiskSize = nextBlockOnDiskSize;
|
||||
this.fileContext = fileContext;
|
||||
this.allocator = allocator;
|
||||
this.buf = buf;
|
||||
if (fillHeader) {
|
||||
overwriteHeader();
|
||||
}
|
||||
|
@ -364,7 +349,7 @@ public class HFileBlock implements Cacheable {
|
|||
* to that point.
|
||||
* @param buf Has header, content, and trailing checksums if present.
|
||||
*/
|
||||
HFileBlock(ByteBuff buf, boolean usesHBaseChecksum, final long offset,
|
||||
static HFileBlock createFromBuff(ByteBuff buf, boolean usesHBaseChecksum, final long offset,
|
||||
final int nextBlockOnDiskSize, HFileContext fileContext, ByteBuffAllocator allocator)
|
||||
throws IOException {
|
||||
buf.rewind();
|
||||
|
@ -375,15 +360,15 @@ public class HFileBlock implements Cacheable {
|
|||
final long prevBlockOffset = buf.getLong(Header.PREV_BLOCK_OFFSET_INDEX);
|
||||
// This constructor is called when we deserialize a block from cache and when we read a block in
|
||||
// from the fs. fileCache is null when deserialized from cache so need to make up one.
|
||||
HFileContextBuilder fileContextBuilder = fileContext != null?
|
||||
new HFileContextBuilder(fileContext): new HFileContextBuilder();
|
||||
HFileContextBuilder fileContextBuilder =
|
||||
fileContext != null ? new HFileContextBuilder(fileContext) : new HFileContextBuilder();
|
||||
fileContextBuilder.withHBaseCheckSum(usesHBaseChecksum);
|
||||
int onDiskDataSizeWithHeader;
|
||||
if (usesHBaseChecksum) {
|
||||
byte checksumType = buf.get(Header.CHECKSUM_TYPE_INDEX);
|
||||
int bytesPerChecksum = buf.getInt(Header.BYTES_PER_CHECKSUM_INDEX);
|
||||
onDiskDataSizeWithHeader = buf.getInt(Header.ON_DISK_DATA_SIZE_WITH_HEADER_INDEX);
|
||||
// Use the checksum type and bytes per checksum from header, not from filecontext.
|
||||
// Use the checksum type and bytes per checksum from header, not from fileContext.
|
||||
fileContextBuilder.withChecksumType(ChecksumType.codeToType(checksumType));
|
||||
fileContextBuilder.withBytesPerCheckSum(bytesPerChecksum);
|
||||
} else {
|
||||
|
@ -394,29 +379,19 @@ public class HFileBlock implements Cacheable {
|
|||
}
|
||||
fileContext = fileContextBuilder.build();
|
||||
assert usesHBaseChecksum == fileContext.isUseHBaseChecksum();
|
||||
init(blockType, onDiskSizeWithoutHeader, uncompressedSizeWithoutHeader, prevBlockOffset, offset,
|
||||
onDiskDataSizeWithHeader, nextBlockOnDiskSize, fileContext, allocator);
|
||||
this.offset = offset;
|
||||
this.buf = buf;
|
||||
this.buf.rewind();
|
||||
}
|
||||
|
||||
/**
|
||||
* Called from constructors.
|
||||
*/
|
||||
private void init(BlockType blockType, int onDiskSizeWithoutHeader,
|
||||
int uncompressedSizeWithoutHeader, long prevBlockOffset, long offset,
|
||||
int onDiskDataSizeWithHeader, final int nextBlockOnDiskSize, HFileContext fileContext,
|
||||
ByteBuffAllocator allocator) {
|
||||
this.blockType = blockType;
|
||||
this.onDiskSizeWithoutHeader = onDiskSizeWithoutHeader;
|
||||
this.uncompressedSizeWithoutHeader = uncompressedSizeWithoutHeader;
|
||||
this.prevBlockOffset = prevBlockOffset;
|
||||
this.offset = offset;
|
||||
this.onDiskDataSizeWithHeader = onDiskDataSizeWithHeader;
|
||||
this.nextBlockOnDiskSize = nextBlockOnDiskSize;
|
||||
this.fileContext = fileContext;
|
||||
this.allocator = allocator;
|
||||
return new HFileBlockBuilder()
|
||||
.withBlockType(blockType)
|
||||
.withOnDiskSizeWithoutHeader(onDiskSizeWithoutHeader)
|
||||
.withUncompressedSizeWithoutHeader(uncompressedSizeWithoutHeader)
|
||||
.withPrevBlockOffset(prevBlockOffset)
|
||||
.withOffset(offset)
|
||||
.withOnDiskDataSizeWithHeader(onDiskDataSizeWithHeader)
|
||||
.withNextBlockOnDiskSize(nextBlockOnDiskSize)
|
||||
.withHFileContext(fileContext)
|
||||
.withByteBuffAllocator(allocator)
|
||||
.withByteBuff(buf.rewind())
|
||||
.withShared(!buf.hasArray())
|
||||
.build();
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -640,7 +615,7 @@ public class HFileBlock implements Cacheable {
|
|||
.append("(").append(onDiskSizeWithoutHeader)
|
||||
.append("+").append(HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM).append(")");
|
||||
}
|
||||
String dataBegin = null;
|
||||
String dataBegin;
|
||||
if (buf.hasArray()) {
|
||||
dataBegin = Bytes.toStringBinary(buf.array(), buf.arrayOffset() + headerSize(),
|
||||
Math.min(32, buf.limit() - buf.arrayOffset() - headerSize()));
|
||||
|
@ -674,7 +649,7 @@ public class HFileBlock implements Cacheable {
|
|||
return this;
|
||||
}
|
||||
|
||||
HFileBlock unpacked = new HFileBlock(this);
|
||||
HFileBlock unpacked = shallowClone(this);
|
||||
unpacked.allocateBuffer(); // allocates space for the decompressed block
|
||||
boolean succ = false;
|
||||
try {
|
||||
|
@ -762,10 +737,16 @@ public class HFileBlock implements Cacheable {
|
|||
}
|
||||
|
||||
/**
|
||||
* @return true to indicate the block is allocated from JVM heap, otherwise from off-heap.
|
||||
* Will be override by {@link SharedMemHFileBlock} or {@link ExclusiveMemHFileBlock}. Return true
|
||||
* by default.
|
||||
*/
|
||||
boolean isOnHeap() {
|
||||
return buf.hasArray();
|
||||
public boolean isSharedMem() {
|
||||
if (this instanceof SharedMemHFileBlock) {
|
||||
return true;
|
||||
} else if (this instanceof ExclusiveMemHFileBlock) {
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -1040,8 +1021,7 @@ public class HFileBlock implements Cacheable {
|
|||
+ offset);
|
||||
}
|
||||
startOffset = offset;
|
||||
|
||||
finishBlockAndWriteHeaderAndData((DataOutputStream) out);
|
||||
finishBlockAndWriteHeaderAndData(out);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -1252,13 +1232,27 @@ public class HFileBlock implements Cacheable {
|
|||
.withIncludesMvcc(fileContext.isIncludesMvcc())
|
||||
.withIncludesTags(fileContext.isIncludesTags())
|
||||
.build();
|
||||
return new HFileBlock(blockType, getOnDiskSizeWithoutHeader(),
|
||||
getUncompressedSizeWithoutHeader(), prevOffset,
|
||||
cacheConf.shouldCacheCompressed(blockType.getCategory()) ? cloneOnDiskBufferWithHeader()
|
||||
: cloneUncompressedBufferWithHeader(),
|
||||
FILL_HEADER, startOffset, UNSET,
|
||||
onDiskBlockBytesWithHeader.size() + onDiskChecksum.length, newContext,
|
||||
cacheConf.getByteBuffAllocator());
|
||||
// Build the HFileBlock.
|
||||
HFileBlockBuilder builder = new HFileBlockBuilder();
|
||||
ByteBuffer buffer;
|
||||
if (cacheConf.shouldCacheCompressed(blockType.getCategory())) {
|
||||
buffer = cloneOnDiskBufferWithHeader();
|
||||
} else {
|
||||
buffer = cloneUncompressedBufferWithHeader();
|
||||
}
|
||||
return builder.withBlockType(blockType)
|
||||
.withOnDiskSizeWithoutHeader(getOnDiskSizeWithoutHeader())
|
||||
.withUncompressedSizeWithoutHeader(getUncompressedSizeWithoutHeader())
|
||||
.withPrevBlockOffset(prevOffset)
|
||||
.withByteBuff(ByteBuff.wrap(buffer))
|
||||
.withFillHeader(FILL_HEADER)
|
||||
.withOffset(startOffset)
|
||||
.withNextBlockOnDiskSize(UNSET)
|
||||
.withOnDiskDataSizeWithHeader(onDiskBlockBytesWithHeader.size() + onDiskChecksum.length)
|
||||
.withHFileContext(newContext)
|
||||
.withByteBuffAllocator(cacheConf.getByteBuffAllocator())
|
||||
.withShared(!buffer.hasArray())
|
||||
.build();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1782,8 +1776,8 @@ public class HFileBlock implements Cacheable {
|
|||
// The onDiskBlock will become the headerAndDataBuffer for this block.
|
||||
// If nextBlockOnDiskSizeWithHeader is not zero, the onDiskBlock already
|
||||
// contains the header of next block, so no need to set next block's header in it.
|
||||
HFileBlock hFileBlock = new HFileBlock(curBlock, checksumSupport, offset,
|
||||
nextBlockOnDiskSize, fileContext, intoHeap ? HEAP : allocator);
|
||||
HFileBlock hFileBlock = createFromBuff(curBlock, checksumSupport, offset,
|
||||
nextBlockOnDiskSize, fileContext, intoHeap ? HEAP : allocator);
|
||||
// Run check on uncompressed sizings.
|
||||
if (!fileContext.isCompressedOrEncrypted()) {
|
||||
hFileBlock.sanityCheckUncompressed();
|
||||
|
@ -1948,7 +1942,7 @@ public class HFileBlock implements Cacheable {
|
|||
if (comparison == null) {
|
||||
return false;
|
||||
}
|
||||
if (comparison.getClass() != this.getClass()) {
|
||||
if (!(comparison instanceof HFileBlock)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
|
@ -2085,7 +2079,27 @@ public class HFileBlock implements Cacheable {
|
|||
" onDiskDataSizeWithHeader " + onDiskDataSizeWithHeader;
|
||||
}
|
||||
|
||||
public HFileBlock deepCloneOnHeap() {
|
||||
return new HFileBlock(this, true);
|
||||
private static HFileBlockBuilder createBuilder(HFileBlock blk){
|
||||
return new HFileBlockBuilder()
|
||||
.withBlockType(blk.blockType)
|
||||
.withOnDiskSizeWithoutHeader(blk.onDiskSizeWithoutHeader)
|
||||
.withUncompressedSizeWithoutHeader(blk.uncompressedSizeWithoutHeader)
|
||||
.withPrevBlockOffset(blk.prevBlockOffset)
|
||||
.withByteBuff(blk.buf.duplicate()) // Duplicate the buffer.
|
||||
.withOffset(blk.offset)
|
||||
.withOnDiskDataSizeWithHeader(blk.onDiskDataSizeWithHeader)
|
||||
.withNextBlockOnDiskSize(blk.nextBlockOnDiskSize)
|
||||
.withHFileContext(blk.fileContext)
|
||||
.withByteBuffAllocator(blk.allocator)
|
||||
.withShared(blk.isSharedMem());
|
||||
}
|
||||
|
||||
static HFileBlock shallowClone(HFileBlock blk) {
|
||||
return createBuilder(blk).build();
|
||||
}
|
||||
|
||||
static HFileBlock deepCloneOnHeap(HFileBlock blk) {
|
||||
ByteBuff deepCloned = ByteBuff.wrap(ByteBuffer.wrap(blk.buf.toBytes(0, blk.buf.limit())));
|
||||
return createBuilder(blk).withByteBuff(deepCloned).withShared(false).build();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,114 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.io.hfile;
|
||||
|
||||
import static javax.swing.Spring.UNSET;
|
||||
import static org.apache.hadoop.hbase.io.ByteBuffAllocator.HEAP;
|
||||
|
||||
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
|
||||
import org.apache.hadoop.hbase.nio.ByteBuff;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
@InterfaceAudience.Private
|
||||
public class HFileBlockBuilder {
|
||||
|
||||
private BlockType blockType;
|
||||
private int onDiskSizeWithoutHeader;
|
||||
private int onDiskDataSizeWithHeader;
|
||||
private int uncompressedSizeWithoutHeader;
|
||||
private long prevBlockOffset;
|
||||
private ByteBuff buf;
|
||||
private boolean fillHeader = false;
|
||||
private long offset = UNSET;
|
||||
private int nextBlockOnDiskSize = UNSET;
|
||||
private HFileContext fileContext;
|
||||
private ByteBuffAllocator allocator = HEAP;
|
||||
private boolean isShared;
|
||||
|
||||
public HFileBlockBuilder withBlockType(BlockType blockType) {
|
||||
this.blockType = blockType;
|
||||
return this;
|
||||
}
|
||||
|
||||
public HFileBlockBuilder withOnDiskSizeWithoutHeader(int onDiskSizeWithoutHeader) {
|
||||
this.onDiskSizeWithoutHeader = onDiskSizeWithoutHeader;
|
||||
return this;
|
||||
}
|
||||
|
||||
public HFileBlockBuilder withOnDiskDataSizeWithHeader(int onDiskDataSizeWithHeader) {
|
||||
this.onDiskDataSizeWithHeader = onDiskDataSizeWithHeader;
|
||||
return this;
|
||||
}
|
||||
|
||||
public HFileBlockBuilder withUncompressedSizeWithoutHeader(int uncompressedSizeWithoutHeader) {
|
||||
this.uncompressedSizeWithoutHeader = uncompressedSizeWithoutHeader;
|
||||
return this;
|
||||
}
|
||||
|
||||
public HFileBlockBuilder withPrevBlockOffset(long prevBlockOffset) {
|
||||
this.prevBlockOffset = prevBlockOffset;
|
||||
return this;
|
||||
}
|
||||
|
||||
public HFileBlockBuilder withByteBuff(ByteBuff buf) {
|
||||
this.buf = buf;
|
||||
return this;
|
||||
}
|
||||
|
||||
public HFileBlockBuilder withFillHeader(boolean fillHeader) {
|
||||
this.fillHeader = fillHeader;
|
||||
return this;
|
||||
}
|
||||
|
||||
public HFileBlockBuilder withOffset(long offset) {
|
||||
this.offset = offset;
|
||||
return this;
|
||||
}
|
||||
|
||||
public HFileBlockBuilder withNextBlockOnDiskSize(int nextBlockOnDiskSize) {
|
||||
this.nextBlockOnDiskSize = nextBlockOnDiskSize;
|
||||
return this;
|
||||
}
|
||||
|
||||
public HFileBlockBuilder withHFileContext(HFileContext fileContext) {
|
||||
this.fileContext = fileContext;
|
||||
return this;
|
||||
}
|
||||
|
||||
public HFileBlockBuilder withByteBuffAllocator(ByteBuffAllocator allocator) {
|
||||
this.allocator = allocator;
|
||||
return this;
|
||||
}
|
||||
|
||||
public HFileBlockBuilder withShared(boolean isShared) {
|
||||
this.isShared = isShared;
|
||||
return this;
|
||||
}
|
||||
|
||||
public HFileBlock build() {
|
||||
if (isShared) {
|
||||
return new SharedMemHFileBlock(blockType, onDiskSizeWithoutHeader,
|
||||
uncompressedSizeWithoutHeader, prevBlockOffset, buf, fillHeader, offset,
|
||||
nextBlockOnDiskSize, onDiskDataSizeWithHeader, fileContext, allocator);
|
||||
} else {
|
||||
return new ExclusiveMemHFileBlock(blockType, onDiskSizeWithoutHeader,
|
||||
uncompressedSizeWithoutHeader, prevBlockOffset, buf, fillHeader, offset,
|
||||
nextBlockOnDiskSize, onDiskDataSizeWithHeader, fileContext);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -523,15 +523,15 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
|
|||
if (block != null && curBlock != null && block.getOffset() == curBlock.getOffset()) {
|
||||
return;
|
||||
}
|
||||
if (this.curBlock != null) {
|
||||
if (this.curBlock != null && this.curBlock.isSharedMem()) {
|
||||
prevBlocks.add(this.curBlock);
|
||||
}
|
||||
this.curBlock = block;
|
||||
}
|
||||
|
||||
void reset() {
|
||||
// We don't have to keep ref to EXCLUSIVE type of block
|
||||
if (this.curBlock != null) {
|
||||
// We don't have to keep ref to heap block
|
||||
if (this.curBlock != null && this.curBlock.isSharedMem()) {
|
||||
this.prevBlocks.add(this.curBlock);
|
||||
}
|
||||
this.curBlock = null;
|
||||
|
|
|
@ -372,8 +372,8 @@ public class LruBlockCache implements FirstLevelBlockCache {
|
|||
private Cacheable asReferencedHeapBlock(Cacheable buf) {
|
||||
if (buf instanceof HFileBlock) {
|
||||
HFileBlock blk = ((HFileBlock) buf);
|
||||
if (!blk.isOnHeap()) {
|
||||
return blk.deepCloneOnHeap();
|
||||
if (blk.isSharedMem()) {
|
||||
return HFileBlock.deepCloneOnHeap(blk);
|
||||
}
|
||||
}
|
||||
// The block will be referenced by this LRUBlockCache, so should increase its refCnt here.
|
||||
|
|
|
@ -0,0 +1,46 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.io.hfile;
|
||||
|
||||
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
|
||||
import org.apache.hadoop.hbase.nio.ByteBuff;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
/**
|
||||
* The {@link ByteBuffAllocator} won't allocate pooled heap {@link ByteBuff} now; at the same time,
|
||||
* if allocate an off-heap {@link ByteBuff} from allocator, then it must be a pooled one. That's to
|
||||
* say, an exclusive memory HFileBlock would must be an heap block and a shared memory HFileBlock
|
||||
* would must be an off-heap block.
|
||||
* @see org.apache.hadoop.hbase.io.hfile.ExclusiveMemHFileBlock
|
||||
**/
|
||||
@InterfaceAudience.Private
|
||||
public class SharedMemHFileBlock extends HFileBlock {
|
||||
|
||||
SharedMemHFileBlock(BlockType blockType, int onDiskSizeWithoutHeader,
|
||||
int uncompressedSizeWithoutHeader, long prevBlockOffset, ByteBuff buf, boolean fillHeader,
|
||||
long offset, int nextBlockOnDiskSize, int onDiskDataSizeWithHeader, HFileContext fileContext,
|
||||
ByteBuffAllocator alloc) {
|
||||
super(blockType, onDiskSizeWithoutHeader, uncompressedSizeWithoutHeader, prevBlockOffset, buf,
|
||||
fillHeader, offset, nextBlockOnDiskSize, onDiskDataSizeWithHeader, fileContext, alloc);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isSharedMem() {
|
||||
return true;
|
||||
}
|
||||
}
|
|
@ -171,8 +171,8 @@ public final class TinyLfuBlockCache implements FirstLevelBlockCache {
|
|||
if (victimCache != null) {
|
||||
value = victimCache.getBlock(cacheKey, caching, repeat, updateCacheMetrics);
|
||||
if ((value != null) && caching) {
|
||||
if ((value instanceof HFileBlock) && !((HFileBlock) value).isOnHeap()) {
|
||||
value = ((HFileBlock) value).deepCloneOnHeap();
|
||||
if ((value instanceof HFileBlock) && ((HFileBlock) value).isSharedMem()) {
|
||||
value = HFileBlock.deepCloneOnHeap((HFileBlock) value);
|
||||
}
|
||||
cacheBlock(cacheKey, value);
|
||||
}
|
||||
|
|
|
@ -43,10 +43,12 @@ import org.apache.hadoop.hbase.client.Delete;
|
|||
import org.apache.hadoop.hbase.client.Mutation;
|
||||
import org.apache.hadoop.hbase.client.Put;
|
||||
import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
|
||||
import org.apache.hadoop.hbase.io.hfile.ExclusiveMemHFileBlock;
|
||||
import org.apache.hadoop.hbase.io.hfile.HFileBlock;
|
||||
import org.apache.hadoop.hbase.io.hfile.HFileContext;
|
||||
import org.apache.hadoop.hbase.io.hfile.LruBlockCache;
|
||||
import org.apache.hadoop.hbase.io.hfile.LruCachedBlock;
|
||||
import org.apache.hadoop.hbase.io.hfile.SharedMemHFileBlock;
|
||||
import org.apache.hadoop.hbase.regionserver.CSLMImmutableSegment;
|
||||
import org.apache.hadoop.hbase.regionserver.CellArrayImmutableSegment;
|
||||
import org.apache.hadoop.hbase.regionserver.CellArrayMap;
|
||||
|
@ -529,6 +531,14 @@ public class TestHeapSize {
|
|||
actual = HFileBlock.FIXED_OVERHEAD;
|
||||
expected = ClassSize.estimateBase(HFileBlock.class, false);
|
||||
assertEquals(expected, actual);
|
||||
|
||||
actual = ExclusiveMemHFileBlock.FIXED_OVERHEAD;
|
||||
expected = ClassSize.estimateBase(ExclusiveMemHFileBlock.class, false);
|
||||
assertEquals(expected, actual);
|
||||
|
||||
actual = SharedMemHFileBlock.FIXED_OVERHEAD;
|
||||
expected = ClassSize.estimateBase(SharedMemHFileBlock.class, false);
|
||||
assertEquals(expected, actual);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
@ -148,7 +148,6 @@ public class CacheTestUtils {
|
|||
if (buf != null) {
|
||||
assertEquals(block.block, buf);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// Re-add some duplicate blocks. Hope nothing breaks.
|
||||
|
@ -307,10 +306,11 @@ public class CacheTestUtils {
|
|||
.withBytesPerCheckSum(0)
|
||||
.withChecksumType(ChecksumType.NULL)
|
||||
.build();
|
||||
HFileBlock generated = new HFileBlock(BlockType.DATA, onDiskSizeWithoutHeader,
|
||||
uncompressedSizeWithoutHeader, prevBlockOffset, cachedBuffer, HFileBlock.DONT_FILL_HEADER,
|
||||
blockSize, onDiskSizeWithoutHeader + HConstants.HFILEBLOCK_HEADER_SIZE, -1, meta,
|
||||
ByteBuffAllocator.HEAP);
|
||||
HFileBlock generated =
|
||||
new HFileBlock(BlockType.DATA, onDiskSizeWithoutHeader, uncompressedSizeWithoutHeader,
|
||||
prevBlockOffset, ByteBuff.wrap(cachedBuffer), HFileBlock.DONT_FILL_HEADER, blockSize,
|
||||
onDiskSizeWithoutHeader + HConstants.HFILEBLOCK_HEADER_SIZE, -1, meta,
|
||||
ByteBuffAllocator.HEAP);
|
||||
|
||||
String strKey;
|
||||
/* No conflicting keys */
|
||||
|
|
|
@ -101,7 +101,7 @@ public class TestChecksum {
|
|||
HFileBlock.FSReader hbr = new HFileBlock.FSReaderImpl(is, totalSize, (HFileSystem) fs, path,
|
||||
meta, ByteBuffAllocator.HEAP);
|
||||
HFileBlock b = hbr.readBlockData(0, -1, false, false, true);
|
||||
assertTrue(b.isOnHeap());
|
||||
assertTrue(!b.isSharedMem());
|
||||
assertEquals(b.getChecksumType(), ChecksumType.getDefaultChecksumType().getCode());
|
||||
}
|
||||
|
||||
|
@ -148,7 +148,7 @@ public class TestChecksum {
|
|||
HFileBlock.FSReader hbr = new HFileBlock.FSReaderImpl(is, totalSize, (HFileSystem) fs, path,
|
||||
meta, ByteBuffAllocator.HEAP);
|
||||
HFileBlock b = hbr.readBlockData(0, -1, false, false, true);
|
||||
assertTrue(b.isOnHeap());
|
||||
assertTrue(!b.isSharedMem());
|
||||
|
||||
// verify SingleByteBuff checksum.
|
||||
verifySBBCheckSum(b.getBufferReadOnly());
|
||||
|
|
|
@ -169,7 +169,7 @@ public class TestHFile {
|
|||
Cacheable cachedBlock = lru.getBlock(key, false, false, true);
|
||||
Assert.assertNotNull(cachedBlock);
|
||||
Assert.assertTrue(cachedBlock instanceof HFileBlock);
|
||||
Assert.assertTrue(((HFileBlock) cachedBlock).isOnHeap());
|
||||
Assert.assertFalse(((HFileBlock) cachedBlock).isSharedMem());
|
||||
// Should never allocate off-heap block from allocator because ensure that it's LRU.
|
||||
Assert.assertEquals(bufCount, alloc.getFreeBufferCount());
|
||||
block.release(); // return back the ByteBuffer back to allocator.
|
||||
|
@ -217,10 +217,10 @@ public class TestHFile {
|
|||
HFileBlock hfb = (HFileBlock) cachedBlock;
|
||||
// Data block will be cached in BucketCache, so it should be an off-heap block.
|
||||
if (hfb.getBlockType().isData()) {
|
||||
Assert.assertFalse(hfb.isOnHeap());
|
||||
Assert.assertTrue(hfb.isSharedMem());
|
||||
} else {
|
||||
// Non-data block will be cached in LRUBlockCache, so it must be an on-heap block.
|
||||
Assert.assertTrue(hfb.isOnHeap());
|
||||
Assert.assertFalse(hfb.isSharedMem());
|
||||
}
|
||||
} finally {
|
||||
cachedBlock.release();
|
||||
|
|
|
@ -340,6 +340,14 @@ public class TestHFileBlock {
|
|||
testReaderV2Internals();
|
||||
}
|
||||
|
||||
private void assertRelease(HFileBlock blk) {
|
||||
if (blk instanceof ExclusiveMemHFileBlock) {
|
||||
assertFalse(blk.release());
|
||||
} else {
|
||||
assertTrue(blk.release());
|
||||
}
|
||||
}
|
||||
|
||||
protected void testReaderV2Internals() throws IOException {
|
||||
if(includesTag) {
|
||||
TEST_UTIL.getConfiguration().setInt("hfile.format.version", 3);
|
||||
|
@ -403,10 +411,10 @@ public class TestHFileBlock {
|
|||
+ "'.\nMessage is expected to start with: '" + expectedPrefix
|
||||
+ "'", ex.getMessage().startsWith(expectedPrefix));
|
||||
}
|
||||
assertTrue(b.release());
|
||||
assertRelease(b);
|
||||
is.close();
|
||||
}
|
||||
assertTrue(expected.release());
|
||||
assertRelease(expected);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -534,7 +542,7 @@ public class TestHFileBlock {
|
|||
deserialized.unpack(meta, hbr));
|
||||
}
|
||||
}
|
||||
assertTrue(blockUnpacked.release());
|
||||
assertRelease(blockUnpacked);
|
||||
if (blockFromHFile != blockUnpacked) {
|
||||
blockFromHFile.release();
|
||||
}
|
||||
|
@ -651,7 +659,7 @@ public class TestHFileBlock {
|
|||
assertEquals(b.getOnDiskDataSizeWithHeader(),
|
||||
b2.getOnDiskDataSizeWithHeader());
|
||||
assertEquals(0, HFile.getAndResetChecksumFailuresCount());
|
||||
assertTrue(b2.release());
|
||||
assertRelease(b2);
|
||||
|
||||
curOffset += b.getOnDiskSizeWithHeader();
|
||||
|
||||
|
@ -694,12 +702,12 @@ public class TestHFileBlock {
|
|||
}
|
||||
}
|
||||
assertTrue(wrongBytesMsg, bytesAreCorrect);
|
||||
assertTrue(newBlock.release());
|
||||
assertRelease(newBlock);
|
||||
if (newBlock != b) {
|
||||
assertTrue(b.release());
|
||||
assertRelease(b);
|
||||
}
|
||||
} else {
|
||||
assertTrue(b.release());
|
||||
assertRelease(b);
|
||||
}
|
||||
}
|
||||
assertEquals(curOffset, fs.getFileStatus(path).getLen());
|
||||
|
@ -750,9 +758,9 @@ public class TestHFileBlock {
|
|||
long onDiskSizeArg = withOnDiskSize ? expectedSize : -1;
|
||||
b = hbr.readBlockData(offset, onDiskSizeArg, pread, false, false);
|
||||
if (useHeapAllocator) {
|
||||
assertTrue(b.isOnHeap());
|
||||
assertTrue(!b.isSharedMem());
|
||||
} else {
|
||||
assertTrue(!b.getBlockType().isData() || !b.isOnHeap());
|
||||
assertTrue(!b.getBlockType().isData() || b.isSharedMem());
|
||||
}
|
||||
assertEquals(types.get(blockId), b.getBlockType());
|
||||
assertEquals(expectedSize, b.getOnDiskSizeWithHeader());
|
||||
|
@ -913,14 +921,13 @@ public class TestHFileBlock {
|
|||
.withCompression(Algorithm.NONE)
|
||||
.withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM)
|
||||
.withChecksumType(ChecksumType.NULL).build();
|
||||
HFileBlock block = new HFileBlock(BlockType.DATA, size, size, -1, buf, HFileBlock.FILL_HEADER,
|
||||
-1, 0, -1, meta, HEAP);
|
||||
long byteBufferExpectedSize = ClassSize.align(ClassSize.estimateBase(
|
||||
new MultiByteBuff(buf).getClass(), true)
|
||||
+ HConstants.HFILEBLOCK_HEADER_SIZE + size);
|
||||
long hfileMetaSize = ClassSize.align(ClassSize.estimateBase(HFileContext.class, true));
|
||||
long hfileBlockExpectedSize =
|
||||
ClassSize.align(ClassSize.estimateBase(HFileBlock.class, true));
|
||||
HFileBlock block = new HFileBlock(BlockType.DATA, size, size, -1, ByteBuff.wrap(buf),
|
||||
HFileBlock.FILL_HEADER, -1, 0, -1, meta, HEAP);
|
||||
long byteBufferExpectedSize =
|
||||
ClassSize.align(ClassSize.estimateBase(new MultiByteBuff(buf).getClass(), true)
|
||||
+ HConstants.HFILEBLOCK_HEADER_SIZE + size);
|
||||
long hfileMetaSize = ClassSize.align(ClassSize.estimateBase(HFileContext.class, true));
|
||||
long hfileBlockExpectedSize = ClassSize.align(ClassSize.estimateBase(HFileBlock.class, true));
|
||||
long expected = hfileBlockExpectedSize + byteBufferExpectedSize + hfileMetaSize;
|
||||
assertEquals("Block data size: " + size + ", byte buffer expected " +
|
||||
"size: " + byteBufferExpectedSize + ", HFileBlock class expected " +
|
||||
|
@ -936,10 +943,10 @@ public class TestHFileBlock {
|
|||
byte[] byteArr = new byte[length];
|
||||
ByteBuffer buf = ByteBuffer.wrap(byteArr, 0, size);
|
||||
HFileContext meta = new HFileContextBuilder().build();
|
||||
HFileBlock blockWithNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1, buf,
|
||||
HFileBlock.FILL_HEADER, -1, 52, -1, meta, alloc);
|
||||
HFileBlock blockWithoutNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1, buf,
|
||||
HFileBlock.FILL_HEADER, -1, -1, -1, meta, alloc);
|
||||
HFileBlock blockWithNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1,
|
||||
ByteBuff.wrap(buf), HFileBlock.FILL_HEADER, -1, 52, -1, meta, alloc);
|
||||
HFileBlock blockWithoutNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1,
|
||||
ByteBuff.wrap(buf), HFileBlock.FILL_HEADER, -1, -1, -1, meta, alloc);
|
||||
ByteBuffer buff1 = ByteBuffer.allocate(length);
|
||||
ByteBuffer buff2 = ByteBuffer.allocate(length);
|
||||
blockWithNextBlockMetadata.serialize(buff1, true);
|
||||
|
|
|
@ -38,6 +38,7 @@ import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
|
|||
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
|
||||
import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultEncodingContext;
|
||||
import org.apache.hadoop.hbase.io.encoding.HFileBlockEncodingContext;
|
||||
import org.apache.hadoop.hbase.nio.ByteBuff;
|
||||
import org.apache.hadoop.hbase.testclassification.IOTests;
|
||||
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||
import org.apache.hadoop.hbase.util.ChecksumType;
|
||||
|
@ -132,8 +133,8 @@ public class TestHFileDataBlockEncoder {
|
|||
.withBlockSize(0)
|
||||
.withChecksumType(ChecksumType.NULL)
|
||||
.build();
|
||||
HFileBlock block = new HFileBlock(BlockType.DATA, size, size, -1, buf, HFileBlock.FILL_HEADER,
|
||||
0, 0, -1, hfileContext, ByteBuffAllocator.HEAP);
|
||||
HFileBlock block = new HFileBlock(BlockType.DATA, size, size, -1, ByteBuff.wrap(buf),
|
||||
HFileBlock.FILL_HEADER, 0, 0, -1, hfileContext, ByteBuffAllocator.HEAP);
|
||||
HFileBlock cacheBlock = createBlockOnDisk(kvs, block, useTags);
|
||||
assertEquals(headerSize, cacheBlock.getDummyHeaderForVersion().length);
|
||||
}
|
||||
|
@ -198,9 +199,8 @@ public class TestHFileDataBlockEncoder {
|
|||
.withBlockSize(0)
|
||||
.withChecksumType(ChecksumType.NULL)
|
||||
.build();
|
||||
HFileBlock b = new HFileBlock(BlockType.DATA, size, size, -1, buf,
|
||||
HFileBlock.FILL_HEADER, 0,
|
||||
0, -1, meta, ByteBuffAllocator.HEAP);
|
||||
HFileBlock b = new HFileBlock(BlockType.DATA, size, size, -1, ByteBuff.wrap(buf),
|
||||
HFileBlock.FILL_HEADER, 0, 0, -1, meta, ByteBuffAllocator.HEAP);
|
||||
return b;
|
||||
}
|
||||
|
||||
|
@ -221,9 +221,9 @@ public class TestHFileDataBlockEncoder {
|
|||
blockEncoder.endBlockEncoding(context, dos, baos.getBuffer(), BlockType.DATA);
|
||||
byte[] encodedBytes = baos.toByteArray();
|
||||
size = encodedBytes.length - block.getDummyHeaderForVersion().length;
|
||||
return new HFileBlock(context.getBlockType(), size, size, -1, ByteBuffer.wrap(encodedBytes),
|
||||
HFileBlock.FILL_HEADER, 0, block.getOnDiskDataSizeWithHeader(), -1,
|
||||
block.getHFileContext(), ByteBuffAllocator.HEAP);
|
||||
return new HFileBlock(context.getBlockType(), size, size, -1,
|
||||
ByteBuff.wrap(ByteBuffer.wrap(encodedBytes)), HFileBlock.FILL_HEADER, 0,
|
||||
block.getOnDiskDataSizeWithHeader(), -1, block.getHFileContext(), ByteBuffAllocator.HEAP);
|
||||
}
|
||||
|
||||
private void writeBlock(List<Cell> kvs, HFileContext fileContext, boolean useTags)
|
||||
|
|
|
@ -18,6 +18,11 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.io.hfile;
|
||||
|
||||
import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_IOENGINE_KEY;
|
||||
import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_SIZE_KEY;
|
||||
import static org.apache.hadoop.hbase.io.ByteBuffAllocator.BUFFER_SIZE_KEY;
|
||||
import static org.apache.hadoop.hbase.io.ByteBuffAllocator.MAX_BUFFER_COUNT_KEY;
|
||||
import static org.apache.hadoop.hbase.io.ByteBuffAllocator.MIN_ALLOCATE_SIZE_KEY;
|
||||
import static org.apache.hadoop.hbase.io.hfile.HFileBlockIndex.MAX_CHUNK_SIZE_KEY;
|
||||
import static org.apache.hadoop.hbase.io.hfile.HFileBlockIndex.MIN_INDEX_NUM_ENTRIES_KEY;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
@ -34,17 +39,24 @@ import org.apache.hadoop.hbase.HBaseClassTestRule;
|
|||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.KeyValue;
|
||||
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
|
||||
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
|
||||
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
|
||||
import org.apache.hadoop.hbase.io.hfile.HFileReaderImpl.HFileScannerImpl;
|
||||
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
|
||||
import org.apache.hadoop.hbase.io.hfile.bucket.TestBucketCache;
|
||||
import org.apache.hadoop.hbase.testclassification.IOTests;
|
||||
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
import org.junit.rules.TestName;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
@ -55,30 +67,74 @@ public class TestHFileScannerImplReferenceCount {
|
|||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TestHFileScannerImplReferenceCount.class);
|
||||
|
||||
@Rule
|
||||
public TestName CASE = new TestName();
|
||||
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(TestHFileScannerImplReferenceCount.class);
|
||||
private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
|
||||
private static final byte[] FAMILY = Bytes.toBytes("f");
|
||||
private static final byte[] QUALIFIER = Bytes.toBytes("q");
|
||||
private static final byte[] SUFFIX = randLongBytes();
|
||||
private static final int CELL_COUNT = 1000;
|
||||
|
||||
private static byte[] randLongBytes() {
|
||||
Random rand = new Random();
|
||||
byte[] keys = new byte[300];
|
||||
byte[] keys = new byte[30];
|
||||
rand.nextBytes(keys);
|
||||
return keys;
|
||||
}
|
||||
|
||||
// It's a deep copy of configuration of UTIL, DON'T use shallow copy.
|
||||
private Configuration conf;
|
||||
private Path workDir;
|
||||
private FileSystem fs;
|
||||
private Path hfilePath;
|
||||
private Cell firstCell = null;
|
||||
private Cell secondCell = null;
|
||||
private ByteBuffAllocator allocator;
|
||||
|
||||
@BeforeClass
|
||||
public static void setUp() {
|
||||
public static void setUpBeforeClass() {
|
||||
Configuration conf = UTIL.getConfiguration();
|
||||
// Set the max chunk size and min entries key to be very small for index block, so that we can
|
||||
// create an index block tree with level >= 2.
|
||||
conf.setInt(MAX_CHUNK_SIZE_KEY, 10);
|
||||
conf.setInt(MIN_INDEX_NUM_ENTRIES_KEY, 2);
|
||||
// Create a bucket cache with 32MB.
|
||||
conf.set(BUCKET_CACHE_IOENGINE_KEY, "offheap");
|
||||
conf.setInt(BUCKET_CACHE_SIZE_KEY, 32);
|
||||
conf.setInt(BUFFER_SIZE_KEY, 1024);
|
||||
conf.setInt(MAX_BUFFER_COUNT_KEY, 32 * 1024);
|
||||
// All allocated ByteBuff are pooled ByteBuff.
|
||||
conf.setInt(MIN_ALLOCATE_SIZE_KEY, 0);
|
||||
}
|
||||
|
||||
@Before
|
||||
public void setUp() throws IOException {
|
||||
this.firstCell = null;
|
||||
this.secondCell = null;
|
||||
this.allocator = ByteBuffAllocator.create(UTIL.getConfiguration(), true);
|
||||
this.conf = new Configuration(UTIL.getConfiguration());
|
||||
String caseName = CASE.getMethodName();
|
||||
this.workDir = UTIL.getDataTestDir(caseName);
|
||||
this.fs = this.workDir.getFileSystem(conf);
|
||||
this.hfilePath = new Path(this.workDir, caseName + System.currentTimeMillis());
|
||||
LOG.info("Start to write {} cells into hfile: {}, case:{}", CELL_COUNT, hfilePath, caseName);
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() throws IOException {
|
||||
this.allocator.clean();
|
||||
this.fs.delete(this.workDir, true);
|
||||
}
|
||||
|
||||
private void waitBucketCacheFlushed(BlockCache cache) throws InterruptedException {
|
||||
Assert.assertTrue(cache instanceof CombinedBlockCache);
|
||||
BlockCache[] blockCaches = cache.getBlockCaches();
|
||||
Assert.assertEquals(blockCaches.length, 2);
|
||||
Assert.assertTrue(blockCaches[1] instanceof BucketCache);
|
||||
TestBucketCache.waitUntilAllFlushedToBucket((BucketCache) blockCaches[1]);
|
||||
}
|
||||
|
||||
private void writeHFile(Configuration conf, FileSystem fs, Path hfilePath, Algorithm compression,
|
||||
|
@ -107,176 +163,192 @@ public class TestHFileScannerImplReferenceCount {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* A careful UT for validating the reference count mechanism, if want to change this UT please
|
||||
* read the design doc in HBASE-21879 firstly and make sure that understand the refCnt design.
|
||||
*/
|
||||
private void testReleaseBlock(Algorithm compression, DataBlockEncoding encoding)
|
||||
throws Exception {
|
||||
Configuration conf = new Configuration(UTIL.getConfiguration());
|
||||
Path dir = UTIL.getDataTestDir("testReleasingBlock");
|
||||
FileSystem fs = dir.getFileSystem(conf);
|
||||
try {
|
||||
String hfileName = "testReleaseBlock_hfile_0_" + System.currentTimeMillis();
|
||||
Path hfilePath = new Path(dir, hfileName);
|
||||
int cellCount = 1000;
|
||||
LOG.info("Start to write {} cells into hfile: {}", cellCount, hfilePath);
|
||||
writeHFile(conf, fs, hfilePath, compression, encoding, cellCount);
|
||||
writeHFile(conf, fs, hfilePath, compression, encoding, CELL_COUNT);
|
||||
HFileBlock curBlock, prevBlock;
|
||||
BlockCache defaultBC = BlockCacheFactory.createBlockCache(conf);
|
||||
CacheConfig cacheConfig = new CacheConfig(conf, null, defaultBC, allocator);
|
||||
Assert.assertNotNull(defaultBC);
|
||||
Assert.assertTrue(cacheConfig.isCombinedBlockCache());
|
||||
HFile.Reader reader = HFile.createReader(fs, hfilePath, cacheConfig, true, conf);
|
||||
Assert.assertTrue(reader instanceof HFileReaderImpl);
|
||||
// We've build a HFile tree with index = 16.
|
||||
Assert.assertEquals(16, reader.getTrailer().getNumDataIndexLevels());
|
||||
|
||||
BlockCache defaultBC = BlockCacheFactory.createBlockCache(conf);
|
||||
Assert.assertNotNull(defaultBC);
|
||||
HFile.Reader reader =
|
||||
HFile.createReader(fs, hfilePath, new CacheConfig(conf, defaultBC), true, conf);
|
||||
Assert.assertTrue(reader instanceof HFileReaderImpl);
|
||||
// We've build a HFile tree with index = 16.
|
||||
Assert.assertEquals(16, reader.getTrailer().getNumDataIndexLevels());
|
||||
HFileScannerImpl scanner = (HFileScannerImpl) reader.getScanner(true, true, false);
|
||||
HFileBlock block1 = reader.getDataBlockIndexReader()
|
||||
.loadDataBlockWithScanInfo(firstCell, null, true, true, false, DataBlockEncoding.NONE)
|
||||
.getHFileBlock();
|
||||
waitBucketCacheFlushed(defaultBC);
|
||||
Assert.assertTrue(block1.getBlockType().isData());
|
||||
Assert.assertFalse(block1 instanceof ExclusiveMemHFileBlock);
|
||||
|
||||
HFileScanner scanner = reader.getScanner(true, true, false);
|
||||
BlockWithScanInfo scanInfo = reader.getDataBlockIndexReader()
|
||||
.loadDataBlockWithScanInfo(firstCell, null, true, true, false, DataBlockEncoding.NONE);
|
||||
BlockWithScanInfo scanInfo2 = reader.getDataBlockIndexReader()
|
||||
.loadDataBlockWithScanInfo(secondCell, null, true, true, false, DataBlockEncoding.NONE);
|
||||
HFileBlock block = scanInfo.getHFileBlock();
|
||||
HFileBlock block2 = scanInfo2.getHFileBlock();
|
||||
// One refCnt for blockCache and the other refCnt for RPC path.
|
||||
Assert.assertEquals(block.refCnt(), 2);
|
||||
Assert.assertEquals(block2.refCnt(), 2);
|
||||
Assert.assertFalse(block == block2);
|
||||
HFileBlock block2 = reader.getDataBlockIndexReader()
|
||||
.loadDataBlockWithScanInfo(secondCell, null, true, true, false, DataBlockEncoding.NONE)
|
||||
.getHFileBlock();
|
||||
waitBucketCacheFlushed(defaultBC);
|
||||
Assert.assertTrue(block2.getBlockType().isData());
|
||||
Assert.assertFalse(block2 instanceof ExclusiveMemHFileBlock);
|
||||
// Only one refCnt for RPC path.
|
||||
Assert.assertEquals(block1.refCnt(), 1);
|
||||
Assert.assertEquals(block2.refCnt(), 1);
|
||||
Assert.assertFalse(block1 == block2);
|
||||
|
||||
scanner.seekTo(firstCell);
|
||||
Assert.assertEquals(block.refCnt(), 3);
|
||||
scanner.seekTo(firstCell);
|
||||
curBlock = scanner.curBlock;
|
||||
Assert.assertEquals(curBlock.refCnt(), 2);
|
||||
|
||||
// Seek to the block again, the curBlock won't change and won't read from BlockCache. so
|
||||
// refCnt should be unchanged.
|
||||
scanner.seekTo(firstCell);
|
||||
Assert.assertEquals(block.refCnt(), 3);
|
||||
// Seek to the block again, the curBlock won't change and won't read from BlockCache. so
|
||||
// refCnt should be unchanged.
|
||||
scanner.seekTo(firstCell);
|
||||
Assert.assertTrue(curBlock == scanner.curBlock);
|
||||
Assert.assertEquals(curBlock.refCnt(), 2);
|
||||
prevBlock = curBlock;
|
||||
|
||||
scanner.seekTo(secondCell);
|
||||
Assert.assertEquals(block.refCnt(), 3);
|
||||
Assert.assertEquals(block2.refCnt(), 3);
|
||||
scanner.seekTo(secondCell);
|
||||
curBlock = scanner.curBlock;
|
||||
Assert.assertEquals(prevBlock.refCnt(), 2);
|
||||
Assert.assertEquals(curBlock.refCnt(), 2);
|
||||
|
||||
// After shipped, the block will be release, but block2 is still referenced by the curBlock.
|
||||
scanner.shipped();
|
||||
Assert.assertEquals(block.refCnt(), 2);
|
||||
Assert.assertEquals(block2.refCnt(), 3);
|
||||
// After shipped, the prevBlock will be release, but curBlock is still referenced by the
|
||||
// curBlock.
|
||||
scanner.shipped();
|
||||
Assert.assertEquals(prevBlock.refCnt(), 1);
|
||||
Assert.assertEquals(curBlock.refCnt(), 2);
|
||||
|
||||
// Try to ship again, though with nothing to client.
|
||||
scanner.shipped();
|
||||
Assert.assertEquals(block.refCnt(), 2);
|
||||
Assert.assertEquals(block2.refCnt(), 3);
|
||||
// Try to ship again, though with nothing to client.
|
||||
scanner.shipped();
|
||||
Assert.assertEquals(prevBlock.refCnt(), 1);
|
||||
Assert.assertEquals(curBlock.refCnt(), 2);
|
||||
|
||||
// The curBlock(block2) will also be released.
|
||||
scanner.close();
|
||||
Assert.assertEquals(block2.refCnt(), 2);
|
||||
// The curBlock will also be released.
|
||||
scanner.close();
|
||||
Assert.assertEquals(curBlock.refCnt(), 1);
|
||||
|
||||
// Finish the block & block2 RPC path
|
||||
block.release();
|
||||
block2.release();
|
||||
Assert.assertEquals(block.refCnt(), 1);
|
||||
Assert.assertEquals(block2.refCnt(), 1);
|
||||
// Finish the block & block2 RPC path
|
||||
Assert.assertTrue(block1.release());
|
||||
Assert.assertTrue(block2.release());
|
||||
|
||||
// Evict the LRUBlockCache
|
||||
Assert.assertTrue(defaultBC.evictBlocksByHfileName(hfileName) >= 2);
|
||||
Assert.assertEquals(block.refCnt(), 0);
|
||||
Assert.assertEquals(block2.refCnt(), 0);
|
||||
// Evict the LRUBlockCache
|
||||
Assert.assertTrue(defaultBC.evictBlocksByHfileName(hfilePath.getName()) >= 2);
|
||||
Assert.assertEquals(prevBlock.refCnt(), 0);
|
||||
Assert.assertEquals(curBlock.refCnt(), 0);
|
||||
|
||||
int count = 0;
|
||||
Assert.assertTrue(scanner.seekTo());
|
||||
++count;
|
||||
while (scanner.next()) {
|
||||
count++;
|
||||
}
|
||||
assertEquals(cellCount, count);
|
||||
} finally {
|
||||
fs.delete(dir, true);
|
||||
int count = 0;
|
||||
Assert.assertTrue(scanner.seekTo());
|
||||
++count;
|
||||
while (scanner.next()) {
|
||||
count++;
|
||||
}
|
||||
assertEquals(CELL_COUNT, count);
|
||||
}
|
||||
|
||||
/**
|
||||
* See HBASE-22480
|
||||
*/
|
||||
@Test
|
||||
public void testSeekBefore() throws IOException {
|
||||
Configuration conf = new Configuration(UTIL.getConfiguration());
|
||||
Path dir = UTIL.getDataTestDir("testSeekBefore");
|
||||
FileSystem fs = dir.getFileSystem(conf);
|
||||
try {
|
||||
String hfileName = "testSeekBefore_hfile_0_" + System.currentTimeMillis();
|
||||
Path hfilePath = new Path(dir, hfileName);
|
||||
int cellCount = 1000;
|
||||
LOG.info("Start to write {} cells into hfile: {}", cellCount, hfilePath);
|
||||
writeHFile(conf, fs, hfilePath, Algorithm.NONE, DataBlockEncoding.NONE, cellCount);
|
||||
public void testSeekBefore() throws Exception {
|
||||
HFileBlock curBlock, prevBlock;
|
||||
writeHFile(conf, fs, hfilePath, Algorithm.NONE, DataBlockEncoding.NONE, CELL_COUNT);
|
||||
BlockCache defaultBC = BlockCacheFactory.createBlockCache(conf);
|
||||
CacheConfig cacheConfig = new CacheConfig(conf, null, defaultBC, allocator);
|
||||
Assert.assertNotNull(defaultBC);
|
||||
Assert.assertTrue(cacheConfig.isCombinedBlockCache());
|
||||
HFile.Reader reader = HFile.createReader(fs, hfilePath, cacheConfig, true, conf);
|
||||
Assert.assertTrue(reader instanceof HFileReaderImpl);
|
||||
// We've build a HFile tree with index = 16.
|
||||
Assert.assertEquals(16, reader.getTrailer().getNumDataIndexLevels());
|
||||
|
||||
BlockCache defaultBC = BlockCacheFactory.createBlockCache(conf);
|
||||
Assert.assertNotNull(defaultBC);
|
||||
HFile.Reader reader =
|
||||
HFile.createReader(fs, hfilePath, new CacheConfig(conf, defaultBC), true, conf);
|
||||
Assert.assertTrue(reader instanceof HFileReaderImpl);
|
||||
// We've build a HFile tree with index = 16.
|
||||
Assert.assertEquals(16, reader.getTrailer().getNumDataIndexLevels());
|
||||
HFileScannerImpl scanner = (HFileScannerImpl) reader.getScanner(true, true, false);
|
||||
HFileBlock block1 = reader.getDataBlockIndexReader()
|
||||
.loadDataBlockWithScanInfo(firstCell, null, true, true, false, DataBlockEncoding.NONE)
|
||||
.getHFileBlock();
|
||||
Assert.assertTrue(block1.getBlockType().isData());
|
||||
Assert.assertFalse(block1 instanceof ExclusiveMemHFileBlock);
|
||||
HFileBlock block2 = reader.getDataBlockIndexReader()
|
||||
.loadDataBlockWithScanInfo(secondCell, null, true, true, false, DataBlockEncoding.NONE)
|
||||
.getHFileBlock();
|
||||
Assert.assertTrue(block2.getBlockType().isData());
|
||||
Assert.assertFalse(block2 instanceof ExclusiveMemHFileBlock);
|
||||
// Wait until flushed to IOEngine;
|
||||
waitBucketCacheFlushed(defaultBC);
|
||||
// One RPC reference path.
|
||||
Assert.assertEquals(block1.refCnt(), 1);
|
||||
Assert.assertEquals(block2.refCnt(), 1);
|
||||
|
||||
HFileScanner scanner = reader.getScanner(true, true, false);
|
||||
HFileBlock block1 = reader.getDataBlockIndexReader()
|
||||
.loadDataBlockWithScanInfo(firstCell, null, true, true, false, DataBlockEncoding.NONE)
|
||||
.getHFileBlock();
|
||||
HFileBlock block2 = reader.getDataBlockIndexReader()
|
||||
.loadDataBlockWithScanInfo(secondCell, null, true, true, false, DataBlockEncoding.NONE)
|
||||
.getHFileBlock();
|
||||
Assert.assertEquals(block1.refCnt(), 2);
|
||||
Assert.assertEquals(block2.refCnt(), 2);
|
||||
// Let the curBlock refer to block2.
|
||||
scanner.seekTo(secondCell);
|
||||
curBlock = scanner.curBlock;
|
||||
Assert.assertFalse(curBlock == block2);
|
||||
Assert.assertEquals(1, block2.refCnt());
|
||||
Assert.assertEquals(2, curBlock.refCnt());
|
||||
prevBlock = scanner.curBlock;
|
||||
|
||||
// Let the curBlock refer to block2.
|
||||
scanner.seekTo(secondCell);
|
||||
Assert.assertTrue(((HFileScannerImpl) scanner).curBlock == block2);
|
||||
Assert.assertEquals(3, block2.refCnt());
|
||||
// Release the block1, no other reference.
|
||||
Assert.assertTrue(block1.release());
|
||||
Assert.assertEquals(0, block1.refCnt());
|
||||
// Release the block2, no other reference.
|
||||
Assert.assertTrue(block2.release());
|
||||
Assert.assertEquals(0, block2.refCnt());
|
||||
|
||||
// Release the block1, only one reference: blockCache.
|
||||
Assert.assertFalse(block1.release());
|
||||
Assert.assertEquals(1, block1.refCnt());
|
||||
// Release the block2, so the remain references are: 1. scanner; 2. blockCache.
|
||||
Assert.assertFalse(block2.release());
|
||||
Assert.assertEquals(2, block2.refCnt());
|
||||
// Do the seekBefore: the newBlock will be the previous block of curBlock.
|
||||
Assert.assertTrue(scanner.seekBefore(secondCell));
|
||||
Assert.assertEquals(scanner.prevBlocks.size(), 1);
|
||||
Assert.assertTrue(scanner.prevBlocks.get(0) == prevBlock);
|
||||
curBlock = scanner.curBlock;
|
||||
// the curBlock is read from IOEngine, so a different block.
|
||||
Assert.assertFalse(curBlock == block1);
|
||||
// Two reference for curBlock: 1. scanner; 2. blockCache.
|
||||
Assert.assertEquals(2, curBlock.refCnt());
|
||||
// Reference count of prevBlock must be unchanged because we haven't shipped.
|
||||
Assert.assertEquals(2, prevBlock.refCnt());
|
||||
|
||||
// Do the seekBefore: the newBlock will be the previous block of curBlock.
|
||||
Assert.assertTrue(scanner.seekBefore(secondCell));
|
||||
Assert.assertTrue(((HFileScannerImpl) scanner).curBlock == block1);
|
||||
// Two reference for block1: 1. scanner; 2. blockCache.
|
||||
Assert.assertEquals(2, block1.refCnt());
|
||||
// Reference count of block2 must be unchanged because we haven't shipped.
|
||||
Assert.assertEquals(2, block2.refCnt());
|
||||
// Do the shipped
|
||||
scanner.shipped();
|
||||
Assert.assertEquals(scanner.prevBlocks.size(), 0);
|
||||
Assert.assertNotNull(scanner.curBlock);
|
||||
Assert.assertEquals(2, curBlock.refCnt());
|
||||
Assert.assertEquals(1, prevBlock.refCnt());
|
||||
|
||||
// Do the shipped
|
||||
scanner.shipped();
|
||||
Assert.assertEquals(2, block1.refCnt());
|
||||
Assert.assertEquals(1, block2.refCnt());
|
||||
// Do the close
|
||||
scanner.close();
|
||||
Assert.assertNull(scanner.curBlock);
|
||||
Assert.assertEquals(1, curBlock.refCnt());
|
||||
Assert.assertEquals(1, prevBlock.refCnt());
|
||||
|
||||
// Do the close
|
||||
scanner.close();
|
||||
Assert.assertEquals(1, block1.refCnt());
|
||||
Assert.assertEquals(1, block2.refCnt());
|
||||
Assert.assertTrue(defaultBC.evictBlocksByHfileName(hfilePath.getName()) >= 2);
|
||||
Assert.assertEquals(0, curBlock.refCnt());
|
||||
Assert.assertEquals(0, prevBlock.refCnt());
|
||||
|
||||
Assert.assertTrue(defaultBC.evictBlocksByHfileName(hfileName) >= 2);
|
||||
Assert.assertEquals(0, block1.refCnt());
|
||||
Assert.assertEquals(0, block2.refCnt());
|
||||
// Reload the block1 again.
|
||||
block1 = reader.getDataBlockIndexReader()
|
||||
.loadDataBlockWithScanInfo(firstCell, null, true, true, false, DataBlockEncoding.NONE)
|
||||
.getHFileBlock();
|
||||
// Wait until flushed to IOEngine;
|
||||
waitBucketCacheFlushed(defaultBC);
|
||||
Assert.assertTrue(block1.getBlockType().isData());
|
||||
Assert.assertFalse(block1 instanceof ExclusiveMemHFileBlock);
|
||||
Assert.assertTrue(block1.release());
|
||||
Assert.assertEquals(0, block1.refCnt());
|
||||
// Re-seek to the begin.
|
||||
Assert.assertTrue(scanner.seekTo());
|
||||
curBlock = scanner.curBlock;
|
||||
Assert.assertFalse(curBlock == block1);
|
||||
Assert.assertEquals(2, curBlock.refCnt());
|
||||
// Return false because firstCell <= c[0]
|
||||
Assert.assertFalse(scanner.seekBefore(firstCell));
|
||||
// The block1 shouldn't be released because we still don't do the shipped or close.
|
||||
Assert.assertEquals(2, curBlock.refCnt());
|
||||
|
||||
// Reload the block1 again.
|
||||
block1 = reader.getDataBlockIndexReader()
|
||||
.loadDataBlockWithScanInfo(firstCell, null, true, true, false, DataBlockEncoding.NONE)
|
||||
.getHFileBlock();
|
||||
Assert.assertFalse(block1.release());
|
||||
Assert.assertEquals(1, block1.refCnt());
|
||||
// Re-seek to the begin.
|
||||
Assert.assertTrue(scanner.seekTo());
|
||||
Assert.assertTrue(((HFileScannerImpl) scanner).curBlock == block1);
|
||||
Assert.assertEquals(2, block1.refCnt());
|
||||
// Return false because firstCell <= c[0]
|
||||
Assert.assertFalse(scanner.seekBefore(firstCell));
|
||||
// The block1 shouldn't be released because we still don't do the shipped or close.
|
||||
Assert.assertEquals(2, block1.refCnt());
|
||||
|
||||
scanner.close();
|
||||
Assert.assertEquals(1, block1.refCnt());
|
||||
Assert.assertTrue(defaultBC.evictBlocksByHfileName(hfileName) >= 1);
|
||||
Assert.assertEquals(0, block1.refCnt());
|
||||
} finally {
|
||||
fs.delete(dir, true);
|
||||
}
|
||||
scanner.close();
|
||||
Assert.assertEquals(1, curBlock.refCnt());
|
||||
Assert.assertTrue(defaultBC.evictBlocksByHfileName(hfilePath.getName()) >= 1);
|
||||
Assert.assertEquals(0, curBlock.refCnt());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -298,4 +370,56 @@ public class TestHFileScannerImplReferenceCount {
|
|||
public void testDataBlockEncodingAndCompression() throws Exception {
|
||||
testReleaseBlock(Algorithm.GZ, DataBlockEncoding.ROW_INDEX_V1);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testWithLruBlockCache() throws Exception {
|
||||
HFileBlock curBlock;
|
||||
writeHFile(conf, fs, hfilePath, Algorithm.NONE, DataBlockEncoding.NONE, CELL_COUNT);
|
||||
// Set LruBlockCache
|
||||
conf.set(BUCKET_CACHE_IOENGINE_KEY, "");
|
||||
BlockCache defaultBC = BlockCacheFactory.createBlockCache(conf);
|
||||
CacheConfig cacheConfig = new CacheConfig(conf, null, defaultBC, allocator);
|
||||
Assert.assertNotNull(defaultBC);
|
||||
Assert.assertFalse(cacheConfig.isCombinedBlockCache()); // Must be LruBlockCache.
|
||||
HFile.Reader reader = HFile.createReader(fs, hfilePath, cacheConfig, true, conf);
|
||||
Assert.assertTrue(reader instanceof HFileReaderImpl);
|
||||
// We've build a HFile tree with index = 16.
|
||||
Assert.assertEquals(16, reader.getTrailer().getNumDataIndexLevels());
|
||||
|
||||
HFileScannerImpl scanner = (HFileScannerImpl) reader.getScanner(true, true, false);
|
||||
HFileBlock block1 = reader.getDataBlockIndexReader()
|
||||
.loadDataBlockWithScanInfo(firstCell, null, true, true, false, DataBlockEncoding.NONE)
|
||||
.getHFileBlock();
|
||||
Assert.assertTrue(block1.getBlockType().isData());
|
||||
Assert.assertTrue(block1 instanceof ExclusiveMemHFileBlock);
|
||||
HFileBlock block2 = reader.getDataBlockIndexReader()
|
||||
.loadDataBlockWithScanInfo(secondCell, null, true, true, false, DataBlockEncoding.NONE)
|
||||
.getHFileBlock();
|
||||
Assert.assertTrue(block2.getBlockType().isData());
|
||||
Assert.assertTrue(block2 instanceof ExclusiveMemHFileBlock);
|
||||
// One RPC reference path.
|
||||
Assert.assertEquals(block1.refCnt(), 0);
|
||||
Assert.assertEquals(block2.refCnt(), 0);
|
||||
|
||||
scanner.seekTo(firstCell);
|
||||
curBlock = scanner.curBlock;
|
||||
Assert.assertTrue(curBlock == block1);
|
||||
Assert.assertEquals(curBlock.refCnt(), 0);
|
||||
Assert.assertTrue(scanner.prevBlocks.isEmpty());
|
||||
|
||||
// Switch to next block
|
||||
scanner.seekTo(secondCell);
|
||||
curBlock = scanner.curBlock;
|
||||
Assert.assertTrue(curBlock == block2);
|
||||
Assert.assertEquals(curBlock.refCnt(), 0);
|
||||
Assert.assertEquals(curBlock.retain().refCnt(), 0);
|
||||
// Only pooled HFileBlock will be kept in prevBlocks and ExclusiveMemHFileBlock will never keep
|
||||
// in prevBlocks.
|
||||
Assert.assertTrue(scanner.prevBlocks.isEmpty());
|
||||
|
||||
// close the scanner
|
||||
scanner.close();
|
||||
Assert.assertNull(scanner.curBlock);
|
||||
Assert.assertTrue(scanner.prevBlocks.isEmpty());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -38,6 +38,7 @@ import org.apache.hadoop.hbase.Waiter;
|
|||
import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
|
||||
import org.apache.hadoop.hbase.io.HeapSize;
|
||||
import org.apache.hadoop.hbase.io.hfile.LruBlockCache.EvictionThread;
|
||||
import org.apache.hadoop.hbase.nio.ByteBuff;
|
||||
import org.apache.hadoop.hbase.testclassification.IOTests;
|
||||
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||
import org.apache.hadoop.hbase.util.ClassSize;
|
||||
|
@ -820,10 +821,10 @@ public class TestLruBlockCache {
|
|||
byte[] byteArr = new byte[length];
|
||||
ByteBuffer buf = ByteBuffer.wrap(byteArr, 0, size);
|
||||
HFileContext meta = new HFileContextBuilder().build();
|
||||
HFileBlock blockWithNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1, buf,
|
||||
HFileBlock.FILL_HEADER, -1, 52, -1, meta, HEAP);
|
||||
HFileBlock blockWithoutNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1, buf,
|
||||
HFileBlock.FILL_HEADER, -1, -1, -1, meta, HEAP);
|
||||
HFileBlock blockWithNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1,
|
||||
ByteBuff.wrap(buf), HFileBlock.FILL_HEADER, -1, 52, -1, meta, HEAP);
|
||||
HFileBlock blockWithoutNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1,
|
||||
ByteBuff.wrap(buf), HFileBlock.FILL_HEADER, -1, -1, -1, meta, HEAP);
|
||||
|
||||
LruBlockCache cache = new LruBlockCache(maxSize, blockSize, false,
|
||||
(int)Math.ceil(1.2*maxSize/blockSize),
|
||||
|
@ -964,7 +965,8 @@ public class TestLruBlockCache {
|
|||
HFileContext meta = new HFileContextBuilder().build();
|
||||
BlockCacheKey key = new BlockCacheKey("key1", 0);
|
||||
HFileBlock blk = new HFileBlock(BlockType.DATA, size, size, -1,
|
||||
ByteBuffer.wrap(byteArr, 0, size), HFileBlock.FILL_HEADER, -1, 52, -1, meta, HEAP);
|
||||
ByteBuff.wrap(ByteBuffer.wrap(byteArr, 0, size)), HFileBlock.FILL_HEADER, -1, 52, -1, meta,
|
||||
HEAP);
|
||||
AtomicBoolean err1 = new AtomicBoolean(false);
|
||||
Thread t1 = new Thread(() -> {
|
||||
for (int i = 0; i < 10000 && !err1.get(); i++) {
|
||||
|
|
|
@ -203,7 +203,7 @@ public class TestBucketCache {
|
|||
CacheTestUtils.testHeapSizeChanges(cache, BLOCK_SIZE);
|
||||
}
|
||||
|
||||
private void waitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey)
|
||||
public static void waitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey)
|
||||
throws InterruptedException {
|
||||
while (!cache.backingMap.containsKey(cacheKey) || cache.ramCache.containsKey(cacheKey)) {
|
||||
Thread.sleep(100);
|
||||
|
@ -211,6 +211,13 @@ public class TestBucketCache {
|
|||
Thread.sleep(1000);
|
||||
}
|
||||
|
||||
public static void waitUntilAllFlushedToBucket(BucketCache cache) throws InterruptedException {
|
||||
while (!cache.ramCache.isEmpty()) {
|
||||
Thread.sleep(100);
|
||||
}
|
||||
Thread.sleep(1000);
|
||||
}
|
||||
|
||||
// BucketCache.cacheBlock is async, it first adds block to ramCache and writeQueue, then writer
|
||||
// threads will flush it to the bucket and put reference entry in backingMap.
|
||||
private void cacheAndWaitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey,
|
||||
|
@ -430,10 +437,10 @@ public class TestBucketCache {
|
|||
ByteBuffer buf1 = ByteBuffer.allocate(size), buf2 = ByteBuffer.allocate(size);
|
||||
HFileContext meta = new HFileContextBuilder().build();
|
||||
ByteBuffAllocator allocator = ByteBuffAllocator.HEAP;
|
||||
HFileBlock blockWithNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1, buf1,
|
||||
HFileBlock.FILL_HEADER, -1, 52, -1, meta, allocator);
|
||||
HFileBlock blockWithoutNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1, buf2,
|
||||
HFileBlock.FILL_HEADER, -1, -1, -1, meta, allocator);
|
||||
HFileBlock blockWithNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1,
|
||||
ByteBuff.wrap(buf1), HFileBlock.FILL_HEADER, -1, 52, -1, meta, allocator);
|
||||
HFileBlock blockWithoutNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1,
|
||||
ByteBuff.wrap(buf2), HFileBlock.FILL_HEADER, -1, -1, -1, meta, allocator);
|
||||
|
||||
BlockCacheKey key = new BlockCacheKey("testCacheBlockNextBlockMetadataMissing", 0);
|
||||
ByteBuffer actualBuffer = ByteBuffer.allocate(length);
|
||||
|
@ -492,10 +499,10 @@ public class TestBucketCache {
|
|||
RAMCache cache = new RAMCache();
|
||||
BlockCacheKey key1 = new BlockCacheKey("file-1", 1);
|
||||
BlockCacheKey key2 = new BlockCacheKey("file-2", 2);
|
||||
HFileBlock blk1 = new HFileBlock(BlockType.DATA, size, size, -1, buf, HFileBlock.FILL_HEADER,
|
||||
-1, 52, -1, meta, ByteBuffAllocator.HEAP);
|
||||
HFileBlock blk2 = new HFileBlock(BlockType.DATA, size, size, -1, buf, HFileBlock.FILL_HEADER,
|
||||
-1, -1, -1, meta, ByteBuffAllocator.HEAP);
|
||||
HFileBlock blk1 = new HFileBlock(BlockType.DATA, size, size, -1, ByteBuff.wrap(buf),
|
||||
HFileBlock.FILL_HEADER, -1, 52, -1, meta, ByteBuffAllocator.HEAP);
|
||||
HFileBlock blk2 = new HFileBlock(BlockType.DATA, size, size, -1, ByteBuff.wrap(buf),
|
||||
HFileBlock.FILL_HEADER, -1, -1, -1, meta, ByteBuffAllocator.HEAP);
|
||||
RAMQueueEntry re1 = new RAMQueueEntry(key1, blk1, 1, false, ByteBuffAllocator.NONE);
|
||||
RAMQueueEntry re2 = new RAMQueueEntry(key1, blk2, 1, false, ByteBuffAllocator.NONE);
|
||||
|
||||
|
@ -527,8 +534,8 @@ public class TestBucketCache {
|
|||
int length = HConstants.HFILEBLOCK_HEADER_SIZE + size;
|
||||
ByteBuffer buf = ByteBuffer.allocate(length);
|
||||
HFileContext meta = new HFileContextBuilder().build();
|
||||
HFileBlock block = new HFileBlock(BlockType.DATA, size, size, -1, buf, HFileBlock.FILL_HEADER,
|
||||
offset, 52, -1, meta, ByteBuffAllocator.HEAP);
|
||||
HFileBlock block = new HFileBlock(BlockType.DATA, size, size, -1, ByteBuff.wrap(buf),
|
||||
HFileBlock.FILL_HEADER, offset, 52, -1, meta, ByteBuffAllocator.HEAP);
|
||||
|
||||
// initialize an mocked ioengine.
|
||||
IOEngine ioEngine = Mockito.mock(IOEngine.class);
|
||||
|
|
|
@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.io.hfile.HFileBlock;
|
|||
import org.apache.hadoop.hbase.io.hfile.HFileContext;
|
||||
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
|
||||
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.WriterThread;
|
||||
import org.apache.hadoop.hbase.nio.ByteBuff;
|
||||
import org.apache.hadoop.hbase.testclassification.IOTests;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.junit.ClassRule;
|
||||
|
@ -69,7 +70,7 @@ public class TestBucketCacheRefCnt {
|
|||
}
|
||||
|
||||
private static HFileBlock createBlock(int offset, int size, ByteBuffAllocator alloc) {
|
||||
return new HFileBlock(BlockType.DATA, size, size, -1, ByteBuffer.allocate(size),
|
||||
return new HFileBlock(BlockType.DATA, size, size, -1, ByteBuff.wrap(ByteBuffer.allocate(size)),
|
||||
HFileBlock.FILL_HEADER, offset, 52, size, CONTEXT, alloc);
|
||||
}
|
||||
|
||||
|
|
|
@ -31,6 +31,7 @@ import org.apache.hadoop.hbase.io.hfile.HFileContext;
|
|||
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
|
||||
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.RAMCache;
|
||||
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.RAMQueueEntry;
|
||||
import org.apache.hadoop.hbase.nio.ByteBuff;
|
||||
import org.apache.hadoop.hbase.testclassification.IOTests;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.junit.Assert;
|
||||
|
@ -57,9 +58,9 @@ public class TestRAMCache {
|
|||
int uncompressedSizeWithoutHeader, long prevBlockOffset, ByteBuffer b, boolean fillHeader,
|
||||
long offset, int nextBlockOnDiskSize, int onDiskDataSizeWithHeader,
|
||||
HFileContext fileContext, ByteBuffAllocator allocator) {
|
||||
super(blockType, onDiskSizeWithoutHeader, uncompressedSizeWithoutHeader, prevBlockOffset, b,
|
||||
fillHeader, offset, nextBlockOnDiskSize, onDiskDataSizeWithHeader, fileContext,
|
||||
allocator);
|
||||
super(blockType, onDiskSizeWithoutHeader, uncompressedSizeWithoutHeader, prevBlockOffset,
|
||||
ByteBuff.wrap(b), fillHeader, offset, nextBlockOnDiskSize, onDiskDataSizeWithHeader,
|
||||
fileContext, allocator);
|
||||
}
|
||||
|
||||
public void setLatch(CountDownLatch latch) {
|
||||
|
|
Loading…
Reference in New Issue