HBASE-12295 Prevent block eviction under us if reads are in progress from the BBs (Ram)
ramkrishna 2015-07-21 21:15:32 +05:30
parent 3b6db26863
commit ccb22bd80d
39 changed files with 2380 additions and 355 deletions

View File

@ -0,0 +1,36 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
/**
* A Cell implementing this interface indicates that its backing memory may be part of a larger
* common memory area used by the RegionServer. If an exclusive instance is required, use
* {@link #cloneToCell()} to have the contents of the cell copied to an exclusive memory area.
*/
@InterfaceAudience.Private
public interface ShareableMemory {
/**
* Does a deep copy of the contents to a new memory area and
* returns it in the form of a cell.
* @return Cell the deep cloned cell
*/
public Cell cloneToCell();
}
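A minimal usage sketch (hypothetical caller code, not taken from this patch): before retaining a Cell beyond the lifetime of its backing block, a consumer checks for ShareableMemory and copies it.

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.ShareableMemory;

final class CellCopyUtil { // illustrative helper, not part of the patch
  static Cell ensureExclusive(Cell cell) {
    if (cell instanceof ShareableMemory) {
      // Backing memory is shared with the block cache; take a deep copy before retaining.
      return ((ShareableMemory) cell).cloneToCell();
    }
    return cell; // already backed by exclusive memory
  }
}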

View File

@ -283,7 +283,7 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder {
/**
* Copies only the key part of the keybuffer by doing a deep copy and passes the
* seeker state members for taking a clone.
* Note that the value byte[] part is still pointing to the currentBuffer and the
* Note that the value byte[] part is still pointing to the currentBuffer and
* represented by the valueOffset and valueLength
*/
// We return this as a Cell to the upper layers of read flow and might try setting a new SeqId

View File

@ -56,6 +56,7 @@ public class HFileContext implements HeapSize, Cloneable {
/** Encryption algorithm and key used */
private Encryption.Context cryptoContext = Encryption.Context.NONE;
private long fileCreateTime;
private String hfileName;
//Empty constructor. Go with setters
public HFileContext() {
@ -77,12 +78,13 @@ public class HFileContext implements HeapSize, Cloneable {
this.encoding = context.encoding;
this.cryptoContext = context.cryptoContext;
this.fileCreateTime = context.fileCreateTime;
this.hfileName = context.hfileName;
}
public HFileContext(boolean useHBaseChecksum, boolean includesMvcc, boolean includesTags,
HFileContext(boolean useHBaseChecksum, boolean includesMvcc, boolean includesTags,
Compression.Algorithm compressAlgo, boolean compressTags, ChecksumType checksumType,
int bytesPerChecksum, int blockSize, DataBlockEncoding encoding,
Encryption.Context cryptoContext, long fileCreateTime) {
Encryption.Context cryptoContext, long fileCreateTime, String hfileName) {
this.usesHBaseChecksum = useHBaseChecksum;
this.includesMvcc = includesMvcc;
this.includesTags = includesTags;
@ -96,6 +98,7 @@ public class HFileContext implements HeapSize, Cloneable {
}
this.cryptoContext = cryptoContext;
this.fileCreateTime = fileCreateTime;
this.hfileName = hfileName;
}
/**
@ -119,10 +122,6 @@ public class HFileContext implements HeapSize, Cloneable {
return compressAlgo;
}
public void setCompression(Compression.Algorithm compressAlgo) {
this.compressAlgo = compressAlgo;
}
public boolean isUseHBaseChecksum() {
return usesHBaseChecksum;
}
@ -175,10 +174,6 @@ public class HFileContext implements HeapSize, Cloneable {
return encoding;
}
public void setDataBlockEncoding(DataBlockEncoding encoding) {
this.encoding = encoding;
}
public Encryption.Context getEncryptionContext() {
return cryptoContext;
}
@ -187,6 +182,10 @@ public class HFileContext implements HeapSize, Cloneable {
this.cryptoContext = cryptoContext;
}
public String getHFileName() {
return this.hfileName;
}
/**
* HeapSize implementation
* NOTE : The heapsize should be altered as and when new state variables are added
@ -196,11 +195,14 @@ public class HFileContext implements HeapSize, Cloneable {
public long heapSize() {
long size = ClassSize.align(ClassSize.OBJECT +
// Algorithm reference, encoding, checksumtype, Encryption.Context reference
4 * ClassSize.REFERENCE +
5 * ClassSize.REFERENCE +
2 * Bytes.SIZEOF_INT +
// usesHBaseChecksum, includesMvcc, includesTags and compressTags
4 * Bytes.SIZEOF_BOOLEAN +
Bytes.SIZEOF_LONG);
if (this.hfileName != null) {
size += ClassSize.STRING + this.hfileName.length();
}
return size;
}
@ -227,6 +229,10 @@ public class HFileContext implements HeapSize, Cloneable {
sb.append(" compressAlgo="); sb.append(compressAlgo);
sb.append(" compressTags="); sb.append(compressTags);
sb.append(" cryptoContext=[ "); sb.append(cryptoContext); sb.append(" ]");
if (hfileName != null) {
sb.append(" name=");
sb.append(hfileName);
}
sb.append(" ]");
return sb.toString();
}

View File

@ -53,6 +53,8 @@ public class HFileContextBuilder {
private Encryption.Context cryptoContext = Encryption.Context.NONE;
private long fileCreateTime = 0;
private String hfileName = null;
public HFileContextBuilder withHBaseCheckSum(boolean useHBaseCheckSum) {
this.usesHBaseChecksum = useHBaseCheckSum;
return this;
@ -108,9 +110,14 @@ public class HFileContextBuilder {
return this;
}
public HFileContextBuilder withHFileName(String name) {
this.hfileName = name;
return this;
}
public HFileContext build() {
return new HFileContext(usesHBaseChecksum, includesMvcc, includesTags, compression,
compressTags, checksumType, bytesPerChecksum, blocksize, encoding, cryptoContext,
fileCreateTime);
fileCreateTime, hfileName);
}
}
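A short sketch of how the new builder option might be used, mirroring the withHFileName call the reader makes later in this patch; the file name here is a placeholder.

import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;

// Illustrative: record the file name in the context so later code can build
// BlockCacheKeys for this file's blocks from the context alone.
static HFileContext exampleContext() {
  return new HFileContextBuilder()
      .withHBaseCheckSum(true)
      .withHFileName("examplefile")   // hypothetical name
      .build();
}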

View File

@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.io.hfile;
import java.util.Iterator;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.io.hfile.Cacheable.MemoryType;
/**
* Block cache interface. Anything that implements the {@link Cacheable}
@ -116,4 +117,16 @@ public interface BlockCache extends Iterable<CachedBlock> {
* @return The list of sub blockcaches that make up this one; returns null if no sub caches.
*/
BlockCache [] getBlockCaches();
/**
* Called when the scanner using the block decides to return it once its usage is over.
* This API should be called after the block is used; failing to do so may prevent blocks
* from being evicted, which in turn prevents new hot blocks from getting added to the
* block cache. The BlockCache implementation decides what to do with the returned block
* based on its {@link MemoryType}.
* @param cacheKey the cache key of the block
* @param block the hfileblock to be returned
*/
void returnBlock(BlockCacheKey cacheKey, Cacheable block);
}
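A sketch of the intended call pattern, assuming the existing BlockCache.getBlock(key, caching, repeat, updateCacheMetrics) signature; the key and control flow are illustrative.

import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
import org.apache.hadoop.hbase.io.hfile.Cacheable;

// Illustrative read-then-return pattern for blocks obtained from the cache.
static void readAndReturn(BlockCache cache) {
  BlockCacheKey key = new BlockCacheKey("examplefile", 0L); // hypothetical key
  Cacheable block = cache.getBlock(key, true, false, true); // assumed existing signature
  if (block == null) {
    return; // cache miss
  }
  try {
    // ... use the block's contents ...
  } finally {
    // Always hand the block back so a SHARED block's ref count can be decremented
    // and the block becomes evictable again.
    cache.returnBlock(key, block);
  }
}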

View File

@ -60,4 +60,19 @@ public interface Cacheable extends HeapSize {
* @return the block type of this cached HFile block
*/
BlockType getBlockType();
/**
* @return the {@code MemoryType} of this Cacheable
*/
MemoryType getMemoryType();
/**
* SHARED means that when this Cacheable is read back from the cache it refers to the same
* memory area the cache uses for caching it.
* EXCLUSIVE means that when this Cacheable is read back from the cache, the data has been
* copied to an exclusive memory area of this Cacheable.
*/
public static enum MemoryType {
SHARED, EXCLUSIVE;
}
}
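A small sketch of how a reader might act on the memory type; the helper name is hypothetical.

import org.apache.hadoop.hbase.io.hfile.Cacheable;
import org.apache.hadoop.hbase.io.hfile.Cacheable.MemoryType;

// Illustrative: decide whether data read from a cached block must be copied before
// the block is handed back to the cache.
static boolean mustCopyBeforeReturn(Cacheable block) {
  // SHARED blocks are backed by the cache's own memory (e.g. bucket cache buckets);
  // EXCLUSIVE blocks already own a private copy of the data.
  return block.getMemoryType() == MemoryType.SHARED;
}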

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.io.hfile;
import java.io.IOException;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.io.hfile.Cacheable.MemoryType;
import org.apache.hadoop.hbase.nio.ByteBuff;
/**
@ -36,14 +37,14 @@ public interface CacheableDeserializer<T extends Cacheable> {
T deserialize(ByteBuff b) throws IOException;
/**
* @param b the ByteBuff from which the Cacheable is to be deserialized
* @param reuse true if Cacheable object can use the given buffer as its
* content
* @param memType the {@link MemoryType} of the buffer
* @return T the deserialized object.
* @throws IOException
*/
T deserialize(ByteBuff b, boolean reuse) throws IOException;
T deserialize(ByteBuff b, boolean reuse, MemoryType memType) throws IOException;
/**
* Get the identifier of this deserialiser. Identifier is unique for each

View File

@ -25,6 +25,8 @@ import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.io.hfile.BlockType.BlockCategory;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
import com.google.common.annotations.VisibleForTesting;
/**
* CombinedBlockCache is an abstraction layer that combines
@ -219,4 +221,16 @@ public class CombinedBlockCache implements ResizableBlockCache, HeapSize {
public void setMaxSize(long size) {
this.lruCache.setMaxSize(size);
}
@Override
public void returnBlock(BlockCacheKey cacheKey, Cacheable block) {
// Delegate to both caches: the on-heap LRU cache treats this as a noop, while the L2
// cache may decrement the block's reference count.
this.lruCache.returnBlock(cacheKey, block);
this.l2Cache.returnBlock(cacheKey, block);
}
@VisibleForTesting
public int getRefCount(BlockCacheKey cacheKey) {
return ((BucketCache) this.l2Cache).getRefCount(cacheKey);
}
}

View File

@ -119,11 +119,15 @@ public class CompoundBloomFilter extends CompoundBloomFilterBase
"Failed to load Bloom block for key "
+ Bytes.toStringBinary(key, keyOffset, keyLength), ex);
}
ByteBuff bloomBuf = bloomBlock.getBufferReadOnly();
result = BloomFilterUtil.contains(key, keyOffset, keyLength,
bloomBuf, bloomBlock.headerSize(),
bloomBlock.getUncompressedSizeWithoutHeader(), hash, hashCount);
try {
ByteBuff bloomBuf = bloomBlock.getBufferReadOnly();
result =
BloomFilterUtil.contains(key, keyOffset, keyLength, bloomBuf, bloomBlock.headerSize(),
bloomBlock.getUncompressedSizeWithoutHeader(), hash, hashCount);
} finally {
// After use, return the block if it was served from a cache.
reader.returnBlock(bloomBlock);
}
}
if (numQueriesPerChunk != null && block >= 0) {

View File

@ -54,7 +54,6 @@ import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.protobuf.ProtobufMagic;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
@ -374,6 +373,12 @@ public class HFile {
final boolean updateCacheMetrics, BlockType expectedBlockType,
DataBlockEncoding expectedDataBlockEncoding)
throws IOException;
/**
* Return the given block to the cache, if it was obtained from the cache.
* @param block Block to be returned.
*/
void returnBlock(HFileBlock block);
}
/** An interface used by clients to open and iterate an {@link HFile}. */
@ -389,7 +394,7 @@ public class HFile {
HFileScanner getScanner(boolean cacheBlocks, final boolean pread, final boolean isCompaction);
ByteBuff getMetaBlock(String metaBlockName, boolean cacheBlock) throws IOException;
HFileBlock getMetaBlock(String metaBlockName, boolean cacheBlock) throws IOException;
Map<byte[], byte[]> loadFileInfo() throws IOException;

View File

@ -121,7 +121,8 @@ public class HFileBlock implements Cacheable {
static final CacheableDeserializer<Cacheable> blockDeserializer =
new CacheableDeserializer<Cacheable>() {
public HFileBlock deserialize(ByteBuff buf, boolean reuse) throws IOException{
public HFileBlock deserialize(ByteBuff buf, boolean reuse, MemoryType memType)
throws IOException {
buf.limit(buf.limit() - HFileBlock.EXTRA_SERIALIZATION_SPACE).rewind();
ByteBuff newByteBuffer;
if (reuse) {
@ -135,7 +136,7 @@ public class HFileBlock implements Cacheable {
buf.position(buf.limit());
buf.limit(buf.limit() + HFileBlock.EXTRA_SERIALIZATION_SPACE);
boolean usesChecksum = buf.get() == (byte)1;
HFileBlock hFileBlock = new HFileBlock(newByteBuffer, usesChecksum);
HFileBlock hFileBlock = new HFileBlock(newByteBuffer, usesChecksum, memType);
hFileBlock.offset = buf.getLong();
hFileBlock.nextBlockOnDiskSizeWithHeader = buf.getInt();
if (hFileBlock.hasNextBlockHeader()) {
@ -152,7 +153,7 @@ public class HFileBlock implements Cacheable {
@Override
public HFileBlock deserialize(ByteBuff b) throws IOException {
// Used only in tests
return deserialize(b, false);
return deserialize(b, false, MemoryType.EXCLUSIVE);
}
};
private static final int deserializerIdentifier;
@ -198,6 +199,8 @@ public class HFileBlock implements Cacheable {
*/
private int nextBlockOnDiskSizeWithHeader = -1;
private MemoryType memType = MemoryType.EXCLUSIVE;
/**
* Creates a new {@link HFile} block from the given fields. This constructor
* is mostly used when the block data has already been read and uncompressed,
@ -255,15 +258,24 @@ public class HFileBlock implements Cacheable {
HFileBlock(ByteBuffer b, boolean usesHBaseChecksum) throws IOException {
this(new SingleByteBuff(b), usesHBaseChecksum);
}
/**
* Creates a block from an existing buffer starting with a header. Rewinds
* and takes ownership of the buffer. By definition of rewind, ignores the
* buffer position, but if you slice the buffer beforehand, it will rewind
* to that point. The reason this has a minorNumber and not a majorNumber is
* because majorNumbers indicate the format of a HFile whereas minorNumbers
* indicate the format inside a HFileBlock.
* to that point.
*/
HFileBlock(ByteBuff b, boolean usesHBaseChecksum) throws IOException {
this(b, usesHBaseChecksum, MemoryType.EXCLUSIVE);
}
/**
* Creates a block from an existing buffer starting with a header. Rewinds
* and takes ownership of the buffer. By definition of rewind, ignores the
* buffer position, but if you slice the buffer beforehand, it will rewind
* to that point.
*/
HFileBlock(ByteBuff b, boolean usesHBaseChecksum, MemoryType memType) throws IOException {
b.rewind();
blockType = BlockType.read(b);
onDiskSizeWithoutHeader = b.getInt();
@ -282,6 +294,7 @@ public class HFileBlock implements Cacheable {
HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM;
}
this.fileContext = contextBuilder.build();
this.memType = memType;
buf = b;
buf.rewind();
}
@ -650,8 +663,8 @@ public class HFileBlock implements Cacheable {
public long heapSize() {
long size = ClassSize.align(
ClassSize.OBJECT +
// Block type, multi byte buffer and meta references
3 * ClassSize.REFERENCE +
// Block type, multi byte buffer, MemoryType and meta references
4 * ClassSize.REFERENCE +
// On-disk size, uncompressed size, and next block's on-disk size
// bytePerChecksum and onDiskDataSize
4 * Bytes.SIZEOF_INT +
@ -1885,6 +1898,11 @@ public class HFileBlock implements Cacheable {
return this.fileContext;
}
@Override
public MemoryType getMemoryType() {
return this.memType;
}
/**
* Convert the contents of the block header into a human readable string.
* This is mostly helpful for debugging. This assumes that the block

View File

@ -32,7 +32,6 @@ import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.hbase.Cell;
@ -42,6 +41,7 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.KeyOnlyKeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.HFile.CachingBlockReader;
@ -295,78 +295,90 @@ public class HFileBlockIndex {
int lookupLevel = 1; // How many levels deep we are in our lookup.
int index = -1;
HFileBlock block;
HFileBlock block = null;
boolean dataBlock = false;
KeyOnlyKeyValue tmpNextIndexKV = new KeyValue.KeyOnlyKeyValue();
while (true) {
if (currentBlock != null && currentBlock.getOffset() == currentOffset)
{
// Avoid reading the same block again, even with caching turned off.
// This is crucial for compaction-type workload which might have
// caching turned off. This is like a one-block cache inside the
// scanner.
block = currentBlock;
} else {
// Call HFile's caching block reader API. We always cache index
// blocks, otherwise we might get terrible performance.
boolean shouldCache = cacheBlocks || (lookupLevel < searchTreeLevel);
BlockType expectedBlockType;
if (lookupLevel < searchTreeLevel - 1) {
expectedBlockType = BlockType.INTERMEDIATE_INDEX;
} else if (lookupLevel == searchTreeLevel - 1) {
expectedBlockType = BlockType.LEAF_INDEX;
try {
if (currentBlock != null && currentBlock.getOffset() == currentOffset) {
// Avoid reading the same block again, even with caching turned off.
// This is crucial for compaction-type workload which might have
// caching turned off. This is like a one-block cache inside the
// scanner.
block = currentBlock;
} else {
// this also accounts for ENCODED_DATA
expectedBlockType = BlockType.DATA;
// Call HFile's caching block reader API. We always cache index
// blocks, otherwise we might get terrible performance.
boolean shouldCache = cacheBlocks || (lookupLevel < searchTreeLevel);
BlockType expectedBlockType;
if (lookupLevel < searchTreeLevel - 1) {
expectedBlockType = BlockType.INTERMEDIATE_INDEX;
} else if (lookupLevel == searchTreeLevel - 1) {
expectedBlockType = BlockType.LEAF_INDEX;
} else {
// this also accounts for ENCODED_DATA
expectedBlockType = BlockType.DATA;
}
block =
cachingBlockReader.readBlock(currentOffset, currentOnDiskSize, shouldCache, pread,
isCompaction, true, expectedBlockType, expectedDataBlockEncoding);
}
block = cachingBlockReader.readBlock(currentOffset,
currentOnDiskSize, shouldCache, pread, isCompaction, true,
expectedBlockType, expectedDataBlockEncoding);
}
if (block == null) {
throw new IOException("Failed to read block at offset " +
currentOffset + ", onDiskSize=" + currentOnDiskSize);
}
if (block == null) {
throw new IOException("Failed to read block at offset " + currentOffset
+ ", onDiskSize=" + currentOnDiskSize);
}
// Found a data block, break the loop and check our level in the tree.
if (block.getBlockType().isData()) {
break;
}
// Found a data block, break the loop and check our level in the tree.
if (block.getBlockType().isData()) {
dataBlock = true;
break;
}
// Not a data block. This must be a leaf-level or intermediate-level
// index block. We don't allow going deeper than searchTreeLevel.
if (++lookupLevel > searchTreeLevel) {
throw new IOException("Search Tree Level overflow: lookupLevel="+
lookupLevel + ", searchTreeLevel=" + searchTreeLevel);
}
// Not a data block. This must be a leaf-level or intermediate-level
// index block. We don't allow going deeper than searchTreeLevel.
if (++lookupLevel > searchTreeLevel) {
throw new IOException("Search Tree Level overflow: lookupLevel=" + lookupLevel
+ ", searchTreeLevel=" + searchTreeLevel);
}
// Locate the entry corresponding to the given key in the non-root
// (leaf or intermediate-level) index block.
ByteBuff buffer = block.getBufferWithoutHeader();
index = locateNonRootIndexEntry(buffer, key, comparator);
if (index == -1) {
// This has to be changed
// For now change this to key value
KeyValue kv = KeyValueUtil.ensureKeyValue(key);
throw new IOException("The key "
+ Bytes.toStringBinary(kv.getKey(), kv.getKeyOffset(), kv.getKeyLength())
+ " is before the" + " first key of the non-root index block "
+ block);
}
// Locate the entry corresponding to the given key in the non-root
// (leaf or intermediate-level) index block.
ByteBuff buffer = block.getBufferWithoutHeader();
index = locateNonRootIndexEntry(buffer, key, comparator);
if (index == -1) {
// This has to be changed
// For now change this to key value
KeyValue kv = KeyValueUtil.ensureKeyValue(key);
throw new IOException("The key "
+ Bytes.toStringBinary(kv.getKey(), kv.getKeyOffset(), kv.getKeyLength())
+ " is before the" + " first key of the non-root index block " + block);
}
currentOffset = buffer.getLong();
currentOnDiskSize = buffer.getInt();
currentOffset = buffer.getLong();
currentOnDiskSize = buffer.getInt();
// Only update next indexed key if there is a next indexed key in the current level
byte[] nonRootIndexedKey = getNonRootIndexedKey(buffer, index + 1);
if (nonRootIndexedKey != null) {
tmpNextIndexKV.setKey(nonRootIndexedKey, 0, nonRootIndexedKey.length);
nextIndexedKey = tmpNextIndexKV;
// Only update next indexed key if there is a next indexed key in the current level
byte[] nonRootIndexedKey = getNonRootIndexedKey(buffer, index + 1);
if (nonRootIndexedKey != null) {
tmpNextIndexKV.setKey(nonRootIndexedKey, 0, nonRootIndexedKey.length);
nextIndexedKey = tmpNextIndexKV;
}
} finally {
if (!dataBlock) {
// Return the block immediately if it is not the
// data block
cachingBlockReader.returnBlock(block);
}
}
}
if (lookupLevel != searchTreeLevel) {
assert dataBlock == true;
// Though we have retrieved a data block, the lookup level does not match the
// search tree level. Return the block so that its ref count can be decremented
// before the exception is thrown.
cachingBlockReader.returnBlock(block);
throw new IOException("Reached a data block at level " + lookupLevel +
" but the number of levels is " + searchTreeLevel);
}
@ -396,16 +408,19 @@ public class HFileBlockIndex {
HFileBlock midLeafBlock = cachingBlockReader.readBlock(
midLeafBlockOffset, midLeafBlockOnDiskSize, true, true, false, true,
BlockType.LEAF_INDEX, null);
ByteBuff b = midLeafBlock.getBufferWithoutHeader();
int numDataBlocks = b.getIntAfterPosition(0);
int keyRelOffset = b.getIntAfterPosition(Bytes.SIZEOF_INT * (midKeyEntry + 1));
int keyLen = b.getIntAfterPosition(Bytes.SIZEOF_INT * (midKeyEntry + 2)) -
keyRelOffset;
int keyOffset = Bytes.SIZEOF_INT * (numDataBlocks + 2) + keyRelOffset
+ SECONDARY_INDEX_ENTRY_OVERHEAD;
byte[] bytes = b.toBytes(keyOffset, keyLen);
targetMidKey = new KeyValue.KeyOnlyKeyValue(bytes, 0, bytes.length);
try {
ByteBuff b = midLeafBlock.getBufferWithoutHeader();
int numDataBlocks = b.getIntAfterPosition(0);
int keyRelOffset = b.getIntAfterPosition(Bytes.SIZEOF_INT * (midKeyEntry + 1));
int keyLen = b.getIntAfterPosition(Bytes.SIZEOF_INT * (midKeyEntry + 2)) - keyRelOffset;
int keyOffset =
Bytes.SIZEOF_INT * (numDataBlocks + 2) + keyRelOffset
+ SECONDARY_INDEX_ENTRY_OVERHEAD;
byte[] bytes = b.toBytes(keyOffset, keyLen);
targetMidKey = new KeyValue.KeyOnlyKeyValue(bytes, 0, bytes.length);
} finally {
cachingBlockReader.returnBlock(midLeafBlock);
}
} else {
// The middle of the root-level index.
targetMidKey = blockKeys[rootCount / 2];

View File

@ -34,10 +34,11 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.ShareableMemory;
import org.apache.hadoop.hbase.SizeCachedKeyValue;
import org.apache.hadoop.hbase.SizeCachedNoTagsKeyValue;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.SizeCachedNoTagsKeyValue;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
import org.apache.hadoop.hbase.io.compress.Compression;
@ -46,6 +47,7 @@ import org.apache.hadoop.hbase.io.crypto.Encryption;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoder;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext;
import org.apache.hadoop.hbase.io.hfile.Cacheable.MemoryType;
import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.security.EncryptionUtil;
@ -256,6 +258,11 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
}
HFileBlock block = readBlock(offset, onDiskSize, true, false, false, false,
null, null);
// Need not update the current block. Ideally readBlock won't find the block in the
// cache here; we call readBlock so that the block data is read from the filesystem
// and cached in the block cache, so no reference count increment happens here.
// The return will ideally be a noop because the block is not of MemoryType SHARED.
returnBlock(block);
prevBlock = block;
offset += block.getOnDiskSizeWithHeader();
}
@ -337,6 +344,15 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
return fileSize;
}
@Override
public void returnBlock(HFileBlock block) {
BlockCache blockCache = this.cacheConf.getBlockCache();
if (blockCache != null && block != null) {
BlockCacheKey cacheKey = new BlockCacheKey(this.getFileContext().getHFileName(),
block.getOffset());
blockCache.returnBlock(cacheKey, block);
}
}
/**
* @return the first key in the file. May be null if file has no entries. Note
* that this is not the first row key, but rather the byte form of the
@ -449,7 +465,6 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
protected final HFile.Reader reader;
private int currTagsLen;
private KeyValue.KeyOnlyKeyValue keyOnlyKv = new KeyValue.KeyOnlyKeyValue();
protected HFileBlock block;
// A pair for reusing in blockSeek() so that we don't garbage lot of objects
final Pair<ByteBuffer, Integer> pair = new Pair<ByteBuffer, Integer>();
@ -461,6 +476,10 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
* If the nextIndexedKey is null, it means the nextIndexedKey has not been loaded yet.
*/
protected Cell nextIndexedKey;
// Current block being used
protected HFileBlock curBlock;
// Previous blocks that were used in the course of the read
protected final ArrayList<HFileBlock> prevBlocks = new ArrayList<HFileBlock>();
public HFileScannerImpl(final HFile.Reader reader, final boolean cacheBlocks,
final boolean pread, final boolean isCompaction) {
@ -470,6 +489,41 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
this.isCompaction = isCompaction;
}
void updateCurrBlockRef(HFileBlock block) {
if (block != null && this.curBlock != null &&
block.getOffset() == this.curBlock.getOffset()) {
return;
}
if (this.curBlock != null) {
prevBlocks.add(this.curBlock);
}
this.curBlock = block;
}
void reset() {
if (this.curBlock != null) {
this.prevBlocks.add(this.curBlock);
}
this.curBlock = null;
}
private void returnBlockToCache(HFileBlock block) {
if (LOG.isTraceEnabled()) {
LOG.trace("Returning the block : " + block);
}
this.reader.returnBlock(block);
}
private void returnBlocks(boolean returnAll) {
for (int i = 0; i < this.prevBlocks.size(); i++) {
returnBlockToCache(this.prevBlocks.get(i));
}
this.prevBlocks.clear();
if (returnAll && this.curBlock != null) {
returnBlockToCache(this.curBlock);
this.curBlock = null;
}
}
@Override
public boolean isSeeked(){
return blockBuffer != null;
@ -498,6 +552,11 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
return kvBufSize;
}
@Override
public void close() {
this.returnBlocks(true);
}
protected int getNextCellStartPosition() {
int nextKvPos = blockBuffer.position() + KEY_VALUE_LEN_SIZE + currKeyLen + currValueLen
+ currMemstoreTSLen;
@ -536,7 +595,8 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
private final void checkTagsLen() {
if (checkLen(this.currTagsLen)) {
throw new IllegalStateException("Invalid currTagsLen " + this.currTagsLen +
". Block offset: " + block.getOffset() + ", block length: " + this.blockBuffer.limit() +
". Block offset: " + curBlock.getOffset() + ", block length: " +
this.blockBuffer.limit() +
", position: " + this.blockBuffer.position() + " (without header).");
}
}
@ -610,7 +670,7 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
|| vlen > blockBuffer.limit()) {
throw new IllegalStateException("Invalid klen " + klen + " or vlen "
+ vlen + ". Block offset: "
+ block.getOffset() + ", block length: " + blockBuffer.limit() + ", position: "
+ curBlock.getOffset() + ", block length: " + blockBuffer.limit() + ", position: "
+ blockBuffer.position() + " (without header).");
}
offsetFromPos += Bytes.SIZEOF_LONG;
@ -626,7 +686,7 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
^ (blockBuffer.getByteAfterPosition(offsetFromPos + 1) & 0xff);
if (tlen < 0 || tlen > blockBuffer.limit()) {
throw new IllegalStateException("Invalid tlen " + tlen + ". Block offset: "
+ block.getOffset() + ", block length: " + blockBuffer.limit() + ", position: "
+ curBlock.getOffset() + ", block length: " + blockBuffer.limit() + ", position: "
+ blockBuffer.position() + " (without header).");
}
// add the two bytes read for the tags.
@ -641,8 +701,8 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
if (lastKeyValueSize < 0) {
throw new IllegalStateException("blockSeek with seekBefore "
+ "at the first key of the block: key=" + CellUtil.getCellKeyAsString(key)
+ ", blockOffset=" + block.getOffset() + ", onDiskSize="
+ block.getOnDiskSizeWithHeader());
+ ", blockOffset=" + curBlock.getOffset() + ", onDiskSize="
+ curBlock.getOnDiskSizeWithHeader());
}
blockBuffer.moveBack(lastKeyValueSize);
readKeyValueLen();
@ -709,8 +769,10 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
// smaller than
// the next indexed key or the current data block is the last data
// block.
return loadBlockAndSeekToKey(this.block, nextIndexedKey, false, key, false);
return loadBlockAndSeekToKey(this.curBlock, nextIndexedKey, false, key,
false);
}
}
}
// Don't rewind on a reseek operation, because reseek implies that we are
@ -734,10 +796,11 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
*/
public int seekTo(Cell key, boolean rewind) throws IOException {
HFileBlockIndex.BlockIndexReader indexReader = reader.getDataBlockIndexReader();
BlockWithScanInfo blockWithScanInfo = indexReader.loadDataBlockWithScanInfo(key, block,
BlockWithScanInfo blockWithScanInfo = indexReader.loadDataBlockWithScanInfo(key, curBlock,
cacheBlocks, pread, isCompaction, getEffectiveDataBlockEncoding());
if (blockWithScanInfo == null || blockWithScanInfo.getHFileBlock() == null) {
// This happens if the key e.g. falls before the beginning of the file.
// This happens if the key e.g. falls before the beginning of the
// file.
return -1;
}
return loadBlockAndSeekToKey(blockWithScanInfo.getHFileBlock(),
@ -746,7 +809,7 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
@Override
public boolean seekBefore(Cell key) throws IOException {
HFileBlock seekToBlock = reader.getDataBlockIndexReader().seekToDataBlock(key, block,
HFileBlock seekToBlock = reader.getDataBlockIndexReader().seekToDataBlock(key, curBlock,
cacheBlocks, pread, isCompaction, reader.getEffectiveEncodingInCache(isCompaction));
if (seekToBlock == null) {
return false;
@ -761,6 +824,10 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
return false;
}
// The first key in the current block 'seekToBlock' is greater than the given
// seekBefore key. We will go ahead by reading the next block that satisfies the
// given key. Return the current block before reading the next one.
reader.returnBlock(seekToBlock);
// It is important that we compute and pass onDiskSize to the block
// reader so that it does not have to read the header separately to
// figure out the size.
@ -783,28 +850,33 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
*/
protected HFileBlock readNextDataBlock() throws IOException {
long lastDataBlockOffset = reader.getTrailer().getLastDataBlockOffset();
if (block == null)
if (curBlock == null)
return null;
HFileBlock curBlock = block;
HFileBlock block = this.curBlock;
do {
if (curBlock.getOffset() >= lastDataBlockOffset)
if (block.getOffset() >= lastDataBlockOffset)
return null;
if (curBlock.getOffset() < 0) {
if (block.getOffset() < 0) {
throw new IOException("Invalid block file offset: " + block);
}
// We are reading the next block without block type validation, because
// it might turn out to be a non-data block.
curBlock = reader.readBlock(curBlock.getOffset()
+ curBlock.getOnDiskSizeWithHeader(),
curBlock.getNextBlockOnDiskSizeWithHeader(), cacheBlocks, pread,
block = reader.readBlock(block.getOffset()
+ block.getOnDiskSizeWithHeader(),
block.getNextBlockOnDiskSizeWithHeader(), cacheBlocks, pread,
isCompaction, true, null, getEffectiveDataBlockEncoding());
} while (!curBlock.getBlockType().isData());
if (block != null && !block.getBlockType().isData()) {
// Return whatever block we just read unless it is a data block; only data
// blocks are retained by the scanner.
reader.returnBlock(block);
}
} while (!block.getBlockType().isData());
return curBlock;
return block;
}
public DataBlockEncoding getEffectiveDataBlockEncoding() {
@ -817,13 +889,27 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
return null;
KeyValue ret;
// TODO : reduce the varieties of KV here. Check if based on a boolean
// we can handle the 'no tags' case
// TODO : Handle MBB here
if (currTagsLen > 0) {
ret = new SizeCachedKeyValue(blockBuffer.array(), blockBuffer.arrayOffset()
+ blockBuffer.position(), getCellBufSize());
if (this.curBlock.getMemoryType() == MemoryType.SHARED) {
ret = new ShareableMemoryKeyValue(blockBuffer.array(), blockBuffer.arrayOffset()
+ blockBuffer.position(), getCellBufSize());
} else {
ret = new SizeCachedKeyValue(blockBuffer.array(), blockBuffer.arrayOffset()
+ blockBuffer.position(), getCellBufSize());
}
} else {
ret = new SizeCachedNoTagsKeyValue(blockBuffer.array(), blockBuffer.arrayOffset()
if (this.curBlock.getMemoryType() == MemoryType.SHARED) {
ret = new ShareableMemoryNoTagsKeyValue(blockBuffer.array(), blockBuffer.arrayOffset()
+ blockBuffer.position(), getCellBufSize());
} else {
ret = new SizeCachedNoTagsKeyValue(blockBuffer.array(), blockBuffer.arrayOffset()
+ blockBuffer.position(), getCellBufSize());
}
}
if (this.reader.shouldIncludeMemstoreTS()) {
ret.setSequenceId(currMemstoreTS);
}
@ -838,6 +924,32 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
+ KEY_VALUE_LEN_SIZE, currKeyLen);
}
private static class ShareableMemoryKeyValue extends SizeCachedKeyValue implements
ShareableMemory {
public ShareableMemoryKeyValue(byte[] bytes, int offset, int length) {
super(bytes, offset, length);
}
@Override
public Cell cloneToCell() {
byte[] copy = Bytes.copy(this.bytes, this.offset, this.length);
return new SizeCachedKeyValue(copy, 0, copy.length);
}
}
private static class ShareableMemoryNoTagsKeyValue extends SizeCachedNoTagsKeyValue implements
ShareableMemory {
public ShareableMemoryNoTagsKeyValue(byte[] bytes, int offset, int length) {
super(bytes, offset, length);
}
@Override
public Cell cloneToCell() {
byte[] copy = Bytes.copy(this.bytes, this.offset, this.length);
return new SizeCachedNoTagsKeyValue(copy, 0, copy.length);
}
}
@Override
public ByteBuffer getValue() {
assertSeeked();
@ -849,7 +961,7 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
}
protected void setNonSeekedState() {
block = null;
reset();
blockBuffer = null;
currKeyLen = 0;
currValueLen = 0;
@ -869,7 +981,7 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
+ "; currKeyLen = " + currKeyLen + "; currValLen = "
+ currValueLen + "; block limit = " + blockBuffer.limit()
+ "; HFile name = " + reader.getName()
+ "; currBlock currBlockOffset = " + block.getOffset());
+ "; currBlock currBlockOffset = " + this.curBlock.getOffset());
throw e;
}
}
@ -882,7 +994,7 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
private boolean positionForNextBlock() throws IOException {
// Methods are small so they get inlined because they are 'hot'.
long lastDataBlockOffset = reader.getTrailer().getLastDataBlockOffset();
if (block.getOffset() >= lastDataBlockOffset) {
if (this.curBlock.getOffset() >= lastDataBlockOffset) {
setNonSeekedState();
return false;
}
@ -897,7 +1009,7 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
setNonSeekedState();
return false;
}
updateCurrBlock(nextBlock);
updateCurrentBlock(nextBlock);
return true;
}
@ -946,27 +1058,37 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
return false;
}
long firstDataBlockOffset =
reader.getTrailer().getFirstDataBlockOffset();
if (block != null && block.getOffset() == firstDataBlockOffset) {
blockBuffer.rewind();
readKeyValueLen();
return true;
long firstDataBlockOffset = reader.getTrailer().getFirstDataBlockOffset();
if (curBlock != null
&& curBlock.getOffset() == firstDataBlockOffset) {
return processFirstDataBlock();
}
block = reader.readBlock(firstDataBlockOffset, -1, cacheBlocks, pread,
isCompaction, true, BlockType.DATA, getEffectiveDataBlockEncoding());
if (block.getOffset() < 0) {
throw new IOException("Invalid block offset: " + block.getOffset());
}
updateCurrBlock(block);
readAndUpdateNewBlock(firstDataBlockOffset);
return true;
}
protected boolean processFirstDataBlock() throws IOException{
blockBuffer.rewind();
readKeyValueLen();
return true;
}
protected void readAndUpdateNewBlock(long firstDataBlockOffset) throws IOException,
CorruptHFileException {
HFileBlock newBlock = reader.readBlock(firstDataBlockOffset, -1, cacheBlocks, pread,
isCompaction, true, BlockType.DATA, getEffectiveDataBlockEncoding());
if (newBlock.getOffset() < 0) {
throw new IOException("Invalid block offset: " + newBlock.getOffset());
}
updateCurrentBlock(newBlock);
}
protected int loadBlockAndSeekToKey(HFileBlock seekToBlock, Cell nextIndexedKey,
boolean rewind, Cell key, boolean seekBefore) throws IOException {
if (block == null || block.getOffset() != seekToBlock.getOffset()) {
updateCurrBlock(seekToBlock);
if (this.curBlock == null
|| this.curBlock.getOffset() != seekToBlock.getOffset()) {
updateCurrentBlock(seekToBlock);
} else if (rewind) {
blockBuffer.rewind();
}
@ -989,10 +1111,11 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
*/
protected final void checkKeyValueLen() {
if (checkLen(this.currKeyLen) || checkLen(this.currValueLen)) {
throw new IllegalStateException("Invalid currKeyLen " + this.currKeyLen +
" or currValueLen " + this.currValueLen + ". Block offset: " + block.getOffset() +
", block length: " + this.blockBuffer.limit() + ", position: " +
this.blockBuffer.position() + " (without header).");
throw new IllegalStateException("Invalid currKeyLen " + this.currKeyLen
+ " or currValueLen " + this.currValueLen + ". Block offset: "
+ this.curBlock.getOffset() + ", block length: "
+ this.blockBuffer.limit() + ", position: " + this.blockBuffer.position()
+ " (without header).");
}
}
@ -1002,19 +1125,18 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
*
* @param newBlock the block to make current
*/
protected void updateCurrBlock(HFileBlock newBlock) {
block = newBlock;
protected void updateCurrentBlock(HFileBlock newBlock) throws IOException {
// Set the active block on the reader
// sanity check
if (block.getBlockType() != BlockType.DATA) {
throw new IllegalStateException("Scanner works only on data " +
"blocks, got " + block.getBlockType() + "; " +
"fileName=" + reader.getName() + ", " +
"dataBlockEncoder=" + reader.getDataBlockEncoding() + ", " +
"isCompaction=" + isCompaction);
if (newBlock.getBlockType() != BlockType.DATA) {
throw new IllegalStateException("ScannerV2 works only on data " + "blocks, got "
+ newBlock.getBlockType() + "; " + "fileName=" + reader.getName()
+ ", " + "dataBlockEncoder=" + reader.getDataBlockEncoding() + ", " + "isCompaction="
+ isCompaction);
}
blockBuffer = block.getBufferWithoutHeader();
updateCurrBlockRef(newBlock);
blockBuffer = newBlock.getBufferWithoutHeader();
readKeyValueLen();
blockFetches++;
@ -1057,14 +1179,9 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
key, this.keyOnlyKv);
}
@Override
public void close() {
// HBASE-12295 will add code here.
}
@Override
public void shipped() throws IOException {
// HBASE-12295 will add code here.
this.returnBlocks(false);
}
}
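A sketch of the lifecycle this enables, assuming a caller that drives an HFileScanner and that the scanner exposes shipped() and close() as the @Override markers above indicate; the loop is illustrative.

import java.io.IOException;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;

// Illustrative driver: previously used blocks are returned on shipped(), everything on close().
static void drain(HFileScanner scanner) throws IOException {
  try {
    if (!scanner.seekTo()) {
      return; // empty file
    }
    do {
      Cell cell = scanner.getCell();
      // ... copy/serialize the cell into the response before its block is returned ...
    } while (scanner.next());
    scanner.shipped(); // blocks used so far go back to the cache
  } finally {
    scanner.close();   // returns any remaining blocks, including the current one
  }
}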
@ -1127,8 +1244,13 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
updateCacheMetrics);
if (cachedBlock != null) {
if (cacheConf.shouldCacheCompressed(cachedBlock.getBlockType().getCategory())) {
cachedBlock = cachedBlock.unpack(hfileContext, fsBlockReader);
}
HFileBlock compressedBlock = cachedBlock;
cachedBlock = compressedBlock.unpack(hfileContext, fsBlockReader);
// In case of compressed block after unpacking we can return the compressed block
if (compressedBlock != cachedBlock) {
cache.returnBlock(cacheKey, compressedBlock);
}
}
validateBlockType(cachedBlock, expectedBlockType);
if (expectedDataBlockEncoding == null) {
@ -1163,6 +1285,9 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
" because of a data block encoding mismatch" +
"; expected: " + expectedDataBlockEncoding +
", actual: " + actualDataBlockEncoding);
// This is an error scenario, so we need to decrement the
// reference count here.
cache.returnBlock(cacheKey, cachedBlock);
cache.evictBlock(cacheKey);
}
return null;
@ -1180,7 +1305,7 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
* @throws IOException
*/
@Override
public ByteBuff getMetaBlock(String metaBlockName, boolean cacheBlock)
public HFileBlock getMetaBlock(String metaBlockName, boolean cacheBlock)
throws IOException {
if (trailer.getMetaIndexCount() == 0) {
return null; // there are no meta blocks
@ -1213,7 +1338,7 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
assert cachedBlock.isUnpacked() : "Packed block leak.";
// Return a distinct 'shallow copy' of the block,
// so pos does not get messed by the scanner
return cachedBlock.getBufferWithoutHeader();
return cachedBlock;
}
// Cache Miss, please load.
}
@ -1227,7 +1352,7 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
cacheConf.isInMemory(), this.cacheConf.isCacheDataInL1());
}
return metaBlock.getBufferWithoutHeader();
return metaBlock;
}
}
@ -1424,7 +1549,11 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
@Override
public boolean isSeeked(){
return this.block != null;
return curBlock != null;
}
public void setNonSeekedState() {
reset();
}
/**
@ -1434,21 +1563,21 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
* @param newBlock the block to make current
* @throws CorruptHFileException
*/
private void updateCurrentBlock(HFileBlock newBlock) throws CorruptHFileException {
block = newBlock;
@Override
protected void updateCurrentBlock(HFileBlock newBlock) throws CorruptHFileException {
// sanity checks
if (block.getBlockType() != BlockType.ENCODED_DATA) {
throw new IllegalStateException(
"EncodedScanner works only on encoded data blocks");
if (newBlock.getBlockType() != BlockType.ENCODED_DATA) {
throw new IllegalStateException("EncodedScanner works only on encoded data blocks");
}
short dataBlockEncoderId = block.getDataBlockEncodingId();
short dataBlockEncoderId = newBlock.getDataBlockEncodingId();
if (!DataBlockEncoding.isCorrectEncoder(dataBlockEncoder, dataBlockEncoderId)) {
String encoderCls = dataBlockEncoder.getClass().getName();
throw new CorruptHFileException("Encoder " + encoderCls
+ " doesn't support data block encoding "
+ DataBlockEncoding.getNameFromId(dataBlockEncoderId));
}
updateCurrBlockRef(newBlock);
ByteBuff encodedBuffer = getEncodedBuffer(newBlock);
seeker.setCurrentBuffer(encodedBuffer);
blockFetches++;
@ -1467,29 +1596,8 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
}
@Override
public boolean seekTo() throws IOException {
if (reader == null) {
return false;
}
if (reader.getTrailer().getEntryCount() == 0) {
// No data blocks.
return false;
}
long firstDataBlockOffset =
reader.getTrailer().getFirstDataBlockOffset();
if (block != null && block.getOffset() == firstDataBlockOffset) {
seeker.rewind();
return true;
}
block = reader.readBlock(firstDataBlockOffset, -1, cacheBlocks, pread,
isCompaction, true, BlockType.DATA, getEffectiveDataBlockEncoding());
if (block.getOffset() < 0) {
throw new IOException("Invalid block offset: " + block.getOffset());
}
updateCurrentBlock(block);
protected boolean processFirstDataBlock() throws IOException {
seeker.rewind();
return true;
}
@ -1497,10 +1605,12 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
public boolean next() throws IOException {
boolean isValid = seeker.next();
if (!isValid) {
block = readNextDataBlock();
isValid = block != null;
HFileBlock newBlock = readNextDataBlock();
isValid = newBlock != null;
if (isValid) {
updateCurrentBlock(block);
updateCurrentBlock(newBlock);
} else {
setNonSeekedState();
}
}
return isValid;
@ -1520,7 +1630,7 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
@Override
public Cell getCell() {
if (block == null) {
if (this.curBlock == null) {
return null;
}
return seeker.getCell();
@ -1539,7 +1649,7 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
}
private void assertValidSeek() {
if (block == null) {
if (this.curBlock == null) {
throw new NotSeekedException();
}
}
@ -1548,9 +1658,11 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
return dataBlockEncoder.getFirstKeyCellInBlock(getEncodedBuffer(curBlock));
}
@Override
protected int loadBlockAndSeekToKey(HFileBlock seekToBlock, Cell nextIndexedKey,
boolean rewind, Cell key, boolean seekBefore) throws IOException {
if (block == null || block.getOffset() != seekToBlock.getOffset()) {
if (this.curBlock == null
|| this.curBlock.getOffset() != seekToBlock.getOffset()) {
updateCurrentBlock(seekToBlock);
} else if (rewind) {
seeker.rewind();
@ -1631,6 +1743,7 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
HFileContextBuilder builder = new HFileContextBuilder()
.withIncludesMvcc(shouldIncludeMemstoreTS())
.withHBaseCheckSum(true)
.withHFileName(this.getName())
.withCompression(this.compressAlgo);
// Check for any key material available

View File

@ -34,11 +34,10 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import com.google.common.base.Objects;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
@ -49,6 +48,7 @@ import org.apache.hadoop.util.StringUtils;
import org.codehaus.jackson.annotate.JsonIgnoreProperties;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Objects;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
@ -1090,4 +1090,9 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize {
public BlockCache[] getBlockCaches() {
return null;
}
@Override
public void returnBlock(BlockCacheKey cacheKey, Cacheable block) {
// There is no SHARED type here. Just return
}
}

View File

@ -29,6 +29,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.io.hfile.Cacheable.MemoryType;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.nio.SingleByteBuff;
import org.apache.hadoop.hbase.util.Addressing;
@ -259,7 +260,8 @@ public class MemcachedBlockCache implements BlockCache {
public HFileBlock decode(CachedData d) {
try {
ByteBuff buf = new SingleByteBuff(ByteBuffer.wrap(d.getData()));
return (HFileBlock) HFileBlock.blockDeserializer.deserialize(buf, true);
return (HFileBlock) HFileBlock.blockDeserializer.deserialize(buf, true,
MemoryType.EXCLUSIVE);
} catch (IOException e) {
LOG.warn("Error deserializing data from memcached",e);
}
@ -272,4 +274,9 @@ public class MemcachedBlockCache implements BlockCache {
}
}
@Override
public void returnBlock(BlockCacheKey cacheKey, Cacheable block) {
// Not doing reference counting. All blocks here are EXCLUSIVE
}
}

View File

@ -43,6 +43,7 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@ -58,16 +59,17 @@ import org.apache.hadoop.hbase.io.hfile.BlockPriority;
import org.apache.hadoop.hbase.io.hfile.BlockType;
import org.apache.hadoop.hbase.io.hfile.CacheStats;
import org.apache.hadoop.hbase.io.hfile.Cacheable;
import org.apache.hadoop.hbase.io.hfile.Cacheable.MemoryType;
import org.apache.hadoop.hbase.io.hfile.CacheableDeserializer;
import org.apache.hadoop.hbase.io.hfile.CacheableDeserializerIdManager;
import org.apache.hadoop.hbase.io.hfile.CachedBlock;
import org.apache.hadoop.hbase.io.hfile.HFileBlock;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.nio.SingleByteBuff;
import org.apache.hadoop.hbase.util.ConcurrentIndex;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.HasThread;
import org.apache.hadoop.hbase.util.IdLock;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.util.StringUtils;
import com.google.common.annotations.VisibleForTesting;
@ -421,20 +423,19 @@ public class BucketCache implements BlockCache, HeapSize {
// TODO : change this area - should be removed after server cells and
// 12295 are available
int len = bucketEntry.getLength();
ByteBuffer buf = ByteBuffer.allocate(len);
int lenRead = ioEngine.read(buf, bucketEntry.offset());
ByteBuff bb = new SingleByteBuff(buf);
if (lenRead != len) {
throw new RuntimeException("Only " + lenRead + " bytes read, " + len + " expected");
}
Pair<ByteBuff, MemoryType> pair = ioEngine.read(bucketEntry.offset(), len);
ByteBuff bb = pair.getFirst();
CacheableDeserializer<Cacheable> deserializer =
bucketEntry.deserializerReference(this.deserialiserMap);
Cacheable cachedBlock = deserializer.deserialize(bb, true);
Cacheable cachedBlock = deserializer.deserialize(bb, true, pair.getSecond());
long timeTaken = System.nanoTime() - start;
if (updateCacheMetrics) {
cacheStats.hit(caching);
cacheStats.ioHit(timeTaken);
}
if (pair.getSecond() == MemoryType.SHARED) {
bucketEntry.refCount.incrementAndGet();
}
bucketEntry.access(accessCount.incrementAndGet());
if (this.ioErrorStartTime > 0) {
ioErrorStartTime = -1;
@ -468,14 +469,16 @@ public class BucketCache implements BlockCache, HeapSize {
@Override
public boolean evictBlock(BlockCacheKey cacheKey) {
return evictBlock(cacheKey, true);
}
// Does not check the ref count. Just tries to evict the block if it is found in the
// backing map.
private boolean forceEvict(BlockCacheKey cacheKey) {
if (!cacheEnabled) {
return false;
}
RAMQueueEntry removedBlock = ramCache.remove(cacheKey);
if (removedBlock != null) {
this.blockNumber.decrementAndGet();
this.heapSize.addAndGet(-1 * removedBlock.getData().heapSize());
}
RAMQueueEntry removedBlock = checkRamCache(cacheKey);
BucketEntry bucketEntry = backingMap.get(cacheKey);
if (bucketEntry == null) {
if (removedBlock != null) {
@ -505,6 +508,67 @@ public class BucketCache implements BlockCache, HeapSize {
return true;
}
private RAMQueueEntry checkRamCache(BlockCacheKey cacheKey) {
RAMQueueEntry removedBlock = ramCache.remove(cacheKey);
if (removedBlock != null) {
this.blockNumber.decrementAndGet();
this.heapSize.addAndGet(-1 * removedBlock.getData().heapSize());
}
return removedBlock;
}
public boolean evictBlock(BlockCacheKey cacheKey, boolean deletedBlock) {
if (!cacheEnabled) {
return false;
}
RAMQueueEntry removedBlock = checkRamCache(cacheKey);
BucketEntry bucketEntry = backingMap.get(cacheKey);
if (bucketEntry == null) {
if (removedBlock != null) {
cacheStats.evicted(0);
return true;
} else {
return false;
}
}
IdLock.Entry lockEntry = null;
try {
lockEntry = offsetLock.getLockEntry(bucketEntry.offset());
int refCount = bucketEntry.refCount.get();
if(refCount == 0) {
if (backingMap.remove(cacheKey, bucketEntry)) {
blockEvicted(cacheKey, bucketEntry, removedBlock == null);
} else {
return false;
}
} else {
if(!deletedBlock) {
if (LOG.isDebugEnabled()) {
LOG.debug("This block " + cacheKey + " is still referred by " + refCount
+ " readers. Can not be freed now");
}
return false;
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("This block " + cacheKey + " is still referred by " + refCount
+ " readers. Can not be freed now. Hence will mark this"
+ " for evicting at a later point");
}
bucketEntry.markedForEvict = true;
}
}
} catch (IOException ie) {
LOG.warn("Failed evicting block " + cacheKey);
return false;
} finally {
if (lockEntry != null) {
offsetLock.releaseLockEntry(lockEntry);
}
}
cacheStats.evicted(bucketEntry.getCachedTime());
return true;
}
/*
* Statistics thread. Periodically output cache statistics to the log.
*/
@ -1107,6 +1171,10 @@ public class BucketCache implements BlockCache, HeapSize {
byte deserialiserIndex;
private volatile long accessCounter;
private BlockPriority priority;
// Set this when we were not able to forcefully evict the block
private volatile boolean markedForEvict;
private AtomicInteger refCount = new AtomicInteger(0);
/**
* Time this block was cached. Presumes we are created just before we are added to the cache.
*/
@ -1198,9 +1266,12 @@ public class BucketCache implements BlockCache, HeapSize {
public long free(long toFree) {
Map.Entry<BlockCacheKey, BucketEntry> entry;
long freedBytes = 0;
// TODO avoid a cycling situation. We may find no block that is not in use and so have no
// way to free space. What to do then? Fail the caching attempt? Need some changes in the cacheBlock API?
while ((entry = queue.pollLast()) != null) {
evictBlock(entry.getKey());
freedBytes += entry.getValue().getLength();
if (evictBlock(entry.getKey(), false)) {
freedBytes += entry.getValue().getLength();
}
if (freedBytes >= toFree) {
return freedBytes;
}
@ -1404,4 +1475,26 @@ public class BucketCache implements BlockCache, HeapSize {
public BlockCache[] getBlockCaches() {
return null;
}
@Override
public void returnBlock(BlockCacheKey cacheKey, Cacheable block) {
if (block.getMemoryType() == MemoryType.SHARED) {
BucketEntry bucketEntry = backingMap.get(cacheKey);
if (bucketEntry != null) {
int refCount = bucketEntry.refCount.decrementAndGet();
if (bucketEntry.markedForEvict && refCount == 0) {
forceEvict(cacheKey);
}
}
}
}
@VisibleForTesting
public int getRefCount(BlockCacheKey cacheKey) {
BucketEntry bucketEntry = backingMap.get(cacheKey);
if (bucketEntry != null) {
return bucketEntry.refCount.get();
}
return 0;
}
}
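A test-style sketch of how the new reference count could be observed through the @VisibleForTesting getRefCount hook; the method, parameters and assertion are hypothetical.

import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
import org.apache.hadoop.hbase.io.hfile.CombinedBlockCache;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;

// Sketch: the ref count stays above zero while a scanner holds a SHARED block
// (so eviction is deferred) and drops back once the scanner returns its blocks.
static void checkRefCount(CombinedBlockCache cache, HFileScanner scanner,
    BlockCacheKey cacheKey) throws Exception {
  int whileInUse = cache.getRefCount(cacheKey);  // e.g. 1 while the scanner uses the block
  scanner.close();                               // blocks are returned to the cache
  int afterClose = cache.getRefCount(cacheKey);  // expected to drop back to 0
  assert afterClose <= whileInUse;
}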

View File

@ -22,8 +22,11 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.io.hfile.Cacheable.MemoryType;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.nio.SingleByteBuff;
import org.apache.hadoop.hbase.util.ByteBufferArray;
import org.apache.hadoop.hbase.util.Pair;
/**
* IO engine that stores data in memory using an array of ByteBuffers
@ -64,24 +67,24 @@ public class ByteBufferIOEngine implements IOEngine {
return false;
}
/**
* Transfers data from the buffer array to the given byte buffer
* @param dstBuffer the given byte buffer into which bytes are to be written
* @param offset The offset in the ByteBufferArray of the first byte to be
* read
* @return number of bytes read
* @throws IOException
*/
@Override
public int read(ByteBuffer dstBuffer, long offset) throws IOException {
assert dstBuffer.hasArray();
return bufferArray.getMultiple(offset, dstBuffer.remaining(), dstBuffer.array(),
dstBuffer.arrayOffset());
}
@Override
public ByteBuff read(long offset, int len) throws IOException {
return bufferArray.asSubByteBuff(offset, len);
public Pair<ByteBuff, MemoryType> read(long offset, int length) throws IOException {
// TODO : this allocate and copy will go away once we create BB backed cells
ByteBuffer dstBuffer = ByteBuffer.allocate(length);
bufferArray.getMultiple(offset, dstBuffer.remaining(), dstBuffer.array(),
dstBuffer.arrayOffset());
// The buffer created here refers to memory in the actual buckets. When any cell refers to
// blocks created out of these buckets, it is referring to a shared memory area which, if
// evicted by the BucketCache, would lead to corruption of results. Hence we mark the buffer
// as MemoryType.SHARED so that readers using this block are aware of this and take the
// necessary action to prevent eviction till the results are either consumed or copied.
if (dstBuffer.limit() != length) {
throw new RuntimeException("Only " + dstBuffer.limit() + " bytes read, " + length
+ " expected");
}
// TODO : to be removed - make it conditional
return new Pair<ByteBuff, MemoryType>(new SingleByteBuff(dstBuffer), MemoryType.SHARED);
}
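A rough, self-contained illustration of the read contract used above, assuming a plain byte[] backing store instead of the real ByteBufferArray: allocate a destination buffer, copy `length` bytes starting at `offset`, and fail loudly on a short read. The real engine additionally wraps the result in a SingleByteBuff and tags it MemoryType.SHARED.

import java.io.IOException;
import java.nio.ByteBuffer;

// Simplified stand-in for an in-memory IO engine backed by a single byte[].
class SimpleMemoryEngine {
  private final byte[] store;

  SimpleMemoryEngine(int capacity) {
    this.store = new byte[capacity];
  }

  // Copy 'length' bytes starting at 'offset' into a freshly allocated buffer.
  ByteBuffer read(long offset, int length) throws IOException {
    ByteBuffer dst = ByteBuffer.allocate(length);
    dst.put(store, (int) offset, length);
    dst.flip();
    // Mirror the short-read check in the patch: refuse to hand back a partial block.
    if (dst.limit() != length) {
      throw new IOException("Only " + dst.limit() + " bytes read, " + length + " expected");
    }
    return dst;
  }
}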
/**

View File

@ -26,8 +26,10 @@ import java.nio.channels.FileChannel;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.io.hfile.Cacheable.MemoryType;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.nio.SingleByteBuff;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.util.StringUtils;
/**
@ -81,14 +83,25 @@ public class FileIOEngine implements IOEngine {
/**
* Transfers data from file to the given byte buffer
* @param dstBuffer the given byte buffer into which bytes are to be written
* @param offset The offset in the file from which the first byte is to be read
* @param length The length of buffer that should be allocated for reading
* from the file channel
* @return number of bytes read
* @throws IOException
*/
@Override
public int read(ByteBuffer dstBuffer, long offset) throws IOException {
return fileChannel.read(dstBuffer, offset);
public Pair<ByteBuff, MemoryType> read(long offset, int length) throws IOException {
ByteBuffer dstBuffer = ByteBuffer.allocate(length);
fileChannel.read(dstBuffer, offset);
// The buffer created out of the fileChannel is filled by copying the data from the file.
// Hence in this case there is no shared memory that we point to. Even if the BucketCache evicts
// this block from the file, the data is already copied and there is no need to ensure that
// the results are not corrupted before consuming them.
if (dstBuffer.limit() != length) {
throw new RuntimeException("Only " + dstBuffer.limit() + " bytes read, " + length
+ " expected");
}
return new Pair<ByteBuff, MemoryType>(new SingleByteBuff(dstBuffer), MemoryType.EXCLUSIVE);
}
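For the file-backed case, a minimal sketch (assuming a pre-opened FileChannel, error handling trimmed) of reading exactly `length` bytes at `offset` into a freshly allocated buffer; because the bytes are copied out of the file, the result can safely be treated as exclusive memory.

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

final class FileReadHelper {
  private FileReadHelper() {}

  // Read exactly 'length' bytes at 'offset'; the returned buffer is a private copy,
  // so the caller never shares memory with the cache (the EXCLUSIVE case).
  static ByteBuffer readFully(FileChannel channel, long offset, int length) throws IOException {
    ByteBuffer dst = ByteBuffer.allocate(length);
    while (dst.hasRemaining()) {
      int n = channel.read(dst, offset + dst.position());
      if (n < 0) {
        break; // hit EOF before reading everything
      }
    }
    if (dst.position() != length) {
      throw new IOException("Only " + dst.position() + " bytes read, " + length + " expected");
    }
    dst.flip();
    return dst;
  }
}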
/**
@ -128,14 +141,6 @@ public class FileIOEngine implements IOEngine {
}
}
@Override
public ByteBuff read(long offset, int len) throws IOException {
ByteBuffer dstBuffer = ByteBuffer.allocate(len);
int read = read(dstBuffer, offset);
dstBuffer.limit(read);
return new SingleByteBuff(dstBuffer);
}
@Override
public void write(ByteBuff srcBuffer, long offset) throws IOException {
// When caching block into BucketCache there will be single buffer backing for this HFileBlock.

View File

@ -22,7 +22,9 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.io.hfile.Cacheable.MemoryType;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.util.Pair;
/**
* A class implementing IOEngine interface supports data services for
@ -36,25 +38,14 @@ public interface IOEngine {
boolean isPersistent();
/**
* Transfers data from IOEngine to the given byte buffer
* @param dstBuffer the given byte buffer into which bytes are to be written
* Transfers data from IOEngine to a byte buffer
* @param length How many bytes are to be read from the offset
* @param offset The offset in the IO engine from which the first byte is to be read
* @return number of bytes read
* @return a Pair of the ByteBuff into which the data is read and its {@link MemoryType}
* @throws IOException
* @throws RuntimeException when the length of the ByteBuff read is less than 'length'
*/
int read(ByteBuffer dstBuffer, long offset) throws IOException;
/**
* Transfers data from IOEngine at the given offset to an MultiByteBuffer
* @param offset the offset from which the underlying buckets should be read
* @param len the length upto which the buckets should be read
* @return the MultiByteBuffer formed from the underlying ByteBuffers forming the
* buckets
* @throws IOException
* @throws RuntimeException when the length of the ByteBuff read is less than 'len'
*/
ByteBuff read(long offset, int len) throws IOException;
Pair<ByteBuff, MemoryType> read(long offset, int length) throws IOException;
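The contract matters mostly on the caller side: a SHARED buffer still points into cache memory and must be returned so its ref count can be decremented, while an EXCLUSIVE buffer is a private copy and can simply be dropped. A hedged sketch of such a caller, using placeholder types rather than the real Cacheable/BlockCache API:

// Placeholder types standing in for Cacheable/BlockCache; illustration only.
enum MemoryType { SHARED, EXCLUSIVE }

interface Block {
  MemoryType getMemoryType();
}

interface Cache<K> {
  Block getBlock(K key);            // pins SHARED blocks (ref count++)
  void returnBlock(K key, Block b); // unpins them (ref count--)
}

final class BlockUser {
  // Consume a block, making sure SHARED memory is handed back when we are done.
  static <K> void readAndRelease(Cache<K> cache, K key) {
    Block block = cache.getBlock(key);
    if (block == null) {
      return;
    }
    try {
      // ... consume or copy the block contents here ...
    } finally {
      if (block.getMemoryType() == MemoryType.SHARED) {
        cache.returnBlock(key, block); // allow a deferred eviction to complete
      }
    }
  }
}

This mirrors what the read paths in this patch do once cells have been consumed or copied.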
/**
* Transfers data from the given byte buffer to IOEngine

View File

@ -33,6 +33,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.NavigableMap;
@ -87,6 +88,7 @@ import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.RegionTooBusyException;
import org.apache.hadoop.hbase.ShareableMemory;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.TagRewriteCell;
@ -2432,39 +2434,44 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
@Override
public RegionScanner getScanner(Scan scan) throws IOException {
return getScanner(scan, null);
return getScanner(scan, true);
}
protected RegionScanner getScanner(Scan scan,
List<KeyValueScanner> additionalScanners) throws IOException {
public RegionScanner getScanner(Scan scan, boolean copyCellsFromSharedMem) throws IOException {
RegionScanner scanner = getScanner(scan, null, copyCellsFromSharedMem);
return scanner;
}
protected RegionScanner getScanner(Scan scan, List<KeyValueScanner> additionalScanners,
boolean copyCellsFromSharedMem) throws IOException {
startRegionOperation(Operation.SCAN);
try {
// Verify families are all valid
if (!scan.hasFamilies()) {
// Adding all families to scanner
for (byte[] family: this.htableDescriptor.getFamiliesKeys()) {
for (byte[] family : this.htableDescriptor.getFamiliesKeys()) {
scan.addFamily(family);
}
} else {
for (byte [] family : scan.getFamilyMap().keySet()) {
for (byte[] family : scan.getFamilyMap().keySet()) {
checkFamily(family);
}
}
return instantiateRegionScanner(scan, additionalScanners);
return instantiateRegionScanner(scan, additionalScanners, copyCellsFromSharedMem);
} finally {
closeRegionOperation(Operation.SCAN);
}
}
protected RegionScanner instantiateRegionScanner(Scan scan,
List<KeyValueScanner> additionalScanners) throws IOException {
List<KeyValueScanner> additionalScanners, boolean copyCellsFromSharedMem) throws IOException {
if (scan.isReversed()) {
if (scan.getFilter() != null) {
scan.getFilter().setReversed(true);
}
return new ReversedRegionScannerImpl(scan, additionalScanners, this);
return new ReversedRegionScannerImpl(scan, additionalScanners, this, copyCellsFromSharedMem);
}
return new RegionScannerImpl(scan, additionalScanners, this);
return new RegionScannerImpl(scan, additionalScanners, this, copyCellsFromSharedMem);
}
@Override
@ -5210,6 +5217,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
protected final byte[] stopRow;
protected final HRegion region;
protected final CellComparator comparator;
protected boolean copyCellsFromSharedMem = false;
private final long readPt;
private final long maxResultSize;
@ -5221,7 +5229,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
return region.getRegionInfo();
}
RegionScannerImpl(Scan scan, List<KeyValueScanner> additionalScanners, HRegion region)
public void setCopyCellsFromSharedMem(boolean copyCells) {
this.copyCellsFromSharedMem = copyCells;
}
RegionScannerImpl(Scan scan, List<KeyValueScanner> additionalScanners, HRegion region,
boolean copyCellsFromSharedMem)
throws IOException {
this.region = region;
this.maxResultSize = scan.getMaxResultSize();
@ -5231,13 +5244,13 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
this.filter = null;
}
this.comparator = region.getCellCompartor();
/**
* By default, calls to next/nextRaw must enforce the batch limit. Thus, construct a default
* scanner context that can be used to enforce the batch limit in the event that a
* ScannerContext is not specified during an invocation of next/nextRaw
*/
defaultScannerContext = ScannerContext.newBuilder().setBatchLimit(scan.getBatch()).build();
defaultScannerContext = ScannerContext.newBuilder()
.setBatchLimit(scan.getBatch()).build();
if (Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW) && !scan.isGetScan()) {
this.stopRow = null;
@ -5279,6 +5292,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
joinedScanners.add(scanner);
}
}
this.copyCellsFromSharedMem = copyCellsFromSharedMem;
initializeKVHeap(scanners, joinedScanners, region);
}
@ -5353,24 +5367,48 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// scanner is closed
throw new UnknownScannerException("Scanner was closed");
}
boolean moreValues;
if (outResults.isEmpty()) {
// Usually outResults is empty. This is true when next is called
// to handle scan or get operation.
moreValues = nextInternal(outResults, scannerContext);
} else {
List<Cell> tmpList = new ArrayList<Cell>();
moreValues = nextInternal(tmpList, scannerContext);
outResults.addAll(tmpList);
}
boolean moreValues = false;
try {
if (outResults.isEmpty()) {
// Usually outResults is empty. This is true when next is called
// to handle scan or get operation.
moreValues = nextInternal(outResults, scannerContext);
} else {
List<Cell> tmpList = new ArrayList<Cell>();
moreValues = nextInternal(tmpList, scannerContext);
outResults.addAll(tmpList);
}
// If the size limit was reached it means a partial Result is being returned. Returning a
// partial Result means that we should not reset the filters; filters should only be reset in
// between rows
if (!scannerContext.partialResultFormed()) resetFilters();
// If the size limit was reached it means a partial Result is being
// returned. Returning a
// partial Result means that we should not reset the filters; filters
// should only be reset in
// between rows
if (!scannerContext.partialResultFormed()) resetFilters();
if (isFilterDoneInternal()) {
moreValues = false;
if (isFilterDoneInternal()) {
moreValues = false;
}
// If copyCellsFromSharedMem = true, then we need to copy the cells. Otherwise
// it is a call coming from RSRpcServices.scan().
if (copyCellsFromSharedMem && !outResults.isEmpty()) {
// Do the copy of the results here.
ListIterator<Cell> listItr = outResults.listIterator();
Cell cell = null;
while (listItr.hasNext()) {
cell = listItr.next();
if (cell instanceof ShareableMemory) {
listItr.set(((ShareableMemory) cell).cloneToCell());
}
}
}
} finally {
if (copyCellsFromSharedMem) {
// In case of copyCellsFromSharedMem == true (i.e. a CP has wrapped the scanner) we return
// the blocks to the cache then and there
this.shipped();
}
}
return moreValues;
}
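The clone-before-return step above can be shown in isolation: walk the result list with a ListIterator and replace every cell backed by shared cache memory with a deep copy, after which the underlying block can be released. The Cell/ShareableMemory interfaces below are simplified stand-ins for the real ones.

import java.util.List;
import java.util.ListIterator;

// Simplified stand-ins for the real Cell / ShareableMemory interfaces.
interface Cell {}

interface ShareableMemory {
  Cell cloneToCell(); // deep copy into exclusive memory
}

final class SharedMemCopier {
  // Replace in place every cell that is backed by shared cache memory with a private clone.
  static void copySharedCells(List<Cell> results) {
    ListIterator<Cell> it = results.listIterator();
    while (it.hasNext()) {
      Cell cell = it.next();
      if (cell instanceof ShareableMemory) {
        it.set(((ShareableMemory) cell).cloneToCell());
      }
    }
  }
}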
@ -6365,6 +6403,13 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
@Override
public Result get(final Get get) throws IOException {
prepareGet(get);
List<Cell> results = get(get, true);
boolean stale = this.getRegionInfo().getReplicaId() != 0;
return Result.create(results, get.isCheckExistenceOnly() ? !results.isEmpty() : null, stale);
}
void prepareGet(final Get get) throws IOException, NoSuchColumnFamilyException {
checkRow(get.getRow(), "Get");
// Verify families are all valid
if (get.hasFamilies()) {
@ -6376,9 +6421,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
get.addFamily(family);
}
}
List<Cell> results = get(get, true);
boolean stale = this.getRegionInfo().getReplicaId() != 0;
return Result.create(results, get.isCheckExistenceOnly() ? !results.isEmpty() : null, stale);
}
@Override
@ -6388,9 +6430,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// pre-get CP hook
if (withCoprocessor && (coprocessorHost != null)) {
if (coprocessorHost.preGet(get, results)) {
return results;
}
if (coprocessorHost.preGet(get, results)) {
return results;
}
}
Scan scan = new Scan(get);
@ -6409,16 +6451,21 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
coprocessorHost.postGet(get, results);
}
// do after lock
metricsUpdateForGet(results);
return results;
}
void metricsUpdateForGet(List<Cell> results) {
if (this.metricsRegion != null) {
long totalSize = 0L;
for (Cell cell : results) {
// This should give an estimate of the size of the cell in the result. Why do we need
// to know how the codec serializes it??
totalSize += CellUtil.estimatedSerializedSizeOf(cell);
}
this.metricsRegion.updateGet(totalSize);
}
return results;
}
@Override
@ -7179,7 +7226,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// New HBASE-880 Helpers
//
private void checkFamily(final byte [] family)
void checkFamily(final byte [] family)
throws NoSuchColumnFamilyException {
if (!this.htableDescriptor.hasFamily(family)) {
throw new NoSuchColumnFamilyException("Column family " +

View File

@ -47,6 +47,10 @@ import org.apache.hadoop.hbase.regionserver.ScannerContext.NextState;
public class KeyValueHeap extends NonReversedNonLazyKeyValueScanner
implements KeyValueScanner, InternalScanner {
protected PriorityQueue<KeyValueScanner> heap = null;
// Holds the scanners whenever an eager close() happens. All such eagerly closed
// scanners are collected, and when the final scanner.close() happens we perform the
// actual close.
protected Set<KeyValueScanner> scannersForDelayedClose = new HashSet<KeyValueScanner>();
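A small sketch of the delayed-close idea described in the comment above: instead of closing an exhausted sub-scanner immediately (its blocks may still back cells already handed to the caller), park it in a set and close everything only when the enclosing scanner is finally closed. Types are placeholders, not the real KeyValueScanner/KeyValueHeap.

import java.io.Closeable;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;

// Placeholder for a heap of scanners whose close() releases cache blocks they pinned.
final class DelayedCloseHeap<S extends Closeable> implements Closeable {
  private final Set<S> scannersForDelayedClose = new HashSet<>();

  // Called when a sub-scanner is exhausted mid-scan: defer the real close.
  void retireScanner(S scanner) {
    scannersForDelayedClose.add(scanner);
  }

  // Called when the whole heap is closed: now it is safe to release everything.
  @Override
  public void close() throws IOException {
    for (S scanner : scannersForDelayedClose) {
      scanner.close();
    }
    scannersForDelayedClose.clear();
  }
}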
/**
* The current sub-scanner, i.e. the one that contains the next key/value
@ -62,8 +66,6 @@ public class KeyValueHeap extends NonReversedNonLazyKeyValueScanner
protected KVScannerComparator comparator;
protected Set<KeyValueScanner> scannersForDelayedClose = new HashSet<KeyValueScanner>();
/**
* Constructor. This KeyValueHeap will handle closing of passed in
* KeyValueScanners.
@ -160,6 +162,7 @@ public class KeyValueHeap extends NonReversedNonLazyKeyValueScanner
*/
if (pee == null || !moreCells) {
// add the scanner that is to be closed
this.scannersForDelayedClose.add(this.current);
} else {
this.heap.add(this.current);

View File

@ -155,6 +155,7 @@ import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor;
import org.apache.hadoop.hbase.quotas.OperationQuota;
import org.apache.hadoop.hbase.quotas.RegionServerQuotaManager;
import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl;
import org.apache.hadoop.hbase.regionserver.Leases.Lease;
import org.apache.hadoop.hbase.regionserver.Leases.LeaseStillHeldException;
import org.apache.hadoop.hbase.regionserver.Region.FlushResult;
@ -243,7 +244,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
/**
* An Rpc callback for closing a RegionScanner.
*/
private static class RegionScannerCloseCallBack implements RpcCallback {
static class RegionScannerCloseCallBack implements RpcCallback {
private final RegionScanner scanner;
@ -283,6 +284,29 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
}
}
/**
* An RpcCallback that holds the list of scanners that need to be closed on
* completion of a multiGet.
*/
static class RegionScannersCloseCallBack implements RpcCallback {
private final List<RegionScanner> scanners = new ArrayList<RegionScanner>();
public void addScanner(RegionScanner scanner) {
this.scanners.add(scanner);
}
@Override
public void run() {
for (RegionScanner scanner : scanners) {
try {
scanner.close();
} catch (IOException e) {
LOG.error("Exception while closing the scanner " + scanner, e);
}
}
}
}
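For reference, a self-contained version of the aggregating callback above: collect every scanner opened while serving a multi-get and close them all once the RPC completes, logging rather than propagating close failures. The RpcCallback shape is simplified here.

import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Simplified stand-in for the RpcCallback contract: run() fires when the RPC is done.
interface SimpleRpcCallback {
  void run();
}

final class ScannersCloseCallback implements SimpleRpcCallback {
  private final List<Closeable> scanners = new ArrayList<>();

  void addScanner(Closeable scanner) {
    scanners.add(scanner);
  }

  @Override
  public void run() {
    // Close everything that was opened on behalf of this RPC; log and continue on failure.
    for (Closeable scanner : scanners) {
      try {
        scanner.close();
      } catch (IOException e) {
        System.err.println("Exception while closing the scanner " + scanner + ": " + e);
      }
    }
  }
}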
/**
* Holder class which holds the RegionScanner, nextCallSeq and RpcCallbacks together.
*/
@ -337,7 +361,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
if (region != null && region.getCoprocessorHost() != null) {
region.getCoprocessorHost().preScannerClose(s);
}
s.close();
if (region != null && region.getCoprocessorHost() != null) {
region.getCoprocessorHost().postScannerClose(s);
@ -418,8 +441,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
return context != null && context.isClientCellBlockSupport();
}
private void addResult(final MutateResponse.Builder builder,
final Result result, final PayloadCarryingRpcController rpcc) {
private void addResult(final MutateResponse.Builder builder, final Result result,
final PayloadCarryingRpcController rpcc) {
if (result == null) return;
if (isClientCellBlockSupport()) {
builder.setResult(ProtobufUtil.toResultNoData(result));
@ -626,13 +649,23 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
// ResultOrException instance that matches each Put or Delete is then added down in the
// doBatchOp call. We should be staying aligned though the Put and Delete are deferred/batched
List<ClientProtos.Action> mutations = null;
for (ClientProtos.Action action: actions.getActionList()) {
RpcCallContext context = RpcServer.getCurrentCall();
// An RpcCallback that collects the scanners that need to be closed on
// completion of the multiGet.
RegionScannersCloseCallBack closeCallBack = null;
for (ClientProtos.Action action : actions.getActionList()) {
ClientProtos.ResultOrException.Builder resultOrExceptionBuilder = null;
try {
Result r = null;
if (action.hasGet()) {
if (closeCallBack == null) {
// Initialize only once
closeCallBack = new RegionScannersCloseCallBack();
// Set the call back here itself.
context.setCallBack(closeCallBack);
}
Get get = ProtobufUtil.toGet(action.getGet());
r = region.get(get);
r = get(get, ((HRegion) region), closeCallBack, context);
} else if (action.hasServiceCall()) {
resultOrExceptionBuilder = ResultOrException.newBuilder();
try {
@ -661,7 +694,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
r = append(region, quota, action.getMutation(), cellScanner, nonceGroup);
break;
case INCREMENT:
r = increment(region, quota, action.getMutation(), cellScanner, nonceGroup);
r = increment(region, quota, action.getMutation(), cellScanner, nonceGroup);
break;
case PUT:
case DELETE:
@ -679,7 +712,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
}
if (r != null) {
ClientProtos.Result pbResult = null;
if (isClientCellBlockSupport()) {
if (isClientCellBlockSupport(context)) {
pbResult = ProtobufUtil.toResultNoData(r);
// Hard to guess the size here. Just make a rough guess.
if (cellsToReturn == null) cellsToReturn = new ArrayList<CellScannable>();
@ -1930,7 +1963,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
ClientProtos.Get get = request.getGet();
Boolean existence = null;
Result r = null;
RpcCallContext context = RpcServer.getCurrentCall();
quota = getQuotaManager().checkQuota(region, OperationQuota.OperationType.GET);
Get clientGet = ProtobufUtil.toGet(get);
@ -1938,7 +1971,12 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
existence = region.getCoprocessorHost().preExists(clientGet);
}
if (existence == null) {
r = region.get(clientGet);
if (context != null) {
r = get(clientGet, ((HRegion) region), null, context);
} else {
// for test purpose
r = region.get(clientGet);
}
if (get.getExistenceOnly()) {
boolean exists = r.getExists();
if (region.getCoprocessorHost() != null) {
@ -1971,6 +2009,52 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
}
}
private Result get(Get get, HRegion region, RegionScannersCloseCallBack closeCallBack,
RpcCallContext context) throws IOException {
region.prepareGet(get);
List<Cell> results = new ArrayList<Cell>();
boolean stale = region.getRegionInfo().getReplicaId() != 0;
// pre-get CP hook
if (region.getCoprocessorHost() != null) {
if (region.getCoprocessorHost().preGet(get, results)) {
return Result
.create(results, get.isCheckExistenceOnly() ? !results.isEmpty() : null, stale);
}
}
Scan scan = new Scan(get);
RegionScanner scanner = null;
try {
scanner = region.getScanner(scan, false);
scanner.next(results);
} finally {
if (scanner != null) {
if (closeCallBack == null) {
// If there is a context then the scanner can be added to the current
// RpcCallContext. The rpc callback will take care of closing the
// scanner, e.g. in the case of get().
assert scanner instanceof org.apache.hadoop.hbase.ipc.RpcCallback;
context.setCallBack((RegionScannerImpl) scanner);
} else {
// The call is from multi() where the results from the get() are
// aggregated and then sent out to the rpc. The rpc callback will close
// all such scanners created as part of multi().
closeCallBack.addScanner(scanner);
}
}
}
// post-get CP hook
if (region.getCoprocessorHost() != null) {
region.getCoprocessorHost().postGet(get, results);
}
region.metricsUpdateForGet(results);
return Result.create(results, get.isCheckExistenceOnly() ? !results.isEmpty() : null, stale);
}
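The scanner-lifetime rule enforced by this helper boils down to a small decision: a stand-alone get() registers its scanner as the RPC's completion callback, while a get() inside a multi() adds the scanner to the shared aggregating callback so all of them are closed together. A hedged sketch with placeholder types (the real code uses RpcCallContext and RegionScannerImpl):

import java.io.Closeable;

// Placeholder interfaces; illustration only.
interface CallContext {
  void setCallBack(Runnable cb);
}

interface ScannerCollector {
  void addScanner(Closeable scanner);
}

final class ScannerCloseRouting {
  // Decide who is responsible for closing the scanner once the RPC completes.
  static void registerForClose(Closeable scanner, Runnable scannerAsCallback,
      ScannerCollector multiCallback, CallContext context) {
    if (multiCallback == null) {
      // Stand-alone get(): the scanner itself doubles as the RPC completion callback.
      context.setCallBack(scannerAsCallback);
    } else {
      // get() inside a multi(): the shared callback closes all such scanners at the end.
      multiCallback.addScanner(scanner);
    }
  }
}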
/**
* Execute multiple actions on a table: get, mutate, and/or execCoprocessor
*
@ -2230,6 +2314,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
boolean moreResults = true;
boolean closeScanner = false;
boolean isSmallScan = false;
RegionScanner actualRegionScanner = null;
ScanResponse.Builder builder = ScanResponse.newBuilder();
if (request.hasCloseScanner()) {
closeScanner = request.getCloseScanner();
@ -2274,17 +2359,27 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
scanner = region.getCoprocessorHost().preScannerOpen(scan);
}
if (scanner == null) {
scanner = region.getScanner(scan);
scanner = ((HRegion)region).getScanner(scan, false);
}
actualRegionScanner = scanner;
if (region.getCoprocessorHost() != null) {
scanner = region.getCoprocessorHost().postScannerOpen(scan, scanner);
}
if (actualRegionScanner != scanner) {
// It means the RegionScanner has been wrapped
if (actualRegionScanner instanceof RegionScannerImpl) {
// Copy the results when nextRaw is called from the CP so that
// the CP can have a cloned version of the results without bothering
// about the eviction. Ugly, yes!!!
((RegionScannerImpl) actualRegionScanner).setCopyCellsFromSharedMem(true);
}
}
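A minimal sketch of the wrap detection above: compare the scanner returned by the coprocessor hook with the one the region created; if they differ, switch the inner region scanner into copy-cells mode so the wrapper never holds on to cells that point into evictable cache memory. Types are placeholders.

// Placeholder types; the real code uses RegionScanner / RegionScannerImpl.
interface Scanner {}

class RegionScannerSketch implements Scanner {
  boolean copyCellsFromSharedMem;

  void setCopyCellsFromSharedMem(boolean copyCells) {
    this.copyCellsFromSharedMem = copyCells;
  }
}

final class WrapDetection {
  // 'original' is what the region created; 'possiblyWrapped' is what the CP hook returned.
  static Scanner applyCopyModeIfWrapped(RegionScannerSketch original, Scanner possiblyWrapped) {
    if (possiblyWrapped != original) {
      // A coprocessor wrapped the scanner: make the inner scanner clone shared-memory cells
      // before returning them, since the wrapper may hold on to them for arbitrarily long.
      original.setCopyCellsFromSharedMem(true);
    }
    return possiblyWrapped;
  }
}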
scannerId = this.scannerIdGen.incrementAndGet();
scannerName = String.valueOf(scannerId);
rsh = addScanner(scannerName, scanner, region);
ttl = this.scannerLeaseTimeoutPeriod;
}
assert scanner != null;
RpcCallContext context = RpcServer.getCurrentCall();
quota = getQuotaManager().checkQuota(region, OperationQuota.OperationType.SCAN);
@ -2295,9 +2390,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
// performed even before checking of Lease.
// See HBASE-5974
if (request.hasNextCallSeq()) {
if (rsh == null) {
rsh = scanners.get(scannerName);
}
if (rsh != null) {
if (request.getNextCallSeq() != rsh.getNextCallSeq()) {
throw new OutOfOrderScannerNextException(
@ -2411,7 +2503,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
contextBuilder.setTimeLimit(timeScope, timeLimit);
contextBuilder.setTrackMetrics(trackMetrics);
ScannerContext scannerContext = contextBuilder.build();
boolean limitReached = false;
while (i < rows) {
// Reset the batch progress to 0 before every call to RegionScanner#nextRaw. The
@ -2488,7 +2579,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
} finally {
region.closeRegionOperation();
}
// coprocessor postNext hook
if (region != null && region.getCoprocessorHost() != null) {
region.getCoprocessorHost().postScannerNext(scanner, results, rows, true);

View File

@ -42,9 +42,9 @@ class ReversedRegionScannerImpl extends RegionScannerImpl {
* @throws IOException
*/
ReversedRegionScannerImpl(Scan scan,
List<KeyValueScanner> additionalScanners, HRegion region)
List<KeyValueScanner> additionalScanners, HRegion region, boolean copyCellsFromSharedMem)
throws IOException {
region.super(scan, additionalScanners, region);
region.super(scan, additionalScanners, region, copyCellsFromSharedMem);
}
@Override

View File

@ -58,6 +58,7 @@ import org.apache.hadoop.hbase.util.BloomFilterWriter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Writables;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.hbase.io.hfile.HFileBlock;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
@ -1282,7 +1283,7 @@ public class StoreFile {
// Empty file
if (reader.getTrailer().getEntryCount() == 0)
return false;
HFileBlock bloomBlock = null;
try {
boolean shouldCheckBloom;
ByteBuff bloom;
@ -1290,8 +1291,8 @@ public class StoreFile {
bloom = null;
shouldCheckBloom = true;
} else {
bloom = reader.getMetaBlock(HFile.BLOOM_FILTER_DATA_KEY,
true);
bloomBlock = reader.getMetaBlock(HFile.BLOOM_FILTER_DATA_KEY, true);
bloom = bloomBlock.getBufferWithoutHeader();
shouldCheckBloom = bloom != null;
}
@ -1343,8 +1344,10 @@ public class StoreFile {
} catch (IllegalArgumentException e) {
LOG.error("Bad bloom filter data -- proceeding without", e);
setGeneralBloomFilterFaulty();
} finally {
// Return the bloom block so that its ref count can be decremented.
reader.returnBlock(bloomBlock);
}
return true;
}
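The bloom-filter change follows the general rule this patch introduces for every pinned block: whatever happens while the block is in use, return it in a finally block so its ref count is decremented and a deferred eviction can complete. An illustrative sketch with placeholder types:

import java.io.IOException;

// Placeholder types; the real code works with HFile.Reader and HFileBlock.
interface BlockReader<B> {
  B getMetaBlock(String name) throws IOException;
  void returnBlock(B block); // decrements the block's ref count in the block cache
}

final class BloomCheckSketch {
  static <B> boolean checkWithBlock(BlockReader<B> reader, String blockName) throws IOException {
    B bloomBlock = null;
    try {
      bloomBlock = reader.getMetaBlock(blockName);
      // ... evaluate the bloom filter against the block's buffer here ...
      return true;
    } finally {
      if (bloomBlock != null) {
        // Always hand the block back, even on an exception, so eviction is never blocked forever.
        reader.returnBlock(bloomBlock);
      }
    }
  }
}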

View File

@ -85,6 +85,8 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
protected final long maxRowSize;
protected final long cellsPerHeartbeatCheck;
// Collects all the KVHeaps that are eagerly closed during the
// course of a scan
protected Set<KeyValueHeap> heapsForDelayedClose = new HashSet<KeyValueHeap>();
/**
@ -446,8 +448,10 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
private void close(boolean withHeapClose){
lock.lock();
try {
if (this.closing) return;
this.closing = true;
if (this.closing) {
return;
}
if (withHeapClose) this.closing = true;
// under test, we dont have a this.store
if (this.store != null) this.store.deleteChangedReaderObserver(this);
if (withHeapClose) {
@ -509,6 +513,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
// if the heap was left null, then the scanners had previously run out anyways, close and
// return.
if (this.heap == null) {
// By this time the partial close should have happened because the heap is already null
close(false);// Do all cleanup except heap.close()
return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
}

View File

@ -1406,7 +1406,8 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
desc.addFamily(hcd);
}
getHBaseAdmin().createTable(desc, startKey, endKey, numRegions);
// HBaseAdmin only waits for regions to appear in hbase:meta we should wait until they are assigned
// HBaseAdmin only waits for regions to appear in hbase:meta; we
// should wait until they are assigned
waitUntilAllRegionsAssigned(tableName);
return (HTable) getConnection().getTable(tableName);
}
@ -1444,8 +1445,8 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
htd.addFamily(hcd);
}
getHBaseAdmin().createTable(htd, splitKeys);
// HBaseAdmin only waits for regions to appear in hbase:meta we should wait until they are
// assigned
// HBaseAdmin only waits for regions to appear in hbase:meta;
// we should wait until they are assigned
waitUntilAllRegionsAssigned(htd.getTableName());
return (HTable) getConnection().getTable(htd.getTableName());
}
@ -1460,7 +1461,8 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
public HTable createTable(HTableDescriptor htd, byte[][] splitRows)
throws IOException {
getHBaseAdmin().createTable(htd, splitRows);
// HBaseAdmin only waits for regions to appear in hbase:meta we should wait until they are assigned
// HBaseAdmin only waits for regions to appear in hbase:meta;
// we should wait until they are assigned
waitUntilAllRegionsAssigned(htd.getTableName());
return (HTable) getConnection().getTable(htd.getTableName());
}
@ -1700,6 +1702,24 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
return (HTable) getConnection().getTable(tableName);
}
public HTable createTable(TableName tableName, byte[][] families,
int numVersions, int blockSize, String cpName) throws IOException {
HTableDescriptor desc = new HTableDescriptor(tableName);
for (byte[] family : families) {
HColumnDescriptor hcd = new HColumnDescriptor(family)
.setMaxVersions(numVersions)
.setBlocksize(blockSize);
desc.addFamily(hcd);
}
if(cpName != null) {
desc.addCoprocessor(cpName);
}
getHBaseAdmin().createTable(desc);
// HBaseAdmin only waits for regions to appear in hbase:meta; we should wait until they are assigned
waitUntilAllRegionsAssigned(tableName);
return (HTable) getConnection().getTable(tableName);
}
/**
* Create a table.
* @param tableName

View File

@ -270,7 +270,7 @@ public class CacheTestUtils {
}
@Override
public Cacheable deserialize(ByteBuff b, boolean reuse)
public Cacheable deserialize(ByteBuff b, boolean reuse, MemoryType memType)
throws IOException {
return deserialize(b);
}
@ -315,6 +315,11 @@ public class CacheTestUtils {
public BlockType getBlockType() {
return BlockType.DATA;
}
@Override
public MemoryType getMemoryType() {
return MemoryType.EXCLUSIVE;
}
}

View File

@ -37,6 +37,7 @@ import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.io.hfile.Cacheable.MemoryType;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.nio.MultiByteBuff;
@ -73,7 +74,7 @@ public class TestCacheConfig {
}
@Override
public Cacheable deserialize(ByteBuff b, boolean reuse) throws IOException {
public Cacheable deserialize(ByteBuff b, boolean reuse, MemoryType memType) throws IOException {
LOG.info("Deserialized " + b + ", reuse=" + reuse);
return cacheable;
}
@ -140,6 +141,11 @@ public class TestCacheConfig {
public BlockType getBlockType() {
return BlockType.DATA;
}
@Override
public MemoryType getMemoryType() {
return MemoryType.EXCLUSIVE;
}
};
static class MetaCacheEntry extends DataCacheEntry {

View File

@ -28,7 +28,10 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import org.apache.commons.logging.Log;
@ -290,6 +293,8 @@ public class TestCacheOnWrite {
DataBlockEncoding encodingInCache =
encoderType.getEncoder().getDataBlockEncoding();
List<Long> cachedBlocksOffset = new ArrayList<Long>();
Map<Long, HFileBlock> cachedBlocks = new HashMap<Long, HFileBlock>();
while (offset < reader.getTrailer().getLoadOnOpenDataOffset()) {
long onDiskSize = -1;
if (prevBlock != null) {
@ -303,6 +308,8 @@ public class TestCacheOnWrite {
offset);
HFileBlock fromCache = (HFileBlock) blockCache.getBlock(blockCacheKey, true, false, true);
boolean isCached = fromCache != null;
cachedBlocksOffset.add(offset);
cachedBlocks.put(offset, fromCache);
boolean shouldBeCached = cowType.shouldBeCached(block.getBlockType());
assertTrue("shouldBeCached: " + shouldBeCached+ "\n" +
"isCached: " + isCached + "\n" +
@ -355,6 +362,28 @@ public class TestCacheOnWrite {
while (scanner.next()) {
scanner.getCell();
}
Iterator<Long> iterator = cachedBlocksOffset.iterator();
while(iterator.hasNext()) {
Long entry = iterator.next();
BlockCacheKey blockCacheKey = new BlockCacheKey(reader.getName(),
entry);
HFileBlock hFileBlock = cachedBlocks.get(entry);
if (hFileBlock != null) {
// call return twice because for the isCached case the counter would have been incremented
// twice
blockCache.returnBlock(blockCacheKey, hFileBlock);
if(cacheCompressedData) {
if (this.compress == Compression.Algorithm.NONE
|| cowType == CacheOnWriteType.INDEX_BLOCKS
|| cowType == CacheOnWriteType.BLOOM_BLOCKS) {
blockCache.returnBlock(blockCacheKey, hFileBlock);
}
} else {
blockCache.returnBlock(blockCacheKey, hFileBlock);
}
}
}
scanner.shipped();
reader.close();
}

View File

@ -141,6 +141,11 @@ public class TestCachedBlockQueue extends TestCase {
return BlockType.DATA;
}
@Override
public MemoryType getMemoryType() {
return MemoryType.EXCLUSIVE;
}
}, accessTime, false);
}
}

View File

@ -328,7 +328,7 @@ public class TestHFile extends HBaseTestCase {
private void readNumMetablocks(Reader reader, int n) throws IOException {
for (int i = 0; i < n; i++) {
ByteBuff actual = reader.getMetaBlock("HFileMeta" + i, false);
ByteBuff actual = reader.getMetaBlock("HFileMeta" + i, false).getBufferWithoutHeader();
ByteBuffer expected =
ByteBuffer.wrap(("something to test" + i).getBytes());
assertEquals(

View File

@ -56,6 +56,7 @@ import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.Cacheable.MemoryType;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.nio.MultiByteBuff;
import org.apache.hadoop.hbase.nio.SingleByteBuff;
@ -463,8 +464,9 @@ public class TestHFileBlock {
for (boolean reuseBuffer : new boolean[] { false, true }) {
ByteBuffer serialized = ByteBuffer.allocate(blockFromHFile.getSerializedLength());
blockFromHFile.serialize(serialized);
HFileBlock deserialized = (HFileBlock) blockFromHFile.getDeserializer().deserialize(
new SingleByteBuff(serialized), reuseBuffer);
HFileBlock deserialized =
(HFileBlock) blockFromHFile.getDeserializer().deserialize(
new SingleByteBuff(serialized), reuseBuffer, MemoryType.EXCLUSIVE);
assertEquals(
"Serialization did not preserve block state. reuseBuffer=" + reuseBuffer,
blockFromHFile, deserialized);

View File

@ -168,6 +168,10 @@ public class TestHFileBlockIndex {
this.realReader = realReader;
}
@Override
public void returnBlock(HFileBlock block) {
}
@Override
public HFileBlock readBlock(long offset, long onDiskSize,
boolean cacheBlock, boolean pread, boolean isCompaction,

View File

@ -779,6 +779,11 @@ public class TestLruBlockCache {
return BlockType.DATA;
}
@Override
public MemoryType getMemoryType() {
return MemoryType.EXCLUSIVE;
}
}
}

View File

@ -22,10 +22,12 @@ import static org.junit.Assert.assertTrue;
import java.nio.ByteBuffer;
import org.apache.hadoop.hbase.io.hfile.Cacheable.MemoryType;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.nio.MultiByteBuff;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Pair;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@ -65,11 +67,10 @@ public class TestByteBufferIOEngine {
offset = (int) (Math.random() * (capacity - maxBlockSize));
}
ioEngine.write(srcBuffer, offset);
ByteBuffer dstBuffer = ByteBuffer.allocate(blockSize);
ioEngine.read(dstBuffer, offset);
byte[] byteArray2 = dstBuffer.array();
Pair<ByteBuff, MemoryType> pair = ioEngine.read(offset, blockSize);
ByteBuff dstBuffer = pair.getFirst();
for (int j = 0; j < byteArray.length; ++j) {
assertTrue(byteArray[j] == byteArray2[j]);
assertTrue(byteArray[j] == dstBuffer.get(j));
}
}
assert testOffsetAtStartNum == 0;
@ -110,9 +111,9 @@ public class TestByteBufferIOEngine {
//ioEngine.read(dstBuffer, offset);
//MultiByteBuffer read = new MultiByteBuffer(dstBuffer);
// TODO : this will get changed after HBASE-12295 goes in
ByteBuff read = ioEngine.read(offset, blockSize);
Pair<ByteBuff, MemoryType> read = ioEngine.read(offset, blockSize);
for (int j = 0; j < byteArray.length; ++j) {
assertTrue(srcBuffer.get(j) == read.get(j));
assertTrue(srcBuffer.get(j) == read.getFirst().get(j));
}
}
assert testOffsetAtStartNum == 0;

View File

@ -24,8 +24,11 @@ import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.hadoop.hbase.io.hfile.Cacheable.MemoryType;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Pair;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@ -47,9 +50,9 @@ public class TestFileIOEngine {
for (int j = 0; j < data1.length; ++j) {
data1[j] = (byte) (Math.random() * 255);
}
byte[] data2 = new byte[len];
fileIOEngine.write(ByteBuffer.wrap(data1), offset);
fileIOEngine.read(ByteBuffer.wrap(data2), offset);
Pair<ByteBuff, MemoryType> pair = fileIOEngine.read(offset, len);
byte[] data2 = pair.getFirst().array();
for (int j = 0; j < data1.length; ++j) {
assertTrue(data1[j] == data2[j]);
}

View File

@ -39,6 +39,7 @@ import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
import org.apache.hadoop.hbase.io.hfile.CacheStats;
import org.apache.hadoop.hbase.io.hfile.Cacheable;
import org.apache.hadoop.hbase.io.hfile.CachedBlock;
import org.apache.hadoop.hbase.io.hfile.HFileBlock;
import org.apache.hadoop.hbase.io.hfile.ResizableBlockCache;
import org.apache.hadoop.hbase.io.util.HeapMemorySizeUtil;
import org.apache.hadoop.hbase.regionserver.HeapMemoryManager.TunerContext;
@ -578,9 +579,13 @@ public class TestHeapMemoryManager {
return null;
}
public void setTestBlockSize(long testBlockSize) {
this.testBlockSize = testBlockSize;
}
@Override
public void returnBlock(BlockCacheKey cacheKey, Cacheable buf) {
}
public void setTestBlockSize(long testBlockSize) {
this.testBlockSize = testBlockSize;
}
}
private static class MemstoreFlusherStub implements FlushRequester {

View File

@ -425,14 +425,14 @@ public class TestScannerHeartbeatMessages {
// Instantiate the custom heartbeat region scanners
@Override
protected RegionScanner instantiateRegionScanner(Scan scan,
List<KeyValueScanner> additionalScanners) throws IOException {
List<KeyValueScanner> additionalScanners, boolean copyCells) throws IOException {
if (scan.isReversed()) {
if (scan.getFilter() != null) {
scan.getFilter().setReversed(true);
}
return new HeartbeatReversedRegionScanner(scan, additionalScanners, this);
return new HeartbeatReversedRegionScanner(scan, additionalScanners, this, copyCells);
}
return new HeartbeatRegionScanner(scan, additionalScanners, this);
return new HeartbeatRegionScanner(scan, additionalScanners, this, copyCells);
}
}
@ -442,8 +442,8 @@ public class TestScannerHeartbeatMessages {
*/
private static class HeartbeatReversedRegionScanner extends ReversedRegionScannerImpl {
HeartbeatReversedRegionScanner(Scan scan, List<KeyValueScanner> additionalScanners,
HRegion region) throws IOException {
super(scan, additionalScanners, region);
HRegion region, boolean copyCells) throws IOException {
super(scan, additionalScanners, region, copyCells);
}
@Override
@ -469,9 +469,9 @@ public class TestScannerHeartbeatMessages {
* column family cells
*/
private static class HeartbeatRegionScanner extends RegionScannerImpl {
HeartbeatRegionScanner(Scan scan, List<KeyValueScanner> additionalScanners, HRegion region)
throws IOException {
region.super(scan, additionalScanners, region);
HeartbeatRegionScanner(Scan scan, List<KeyValueScanner> additionalScanners, HRegion region,
boolean copyCells) throws IOException {
region.super(scan, additionalScanners, region, copyCells);
}
@Override