HBASE-15366 Add doc, trace-level logging, and test around HFileBlock

M hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileContext.java
  Make its toString emit a format that matches the way we log elsewhere.

M hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
  Capitalize statics.

M hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
  Verify and clean up the documentation of the HFileBlock format at the head of the class.
  Explain what 'EXTRA_SERIALIZATION_SPACE' is all about. Connect how we serialize and
  deserialize: it is done in different places, one way when pulling from HDFS and another
  when pulling from cache (TO BE FIXED). Shut down a load of public access.

M hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
  Add trace-level logging.

M hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileScanner.java
  Make it Closeable.
parent 4b3e38705c
commit 8ace5bbfce
|
@ -65,7 +65,9 @@ public final class HConstants {
|
|||
public static final byte[] RPC_HEADER = new byte[] { 'H', 'B', 'a', 's' };
|
||||
public static final byte RPC_CURRENT_VERSION = 0;
|
||||
|
||||
// HFileBlock constants.
|
||||
// HFileBlock constants. TODO!!!! THESE DEFINES BELONG IN HFILEBLOCK, NOT UP HERE.
|
||||
// Needed down in hbase-common though by encoders but these encoders should not be dealing
|
||||
// in the internals of hfileblocks. Fix encapsulation.
|
||||
|
||||
/** The size data structures with minor version is 0 */
|
||||
public static final int HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM = MAGIC_LENGTH + 2 * Bytes.SIZEOF_INT
|
||||
|
|
|
@ -23,12 +23,12 @@ import java.io.OutputStream;
|
|||
import java.nio.ByteBuffer;
|
||||
|
||||
import org.apache.hadoop.hbase.ByteBufferedCell;
|
||||
import org.apache.hadoop.hbase.ByteBufferedKeyOnlyKeyValue;
|
||||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.CellComparator;
|
||||
import org.apache.hadoop.hbase.CellUtil;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.KeyValue;
|
||||
import org.apache.hadoop.hbase.ByteBufferedKeyOnlyKeyValue;
|
||||
import org.apache.hadoop.hbase.KeyValue.Type;
|
||||
import org.apache.hadoop.hbase.KeyValueUtil;
|
||||
import org.apache.hadoop.hbase.SettableSequenceId;
|
||||
|
@ -52,7 +52,9 @@ import org.apache.hadoop.io.WritableUtils;
|
|||
*/
|
||||
@InterfaceAudience.Private
|
||||
abstract class BufferedDataBlockEncoder implements DataBlockEncoder {
|
||||
|
||||
/**
|
||||
* TODO: This datablockencoder is dealing in internals of hfileblocks. Purge reference to HFBs
|
||||
*/
|
||||
private static int INITIAL_KEY_BUFFER_SIZE = 512;
|
||||
|
||||
@Override
|
||||
|
@ -1140,8 +1142,8 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder {
|
|||
BufferedDataBlockEncodingState state = (BufferedDataBlockEncodingState) encodingCtx
|
||||
.getEncodingState();
|
||||
// Write the unencodedDataSizeWritten (with header size)
|
||||
Bytes.putInt(uncompressedBytesWithHeader, HConstants.HFILEBLOCK_HEADER_SIZE
|
||||
+ DataBlockEncoding.ID_SIZE, state.unencodedDataSizeWritten
|
||||
Bytes.putInt(uncompressedBytesWithHeader,
|
||||
HConstants.HFILEBLOCK_HEADER_SIZE + DataBlockEncoding.ID_SIZE, state.unencodedDataSizeWritten
|
||||
);
|
||||
if (encodingCtx.getDataBlockEncoding() != DataBlockEncoding.NONE) {
|
||||
encodingCtx.postEncoding(BlockType.ENCODED_DATA);
|
||||
|
|
|
@ -39,7 +39,8 @@ import org.apache.hadoop.hbase.nio.ByteBuff;
|
|||
*/
|
||||
@InterfaceAudience.Private
|
||||
public interface DataBlockEncoder {
|
||||
|
||||
// TODO: This Interface should be deprecated and replaced. It presumes hfile and carnal knowledge of
|
||||
// Cell internals. It was done for a different time. Remove. Purge.
|
||||
/**
|
||||
* Starts encoding for a block of KeyValues. Call
|
||||
* {@link #endBlockEncoding(HFileBlockEncodingContext, DataOutputStream, byte[])} to finish
|
||||
|
|
|
@ -218,22 +218,22 @@ public class HFileContext implements HeapSize, Cloneable {
|
|||
@Override
|
||||
public String toString() {
|
||||
StringBuilder sb = new StringBuilder();
|
||||
sb.append("HFileContext [");
|
||||
sb.append(" usesHBaseChecksum="); sb.append(usesHBaseChecksum);
|
||||
sb.append(" checksumType="); sb.append(checksumType);
|
||||
sb.append(" bytesPerChecksum="); sb.append(bytesPerChecksum);
|
||||
sb.append(" blocksize="); sb.append(blocksize);
|
||||
sb.append(" encoding="); sb.append(encoding);
|
||||
sb.append(" includesMvcc="); sb.append(includesMvcc);
|
||||
sb.append(" includesTags="); sb.append(includesTags);
|
||||
sb.append(" compressAlgo="); sb.append(compressAlgo);
|
||||
sb.append(" compressTags="); sb.append(compressTags);
|
||||
sb.append(" cryptoContext=[ "); sb.append(cryptoContext); sb.append(" ]");
|
||||
sb.append("[");
|
||||
sb.append("usesHBaseChecksum="); sb.append(usesHBaseChecksum);
|
||||
sb.append(", checksumType="); sb.append(checksumType);
|
||||
sb.append(", bytesPerChecksum="); sb.append(bytesPerChecksum);
|
||||
sb.append(", blocksize="); sb.append(blocksize);
|
||||
sb.append(", encoding="); sb.append(encoding);
|
||||
sb.append(", includesMvcc="); sb.append(includesMvcc);
|
||||
sb.append(", includesTags="); sb.append(includesTags);
|
||||
sb.append(", compressAlgo="); sb.append(compressAlgo);
|
||||
sb.append(", compressTags="); sb.append(compressTags);
|
||||
sb.append(", cryptoContext=["); sb.append(cryptoContext); sb.append("]");
|
||||
if (hfileName != null) {
|
||||
sb.append(" name=");
|
||||
sb.append(", name=");
|
||||
sb.append(hfileName);
|
||||
}
|
||||
sb.append(" ]");
|
||||
sb.append("]");
|
||||
return sb.toString();
|
||||
}
|
||||
|
||||
|
|
|
@ -82,8 +82,14 @@ public class CacheConfig {
|
|||
*/
|
||||
|
||||
/**
|
||||
* If the chosen ioengine can persist its state across restarts, the path to the file to
|
||||
* persist to.
|
||||
* If the chosen ioengine can persist its state across restarts, the path to the file to persist
|
||||
* to. This file is NOT the data file. It is a file into which we will serialize the map of
|
||||
* what is in the data file. For example, if you pass the following argument as
|
||||
* BUCKET_CACHE_IOENGINE_KEY ("hbase.bucketcache.ioengine"),
|
||||
* <code>file:/tmp/bucketcache.data </code>, then we will write the bucketcache data to the file
|
||||
* <code>/tmp/bucketcache.data</code> but the metadata on where the data is in the supplied file
|
||||
* is an in-memory map that needs to be persisted across restarts. Where to store this
|
||||
* in-memory state is what you supply here: e.g. <code>/tmp/bucketcache.map</code>.
|
||||
*/
|
||||
public static final String BUCKET_CACHE_PERSISTENT_PATH_KEY =
|
||||
"hbase.bucketcache.persistent.path";
|
||||
|
|
|
@ -178,19 +178,20 @@ public class HFile {
|
|||
* The number of bytes per checksum.
|
||||
*/
|
||||
public static final int DEFAULT_BYTES_PER_CHECKSUM = 16 * 1024;
|
||||
// For measuring number of checksum failures
|
||||
static final Counter checksumFailures = new Counter();
|
||||
|
||||
// for test purpose
|
||||
public static final Counter dataBlockReadCnt = new Counter();
|
||||
// For measuring number of checksum failures
|
||||
static final Counter CHECKSUM_FAILURES = new Counter();
|
||||
|
||||
// For tests. Gets incremented when we read a block whether from HDFS or from Cache.
|
||||
public static final Counter DATABLOCK_READ_COUNT = new Counter();
|
||||
|
||||
/**
|
||||
* Number of checksum verification failures. It also
|
||||
* clears the counter.
|
||||
*/
|
||||
public static final long getChecksumFailuresCount() {
|
||||
long count = checksumFailures.get();
|
||||
checksumFailures.set(0);
|
||||
long count = CHECKSUM_FAILURES.get();
|
||||
CHECKSUM_FAILURES.set(0);
|
||||
return count;
|
||||
}
|
||||
|
||||
|
|
|
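A small hedged sketch (not part of this patch) of how the renamed CHECKSUM_FAILURES counter behaves through the public accessor: getChecksumFailuresCount() is read-and-clear, so a second call straight after returns zero.

    // Assumed test-style usage; HFile.getChecksumFailuresCount() reads then resets CHECKSUM_FAILURES.
    long failures = HFile.getChecksumFailuresCount(); // failures seen since the last call
    assert HFile.getChecksumFailuresCount() == 0;     // the call above cleared the counter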
@ -26,6 +26,8 @@ import java.nio.ByteBuffer;
|
|||
import java.util.concurrent.locks.Lock;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.fs.FSDataInputStream;
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
|
@ -54,87 +56,121 @@ import com.google.common.annotations.VisibleForTesting;
|
|||
import com.google.common.base.Preconditions;
|
||||
|
||||
/**
|
||||
* Reading {@link HFile} version 1 and 2 blocks, and writing version 2 blocks.
|
||||
* <ul>
|
||||
* <li>In version 1 all blocks are always compressed or uncompressed, as
|
||||
* Reads {@link HFile} version 1 and version 2 blocks but writes version 2 blocks only.
|
||||
* Version 2 was introduced in hbase-0.92.0. Does read and write out to the filesystem but also
|
||||
* the read and write to Cache.
|
||||
*
|
||||
* <h3>HFileBlock: Version 1</h3>
|
||||
* As of this writing, there should be no more version 1 blocks found out in the wild. Version 2
|
||||
* was introduced in hbase-0.92.0.
|
||||
* In version 1 all blocks are always compressed or uncompressed, as
|
||||
* specified by the {@link HFile}'s compression algorithm, with a type-specific
|
||||
* magic record stored in the beginning of the compressed data (i.e. one needs
|
||||
* to uncompress the compressed block to determine the block type). There is
|
||||
* only a single compression algorithm setting for all blocks. Offset and size
|
||||
* information from the block index are required to read a block.
|
||||
* <li>In version 2 a block is structured as follows:
|
||||
* <h3>HFileBlock: Version 2</h3>
|
||||
* In version 2, a block is structured as follows:
|
||||
* <ul>
|
||||
* <li>header (see Writer#finishBlock())
|
||||
* <li><b>Header:</b> See Writer#putHeader(); header total size is HFILEBLOCK_HEADER_SIZE)
|
||||
* <ul>
|
||||
* <li>Magic record identifying the block type (8 bytes)
|
||||
* <li>Compressed block size, excluding header, including checksum (4 bytes)
|
||||
* <li>Uncompressed block size, excluding header, excluding checksum (4 bytes)
|
||||
* <li>Magic record identifying the {@link BlockType} (8 bytes): e.g. <code>DATABLK*</code>
|
||||
* <li>Compressed -- a.k.a 'on disk' -- block size, excluding header, but including
|
||||
* tailing checksum bytes (4 bytes)
|
||||
* <li>Uncompressed block size, excluding header, and excluding checksum bytes (4 bytes)
|
||||
* <li>The offset of the previous block of the same type (8 bytes). This is
|
||||
* used to be able to navigate to the previous block without going to the block
|
||||
* used to navigate to the previous block without having to go to the block index
|
||||
* <li>For minorVersions >=1, the ordinal describing checksum type (1 byte)
|
||||
* <li>For minorVersions >=1, the number of data bytes/checksum chunk (4 bytes)
|
||||
* <li>For minorVersions >=1, the size of data on disk, including header,
|
||||
* <li>For minorVersions >=1, the size of data 'on disk', including header,
|
||||
* excluding checksums (4 bytes)
|
||||
* </ul>
|
||||
* </li>
|
||||
* <li>Raw/Compressed/Encrypted/Encoded data. The compression algorithm is the
|
||||
* <li><b>Raw/Compressed/Encrypted/Encoded data:</b> The compression algorithm is the
|
||||
* same for all the blocks in the {@link HFile}, similarly to what was done in
|
||||
* version 1.
|
||||
* <li>For minorVersions >=1, a series of 4 byte checksums, one each for
|
||||
* version 1. If compression is NONE, this is just raw, serialized Cells.
|
||||
* <li><b>Tail:</b> For minorVersions >=1, a series of 4 byte checksums, one each for
|
||||
* the number of bytes specified by bytesPerChecksum.
|
||||
* </ul>
|
||||
* </ul>
|
||||
* <p>Be aware that when we read from HDFS, we overread pulling in the next blocks' header too.
|
||||
* We do this to save having to do two seeks to read an HFileBlock; a seek to read the header
|
||||
* to figure lengths, etc., and then another seek to pull in the data.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class HFileBlock implements Cacheable {
|
||||
private static final Log LOG = LogFactory.getLog(HFileBlock.class);
|
||||
|
||||
/**
|
||||
* On a checksum failure on a Reader, these many suceeding read
|
||||
* requests switch back to using hdfs checksums before auto-reenabling
|
||||
* hbase checksum verification.
|
||||
* On a checksum failure, do these many succeeding read requests using hdfs checksums before
|
||||
* auto-reenabling hbase checksum verification.
|
||||
*/
|
||||
static final int CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD = 3;
|
||||
|
||||
private static int UNSET = -1;
|
||||
public static final boolean FILL_HEADER = true;
|
||||
public static final boolean DONT_FILL_HEADER = false;
|
||||
|
||||
/**
|
||||
* The size of block header when blockType is {@link BlockType#ENCODED_DATA}.
|
||||
* This extends normal header by adding the id of encoder.
|
||||
*/
|
||||
public static final int ENCODED_HEADER_SIZE = HConstants.HFILEBLOCK_HEADER_SIZE
|
||||
+ DataBlockEncoding.ID_SIZE;
|
||||
|
||||
static final byte[] DUMMY_HEADER_NO_CHECKSUM =
|
||||
new byte[HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM];
|
||||
|
||||
// How to get the estimate correctly? if it is a singleBB?
|
||||
public static final int MULTI_BYTE_BUFFER_HEAP_SIZE =
|
||||
(int)ClassSize.estimateBase(MultiByteBuff.class, false);
|
||||
|
||||
// meta.usesHBaseChecksum+offset+nextBlockOnDiskSizeWithHeader
|
||||
public static final int EXTRA_SERIALIZATION_SPACE = Bytes.SIZEOF_BYTE + Bytes.SIZEOF_INT
|
||||
+ Bytes.SIZEOF_LONG;
|
||||
/**
|
||||
* See #blockDeserializer method for more info.
|
||||
* 13 bytes of extra stuff stuck on the end of the HFileBlock that we pull in from HDFS (note,
|
||||
* when we read from HDFS, we pull in an HFileBlock AND the header of the next block if one).
|
||||
* The 13 bytes are: usesHBaseChecksum (1 byte) + offset of this block (long) +
|
||||
* nextBlockOnDiskSizeWithHeader (int).
|
||||
*/
|
||||
public static final int EXTRA_SERIALIZATION_SPACE =
|
||||
Bytes.SIZEOF_BYTE + Bytes.SIZEOF_INT + Bytes.SIZEOF_LONG;
|
||||
|
||||
/**
|
||||
* Each checksum value is an integer that can be stored in 4 bytes.
|
||||
*/
|
||||
static final int CHECKSUM_SIZE = Bytes.SIZEOF_INT;
|
||||
|
||||
static final byte[] DUMMY_HEADER_NO_CHECKSUM =
|
||||
new byte[HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM];
|
||||
|
||||
/**
|
||||
* Used deserializing blocks from Cache.
|
||||
*
|
||||
* Serializing to cache is a little hard to follow. See Writer#finishBlock for where it is done.
|
||||
* When we start to append to a new HFileBlock,
|
||||
* we skip over where the header should go before we start adding Cells. When the block is
|
||||
* done, we'll then go back and fill in the header and the checksum tail. Be aware that what
|
||||
* gets serialized into the blockcache is a byte array that contains an HFileBlock followed by
|
||||
* its checksums and then the header of the next HFileBlock (needed to help navigate), followed
|
||||
* again by an extra 13 bytes of meta info needed when time to recreate the HFileBlock from cache.
|
||||
*
|
||||
* ++++++++++++++
|
||||
* + HFileBlock +
|
||||
* ++++++++++++++
|
||||
* + Checksums +
|
||||
* ++++++++++++++
|
||||
* + NextHeader +
|
||||
* ++++++++++++++
|
||||
* + ExtraMeta! +
|
||||
* ++++++++++++++
|
||||
*
|
||||
* TODO: Fix it so we do NOT put the NextHeader into blockcache. It is not necessary.
|
||||
*/
|
||||
static final CacheableDeserializer<Cacheable> blockDeserializer =
|
||||
new CacheableDeserializer<Cacheable>() {
|
||||
public HFileBlock deserialize(ByteBuff buf, boolean reuse, MemoryType memType)
|
||||
throws IOException {
|
||||
throws IOException {
|
||||
// Rewind to just before the EXTRA_SERIALIZATION_SPACE.
|
||||
buf.limit(buf.limit() - HFileBlock.EXTRA_SERIALIZATION_SPACE).rewind();
|
||||
// Get a new buffer to pass the deserialized HFileBlock for it to 'own'.
|
||||
ByteBuff newByteBuffer;
|
||||
if (reuse) {
|
||||
newByteBuffer = buf.slice();
|
||||
} else {
|
||||
// Used only in tests
|
||||
int len = buf.limit();
|
||||
newByteBuffer = new SingleByteBuff(ByteBuffer.allocate(len));
|
||||
newByteBuffer.put(0, buf, buf.position(), len);
|
||||
}
|
||||
// Read out the EXTRA_SERIALIZATION_SPACE content and shove into our HFileBlock.
|
||||
buf.position(buf.limit());
|
||||
buf.limit(buf.limit() + HFileBlock.EXTRA_SERIALIZATION_SPACE);
|
||||
boolean usesChecksum = buf.get() == (byte)1;
|
||||
|
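For reference, a hedged back-of-envelope on the 13 bytes the rewritten javadoc above describes (a sketch, not code from the patch; field order follows serializeExtraInfo later in this diff):

    // usesHBaseChecksum (byte) + offset of this block (long) + nextBlockOnDiskSizeWithHeader (int)
    int extra = Bytes.SIZEOF_BYTE + Bytes.SIZEOF_LONG + Bytes.SIZEOF_INT; // 1 + 8 + 4 = 13
    // A cached entry is laid out as:
    //   [ HFileBlock | checksums | next block's header | 13 bytes of extra meta ]
    // which is why blockDeserializer first pulls the limit back by EXTRA_SERIALIZATION_SPACE,
    // slices or copies the block body, and only then re-extends the limit to read the tail.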
@ -158,6 +194,7 @@ public class HFileBlock implements Cacheable {
|
|||
return deserialize(b, false, MemoryType.EXCLUSIVE);
|
||||
}
|
||||
};
|
||||
|
||||
private static final int deserializerIdentifier;
|
||||
static {
|
||||
deserializerIdentifier = CacheableDeserializerIdManager
|
||||
|
@ -167,18 +204,28 @@ public class HFileBlock implements Cacheable {
|
|||
/** Type of block. Header field 0. */
|
||||
private BlockType blockType;
|
||||
|
||||
/** Size on disk excluding header, including checksum. Header field 1. */
|
||||
/**
|
||||
* Size on disk excluding header, including checksum. Header field 1.
|
||||
* @see Writer#putHeader(byte[], int, int, int, int)
|
||||
*/
|
||||
private int onDiskSizeWithoutHeader;
|
||||
|
||||
/** Size of pure data. Does not include header or checksums. Header field 2. */
|
||||
/**
|
||||
* Size of pure data. Does not include header or checksums. Header field 2.
|
||||
* @see Writer#putHeader(byte[], int, int, int, int)
|
||||
*/
|
||||
private final int uncompressedSizeWithoutHeader;
|
||||
|
||||
/** The offset of the previous block on disk. Header field 3. */
|
||||
/**
|
||||
* The offset of the previous block on disk. Header field 3.
|
||||
* @see Writer#putHeader(byte[], int, int, int, int)
|
||||
*/
|
||||
private final long prevBlockOffset;
|
||||
|
||||
/**
|
||||
* Size on disk of header + data. Excludes checksum. Header field 6,
|
||||
* OR calculated from {@link #onDiskSizeWithoutHeader} when using HDFS checksum.
|
||||
* @see Writer#putHeader(byte[], int, int, int, int)
|
||||
*/
|
||||
private final int onDiskDataSizeWithHeader;
|
||||
|
||||
|
@ -192,20 +239,20 @@ public class HFileBlock implements Cacheable {
|
|||
* The offset of this block in the file. Populated by the reader for
|
||||
* convenience of access. This offset is not part of the block header.
|
||||
*/
|
||||
private long offset = -1;
|
||||
private long offset = UNSET;
|
||||
|
||||
/**
|
||||
* The on-disk size of the next block, including the header, obtained by
|
||||
* peeking into the first {@link HConstants#HFILEBLOCK_HEADER_SIZE} bytes of the next block's
|
||||
* header, or -1 if unknown.
|
||||
*/
|
||||
private int nextBlockOnDiskSizeWithHeader = -1;
|
||||
private int nextBlockOnDiskSizeWithHeader = UNSET;
|
||||
|
||||
private MemoryType memType = MemoryType.EXCLUSIVE;
|
||||
|
||||
/**
|
||||
* Creates a new {@link HFile} block from the given fields. This constructor
|
||||
* is mostly used when the block data has already been read and uncompressed,
|
||||
* is used when the block data has already been read and uncompressed,
|
||||
* and is sitting in a byte buffer.
|
||||
*
|
||||
* @param blockType the type of this block, see {@link BlockType}
|
||||
|
@ -213,8 +260,8 @@ public class HFileBlock implements Cacheable {
|
|||
* @param uncompressedSizeWithoutHeader see {@link #uncompressedSizeWithoutHeader}
|
||||
* @param prevBlockOffset see {@link #prevBlockOffset}
|
||||
* @param buf block header ({@link HConstants#HFILEBLOCK_HEADER_SIZE} bytes) followed by
|
||||
* uncompressed data. This
|
||||
* @param fillHeader when true, parse {@code buf} and override the first 4 header fields.
|
||||
* uncompressed data.
|
||||
* @param fillHeader when true, write the first 4 header fields into passed buffer.
|
||||
* @param offset the file offset the block was read from
|
||||
* @param onDiskDataSizeWithHeader see {@link #onDiskDataSizeWithHeader}
|
||||
* @param fileContext HFile meta data
|
||||
|
@ -230,8 +277,9 @@ public class HFileBlock implements Cacheable {
|
|||
this.offset = offset;
|
||||
this.onDiskDataSizeWithHeader = onDiskDataSizeWithHeader;
|
||||
this.fileContext = fileContext;
|
||||
if (fillHeader)
|
||||
if (fillHeader) {
|
||||
overwriteHeader();
|
||||
}
|
||||
this.buf.rewind();
|
||||
}
|
||||
|
||||
|
@ -292,8 +340,8 @@ public class HFileBlock implements Cacheable {
|
|||
} else {
|
||||
contextBuilder.withChecksumType(ChecksumType.NULL);
|
||||
contextBuilder.withBytesPerCheckSum(0);
|
||||
this.onDiskDataSizeWithHeader = onDiskSizeWithoutHeader +
|
||||
HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM;
|
||||
this.onDiskDataSizeWithHeader =
|
||||
onDiskSizeWithoutHeader + HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM;
|
||||
}
|
||||
this.fileContext = contextBuilder.build();
|
||||
this.memType = memType;
|
||||
|
@ -324,14 +372,14 @@ public class HFileBlock implements Cacheable {
|
|||
/**
|
||||
* @return the on-disk size of the data part + checksum (header excluded).
|
||||
*/
|
||||
public int getOnDiskSizeWithoutHeader() {
|
||||
int getOnDiskSizeWithoutHeader() {
|
||||
return onDiskSizeWithoutHeader;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the uncompressed size of data part (header and checksum excluded).
|
||||
*/
|
||||
public int getUncompressedSizeWithoutHeader() {
|
||||
int getUncompressedSizeWithoutHeader() {
|
||||
return uncompressedSizeWithoutHeader;
|
||||
}
|
||||
|
||||
|
@ -339,7 +387,7 @@ public class HFileBlock implements Cacheable {
|
|||
* @return the offset of the previous block of the same type in the file, or
|
||||
* -1 if unknown
|
||||
*/
|
||||
public long getPrevBlockOffset() {
|
||||
long getPrevBlockOffset() {
|
||||
return prevBlockOffset;
|
||||
}
|
||||
|
||||
|
@ -381,7 +429,7 @@ public class HFileBlock implements Cacheable {
|
|||
*
|
||||
* @return the buffer of this block for read-only operations
|
||||
*/
|
||||
public ByteBuff getBufferReadOnly() {
|
||||
ByteBuff getBufferReadOnly() {
|
||||
ByteBuff dup = this.buf.duplicate();
|
||||
dup.limit(buf.limit() - totalChecksumBytes());
|
||||
return dup.slice();
|
||||
|
@ -473,20 +521,20 @@ public class HFileBlock implements Cacheable {
|
|||
@Override
|
||||
public String toString() {
|
||||
StringBuilder sb = new StringBuilder()
|
||||
.append("HFileBlock [")
|
||||
.append(" fileOffset=").append(offset)
|
||||
.append(" headerSize()=").append(headerSize())
|
||||
.append(" blockType=").append(blockType)
|
||||
.append(" onDiskSizeWithoutHeader=").append(onDiskSizeWithoutHeader)
|
||||
.append(" uncompressedSizeWithoutHeader=").append(uncompressedSizeWithoutHeader)
|
||||
.append(" prevBlockOffset=").append(prevBlockOffset)
|
||||
.append(" isUseHBaseChecksum()=").append(fileContext.isUseHBaseChecksum());
|
||||
.append("[")
|
||||
.append("blockType=").append(blockType)
|
||||
.append(", fileOffset=").append(offset)
|
||||
.append(", headerSize=").append(headerSize())
|
||||
.append(", onDiskSizeWithoutHeader=").append(onDiskSizeWithoutHeader)
|
||||
.append(", uncompressedSizeWithoutHeader=").append(uncompressedSizeWithoutHeader)
|
||||
.append(", prevBlockOffset=").append(prevBlockOffset)
|
||||
.append(", isUseHBaseChecksum=").append(fileContext.isUseHBaseChecksum());
|
||||
if (fileContext.isUseHBaseChecksum()) {
|
||||
sb.append(" checksumType=").append(ChecksumType.codeToType(this.buf.get(24)))
|
||||
.append(" bytesPerChecksum=").append(this.buf.getInt(24 + 1))
|
||||
.append(" onDiskDataSizeWithHeader=").append(onDiskDataSizeWithHeader);
|
||||
sb.append(", checksumType=").append(ChecksumType.codeToType(this.buf.get(24)))
|
||||
.append(", bytesPerChecksum=").append(this.buf.getInt(24 + 1))
|
||||
.append(", onDiskDataSizeWithHeader=").append(onDiskDataSizeWithHeader);
|
||||
} else {
|
||||
sb.append(" onDiskDataSizeWithHeader=").append(onDiskDataSizeWithHeader)
|
||||
sb.append(", onDiskDataSizeWithHeader=").append(onDiskDataSizeWithHeader)
|
||||
.append("(").append(onDiskSizeWithoutHeader)
|
||||
.append("+").append(HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM).append(")");
|
||||
}
|
||||
|
@ -501,13 +549,13 @@ public class HFileBlock implements Cacheable {
|
|||
bufWithoutHeader.get(dataBeginBytes);
|
||||
dataBegin = Bytes.toStringBinary(dataBeginBytes);
|
||||
}
|
||||
sb.append(" getOnDiskSizeWithHeader()=").append(getOnDiskSizeWithHeader())
|
||||
.append(" totalChecksumBytes()=").append(totalChecksumBytes())
|
||||
.append(" isUnpacked()=").append(isUnpacked())
|
||||
.append(" buf=[ ").append(buf).append(" ]")
|
||||
.append(" dataBeginsWith=").append(dataBegin)
|
||||
.append(" fileContext=").append(fileContext)
|
||||
.append(" ]");
|
||||
sb.append(", getOnDiskSizeWithHeader=").append(getOnDiskSizeWithHeader())
|
||||
.append(", totalChecksumBytes=").append(totalChecksumBytes())
|
||||
.append(", isUnpacked=").append(isUnpacked())
|
||||
.append(", buf=[").append(buf).append("]")
|
||||
.append(", dataBeginsWith=").append(dataBegin)
|
||||
.append(", fileContext=").append(fileContext)
|
||||
.append("]");
|
||||
return sb.toString();
|
||||
}
|
||||
|
||||
|
@ -632,19 +680,8 @@ public class HFileBlock implements Cacheable {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @param expectedType the expected type of this block
|
||||
* @throws IOException if this block's type is different than expected
|
||||
*/
|
||||
public void expectType(BlockType expectedType) throws IOException {
|
||||
if (blockType != expectedType) {
|
||||
throw new IOException("Invalid block type: expected=" + expectedType
|
||||
+ ", actual=" + blockType);
|
||||
}
|
||||
}
|
||||
|
||||
/** @return the offset of this block in the file it was read from */
|
||||
public long getOffset() {
|
||||
long getOffset() {
|
||||
if (offset < 0) {
|
||||
throw new IllegalStateException(
|
||||
"HFile block offset not initialized properly");
|
||||
|
@ -655,7 +692,7 @@ public class HFileBlock implements Cacheable {
|
|||
/**
|
||||
* @return a byte stream reading the data + checksum of this block
|
||||
*/
|
||||
public DataInputStream getByteStream() {
|
||||
DataInputStream getByteStream() {
|
||||
ByteBuff dup = this.buf.duplicate();
|
||||
dup.position(this.headerSize());
|
||||
return new DataInputStream(new ByteBuffInputStream(dup));
|
||||
|
@ -685,21 +722,20 @@ public class HFileBlock implements Cacheable {
|
|||
}
|
||||
|
||||
/**
|
||||
* Read from an input stream. Analogous to
|
||||
* Read from an input stream at least <code>necessaryLen</code> and if possible,
|
||||
* <code>extraLen</code> also if available. Analogous to
|
||||
* {@link IOUtils#readFully(InputStream, byte[], int, int)}, but specifies a
|
||||
* number of "extra" bytes that would be desirable but not absolutely
|
||||
* necessary to read.
|
||||
* number of "extra" bytes to also optionally read.
|
||||
*
|
||||
* @param in the input stream to read from
|
||||
* @param buf the buffer to read into
|
||||
* @param bufOffset the destination offset in the buffer
|
||||
* @param necessaryLen the number of bytes that are absolutely necessary to
|
||||
* read
|
||||
* @param necessaryLen the number of bytes that are absolutely necessary to read
|
||||
* @param extraLen the number of extra bytes that would be nice to read
|
||||
* @return true if succeeded reading the extra bytes
|
||||
* @throws IOException if failed to read the necessary bytes
|
||||
*/
|
||||
public static boolean readWithExtra(InputStream in, byte[] buf,
|
||||
static boolean readWithExtra(InputStream in, byte[] buf,
|
||||
int bufOffset, int necessaryLen, int extraLen) throws IOException {
|
||||
int bytesRemaining = necessaryLen + extraLen;
|
||||
while (bytesRemaining > 0) {
|
||||
|
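A hedged usage sketch (not from the patch) of the contract documented above: the read only fails if the necessary bytes cannot be had; the boolean result reports whether the optional extra bytes (typically the next block's header) were also read. Names below are assumptions.

    // Allocate room for this block plus a peek at the next block's header.
    byte[] dest = new byte[onDiskSizeWithHeader + hdrSize];
    boolean nextHeaderRead = readWithExtra(in, dest, 0,
        onDiskSizeWithHeader /* necessary */, hdrSize /* extra, best effort */);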
@ -723,7 +759,8 @@ public class HFileBlock implements Cacheable {
|
|||
}
|
||||
|
||||
/**
|
||||
* Read from an input stream. Analogous to
|
||||
* Read from an input stream at least <code>necessaryLen</code> and if possible,
|
||||
* <code>extraLen</code> also if available. Analogous to
|
||||
* {@link IOUtils#readFully(InputStream, byte[], int, int)}, but uses
|
||||
* positional read and specifies a number of "extra" bytes that would be
|
||||
* desirable but not absolutely necessary to read.
|
||||
|
@ -776,14 +813,13 @@ public class HFileBlock implements Cacheable {
|
|||
* <li>Construct an {@link HFileBlock.Writer}, providing a compression algorithm.
|
||||
* <li>Call {@link Writer#startWriting} and get a data stream to write to.
|
||||
* <li>Write your data into the stream.
|
||||
* <li>Call {@link Writer#writeHeaderAndData(FSDataOutputStream)} as many times as you need to.
|
||||
* <li>Call Writer#writeHeaderAndData(FSDataOutputStream) as many times as you need to.
|
||||
* store the serialized block into an external stream.
|
||||
* <li>Repeat to write more blocks.
|
||||
* </ol>
|
||||
* <p>
|
||||
*/
|
||||
public static class Writer {
|
||||
|
||||
static class Writer {
|
||||
private enum State {
|
||||
INIT,
|
||||
WRITING,
|
||||
|
@ -798,7 +834,7 @@ public class HFileBlock implements Cacheable {
|
|||
|
||||
private HFileBlockEncodingContext dataBlockEncodingCtx;
|
||||
|
||||
/** block encoding context for non-data blocks */
|
||||
/** block encoding context for non-data blocks*/
|
||||
private HFileBlockDefaultEncodingContext defaultBlockEncodingCtx;
|
||||
|
||||
/**
|
||||
|
@ -871,25 +907,26 @@ public class HFileBlock implements Cacheable {
|
|||
* @param dataBlockEncoder data block encoding algorithm to use
|
||||
*/
|
||||
public Writer(HFileDataBlockEncoder dataBlockEncoder, HFileContext fileContext) {
|
||||
this.dataBlockEncoder = dataBlockEncoder != null
|
||||
? dataBlockEncoder : NoOpDataBlockEncoder.INSTANCE;
|
||||
defaultBlockEncodingCtx = new HFileBlockDefaultEncodingContext(null,
|
||||
HConstants.HFILEBLOCK_DUMMY_HEADER, fileContext);
|
||||
dataBlockEncodingCtx = this.dataBlockEncoder
|
||||
.newDataBlockEncodingContext(HConstants.HFILEBLOCK_DUMMY_HEADER, fileContext);
|
||||
|
||||
if (fileContext.getBytesPerChecksum() < HConstants.HFILEBLOCK_HEADER_SIZE) {
|
||||
throw new RuntimeException("Unsupported value of bytesPerChecksum. " +
|
||||
" Minimum is " + HConstants.HFILEBLOCK_HEADER_SIZE + " but the configured value is " +
|
||||
fileContext.getBytesPerChecksum());
|
||||
}
|
||||
|
||||
this.dataBlockEncoder = dataBlockEncoder != null?
|
||||
dataBlockEncoder: NoOpDataBlockEncoder.INSTANCE;
|
||||
this.dataBlockEncodingCtx = this.dataBlockEncoder.
|
||||
newDataBlockEncodingContext(HConstants.HFILEBLOCK_DUMMY_HEADER, fileContext);
|
||||
// TODO: This should be lazily instantiated since we usually do NOT need this default encoder
|
||||
this.defaultBlockEncodingCtx = new HFileBlockDefaultEncodingContext(null,
|
||||
HConstants.HFILEBLOCK_DUMMY_HEADER, fileContext);
|
||||
// TODO: Set BAOS initial size. Use fileContext.getBlocksize() and add for header/checksum
|
||||
baosInMemory = new ByteArrayOutputStream();
|
||||
|
||||
prevOffsetByType = new long[BlockType.values().length];
|
||||
for (int i = 0; i < prevOffsetByType.length; ++i)
|
||||
prevOffsetByType[i] = -1;
|
||||
|
||||
for (int i = 0; i < prevOffsetByType.length; ++i) {
|
||||
prevOffsetByType[i] = UNSET;
|
||||
}
|
||||
// TODO: Why fileContext saved away when we have dataBlockEncoder and/or
|
||||
// defaultDataBlockEncoder?
|
||||
this.fileContext = fileContext;
|
||||
}
|
||||
|
||||
|
@ -899,7 +936,7 @@ public class HFileBlock implements Cacheable {
|
|||
* @return the stream the user can write their data into
|
||||
* @throws IOException
|
||||
*/
|
||||
public DataOutputStream startWriting(BlockType newBlockType)
|
||||
DataOutputStream startWriting(BlockType newBlockType)
|
||||
throws IOException {
|
||||
if (state == State.BLOCK_READY && startOffset != -1) {
|
||||
// We had a previous block that was written to a stream at a specific
|
||||
|
@ -929,10 +966,10 @@ public class HFileBlock implements Cacheable {
|
|||
* @param cell
|
||||
* @throws IOException
|
||||
*/
|
||||
public void write(Cell cell) throws IOException{
|
||||
void write(Cell cell) throws IOException{
|
||||
expectState(State.WRITING);
|
||||
this.unencodedDataSizeWritten += this.dataBlockEncoder.encode(cell, dataBlockEncodingCtx,
|
||||
this.userDataStream);
|
||||
this.unencodedDataSizeWritten +=
|
||||
this.dataBlockEncoder.encode(cell, dataBlockEncodingCtx, this.userDataStream);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -976,6 +1013,7 @@ public class HFileBlock implements Cacheable {
|
|||
}
|
||||
userDataStream.flush();
|
||||
// This does an array copy, so it is safe to cache this byte array.
|
||||
// Header is still the empty, 'dummy' header that is yet to be filled out.
|
||||
uncompressedBytesWithHeader = baosInMemory.toByteArray();
|
||||
prevOffset = prevOffsetByType[blockType.getId()];
|
||||
|
||||
|
@ -987,22 +1025,25 @@ public class HFileBlock implements Cacheable {
|
|||
onDiskBytesWithHeader = dataBlockEncodingCtx
|
||||
.compressAndEncrypt(uncompressedBytesWithHeader);
|
||||
} else {
|
||||
onDiskBytesWithHeader = defaultBlockEncodingCtx
|
||||
.compressAndEncrypt(uncompressedBytesWithHeader);
|
||||
onDiskBytesWithHeader = this.defaultBlockEncodingCtx.
|
||||
compressAndEncrypt(uncompressedBytesWithHeader);
|
||||
}
|
||||
// Calculate how many bytes we need for checksum on the tail of the block.
|
||||
int numBytes = (int) ChecksumUtil.numBytes(
|
||||
onDiskBytesWithHeader.length,
|
||||
fileContext.getBytesPerChecksum());
|
||||
|
||||
// put the header for on disk bytes
|
||||
// Put the header for the on disk bytes; header currently is unfilled-out
|
||||
putHeader(onDiskBytesWithHeader, 0,
|
||||
onDiskBytesWithHeader.length + numBytes,
|
||||
uncompressedBytesWithHeader.length, onDiskBytesWithHeader.length);
|
||||
// set the header for the uncompressed bytes (for cache-on-write)
|
||||
putHeader(uncompressedBytesWithHeader, 0,
|
||||
// Set the header for the uncompressed bytes (for cache-on-write) -- IFF different from
|
||||
// onDiskBytesWithHeader array.
|
||||
if (onDiskBytesWithHeader != uncompressedBytesWithHeader) {
|
||||
putHeader(uncompressedBytesWithHeader, 0,
|
||||
onDiskBytesWithHeader.length + numBytes,
|
||||
uncompressedBytesWithHeader.length, onDiskBytesWithHeader.length);
|
||||
|
||||
}
|
||||
onDiskChecksum = new byte[numBytes];
|
||||
ChecksumUtil.generateChecksums(
|
||||
onDiskBytesWithHeader, 0, onDiskBytesWithHeader.length,
|
||||
|
@ -1036,9 +1077,9 @@ public class HFileBlock implements Cacheable {
|
|||
* @param out
|
||||
* @throws IOException
|
||||
*/
|
||||
public void writeHeaderAndData(FSDataOutputStream out) throws IOException {
|
||||
void writeHeaderAndData(FSDataOutputStream out) throws IOException {
|
||||
long offset = out.getPos();
|
||||
if (startOffset != -1 && offset != startOffset) {
|
||||
if (startOffset != UNSET && offset != startOffset) {
|
||||
throw new IOException("A " + blockType + " block written to a "
|
||||
+ "stream twice, first at offset " + startOffset + ", then at "
|
||||
+ offset);
|
||||
|
@ -1091,7 +1132,7 @@ public class HFileBlock implements Cacheable {
|
|||
/**
|
||||
* Releases resources used by this writer.
|
||||
*/
|
||||
public void release() {
|
||||
void release() {
|
||||
if (dataBlockEncodingCtx != null) {
|
||||
dataBlockEncodingCtx.close();
|
||||
dataBlockEncodingCtx = null;
|
||||
|
@ -1112,9 +1153,8 @@ public class HFileBlock implements Cacheable {
|
|||
*/
|
||||
int getOnDiskSizeWithoutHeader() {
|
||||
expectState(State.BLOCK_READY);
|
||||
return onDiskBytesWithHeader.length
|
||||
+ onDiskChecksum.length
|
||||
- HConstants.HFILEBLOCK_HEADER_SIZE;
|
||||
return onDiskBytesWithHeader.length +
|
||||
onDiskChecksum.length - HConstants.HFILEBLOCK_HEADER_SIZE;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -1146,7 +1186,7 @@ public class HFileBlock implements Cacheable {
|
|||
}
|
||||
|
||||
/** @return true if a block is being written */
|
||||
public boolean isWriting() {
|
||||
boolean isWriting() {
|
||||
return state == State.WRITING;
|
||||
}
|
||||
|
||||
|
@ -1157,7 +1197,7 @@ public class HFileBlock implements Cacheable {
|
|||
*
|
||||
* @return the number of bytes written
|
||||
*/
|
||||
public int blockSizeWritten() {
|
||||
int blockSizeWritten() {
|
||||
if (state != State.WRITING) return 0;
|
||||
return this.unencodedDataSizeWritten;
|
||||
}
|
||||
|
@ -1205,7 +1245,7 @@ public class HFileBlock implements Cacheable {
|
|||
* @param out the file system output stream
|
||||
* @throws IOException
|
||||
*/
|
||||
public void writeBlock(BlockWritable bw, FSDataOutputStream out)
|
||||
void writeBlock(BlockWritable bw, FSDataOutputStream out)
|
||||
throws IOException {
|
||||
bw.writeToBlock(startWriting(bw.getBlockType()));
|
||||
writeHeaderAndData(out);
|
||||
|
@ -1218,7 +1258,7 @@ public class HFileBlock implements Cacheable {
|
|||
* version is MINOR_VERSION_WITH_CHECKSUM. This is indicated by setting a
|
||||
* 0 value in bytesPerChecksum.
|
||||
*/
|
||||
public HFileBlock getBlockForCaching(CacheConfig cacheConf) {
|
||||
HFileBlock getBlockForCaching(CacheConfig cacheConf) {
|
||||
HFileContext newContext = new HFileContextBuilder()
|
||||
.withBlockSize(fileContext.getBlocksize())
|
||||
.withBytesPerCheckSum(0)
|
||||
|
@ -1241,7 +1281,7 @@ public class HFileBlock implements Cacheable {
|
|||
}
|
||||
|
||||
/** Something that can be written into a block. */
|
||||
public interface BlockWritable {
|
||||
interface BlockWritable {
|
||||
|
||||
/** The type of block this data should use. */
|
||||
BlockType getBlockType();
|
||||
|
@ -1258,7 +1298,7 @@ public class HFileBlock implements Cacheable {
|
|||
// Block readers and writers
|
||||
|
||||
/** An interface allowing to iterate {@link HFileBlock}s. */
|
||||
public interface BlockIterator {
|
||||
interface BlockIterator {
|
||||
|
||||
/**
|
||||
* Get the next block, or null if there are no more blocks to iterate.
|
||||
|
@ -1273,7 +1313,7 @@ public class HFileBlock implements Cacheable {
|
|||
}
|
||||
|
||||
/** A full-fledged reader with iteration ability. */
|
||||
public interface FSReader {
|
||||
interface FSReader {
|
||||
|
||||
/**
|
||||
* Reads the block at the given offset in the file with the given on-disk
|
||||
|
@ -1321,9 +1361,15 @@ public class HFileBlock implements Cacheable {
|
|||
long offset = -1;
|
||||
byte[] header = new byte[HConstants.HFILEBLOCK_HEADER_SIZE];
|
||||
final ByteBuffer buf = ByteBuffer.wrap(header, 0, HConstants.HFILEBLOCK_HEADER_SIZE);
|
||||
@Override
|
||||
public String toString() {
|
||||
return "offset=" + this.offset + ", header=" + Bytes.toStringBinary(header);
|
||||
}
|
||||
}
|
||||
|
||||
/** Reads version 2 blocks from the filesystem. */
|
||||
/**
|
||||
* Reads version 2 blocks from the filesystem.
|
||||
*/
|
||||
static class FSReaderImpl implements FSReader {
|
||||
/** The file system stream of the underlying {@link HFile} that
|
||||
* does or doesn't do checksum validations in the filesystem */
|
||||
|
@ -1362,7 +1408,7 @@ public class HFileBlock implements Cacheable {
|
|||
// Cache the fileName
|
||||
protected String pathName;
|
||||
|
||||
public FSReaderImpl(FSDataInputStreamWrapper stream, long fileSize, HFileSystem hfs, Path path,
|
||||
FSReaderImpl(FSDataInputStreamWrapper stream, long fileSize, HFileSystem hfs, Path path,
|
||||
HFileContext fileContext) throws IOException {
|
||||
this.fileSize = fileSize;
|
||||
this.hfs = hfs;
|
||||
|
@ -1420,8 +1466,8 @@ public class HFileBlock implements Cacheable {
|
|||
* the on-disk size of the next block, or -1 if it could not be determined.
|
||||
*
|
||||
* @param dest destination buffer
|
||||
* @param destOffset offset in the destination buffer
|
||||
* @param size size of the block to be read
|
||||
* @param destOffset offset into the destination buffer at where to put the bytes we read
|
||||
* @param size size of read
|
||||
* @param peekIntoNextBlock whether to read the next block's on-disk size
|
||||
* @param fileOffset position in the stream to read at
|
||||
* @param pread whether we should do a positional read
|
||||
|
@ -1430,12 +1476,10 @@ public class HFileBlock implements Cacheable {
|
|||
* -1 if it could not be determined
|
||||
* @throws IOException
|
||||
*/
|
||||
protected int readAtOffset(FSDataInputStream istream,
|
||||
byte[] dest, int destOffset, int size,
|
||||
protected int readAtOffset(FSDataInputStream istream, byte [] dest, int destOffset, int size,
|
||||
boolean peekIntoNextBlock, long fileOffset, boolean pread)
|
||||
throws IOException {
|
||||
if (peekIntoNextBlock &&
|
||||
destOffset + size + hdrSize > dest.length) {
|
||||
throws IOException {
|
||||
if (peekIntoNextBlock && destOffset + size + hdrSize > dest.length) {
|
||||
// We are asked to read the next block's header as well, but there is
|
||||
// not enough room in the array.
|
||||
throw new IOException("Attempted to read " + size + " bytes and " +
|
||||
|
@ -1522,7 +1566,7 @@ public class HFileBlock implements Cacheable {
|
|||
HFile.LOG.warn(msg);
|
||||
throw new IOException(msg); // cannot happen case here
|
||||
}
|
||||
HFile.checksumFailures.increment(); // update metrics
|
||||
HFile.CHECKSUM_FAILURES.increment(); // update metrics
|
||||
|
||||
// If we have a checksum failure, we fall back into a mode where
|
||||
// the next few reads use HDFS level checksums. We aim to make the
|
||||
|
@ -1597,6 +1641,7 @@ public class HFileBlock implements Cacheable {
|
|||
}
|
||||
|
||||
int onDiskSizeWithHeader = (int) onDiskSizeWithHeaderL;
|
||||
|
||||
// See if we can avoid reading the header. This is desirable, because
|
||||
// we will not incur a backward seek operation if we have already
|
||||
// read this block's header as part of the previous read's look-ahead.
|
||||
|
@ -1604,15 +1649,22 @@ public class HFileBlock implements Cacheable {
|
|||
// been read.
|
||||
// TODO: How often does this optimization fire? Has to be same thread so the thread local
|
||||
// is pertinent and we have to be reading next block as in a big scan.
|
||||
ByteBuffer headerBuf = null;
|
||||
PrefetchedHeader prefetchedHeader = prefetchedHeaderForThread.get();
|
||||
ByteBuffer headerBuf = prefetchedHeader.offset == offset? prefetchedHeader.buf: null;
|
||||
|
||||
boolean preReadHeader = false;
|
||||
if (prefetchedHeader != null && prefetchedHeader.offset == offset) {
|
||||
headerBuf = prefetchedHeader.buf;
|
||||
preReadHeader = true;
|
||||
}
|
||||
// Allocate enough space to fit the next block's header too.
|
||||
int nextBlockOnDiskSize = 0;
|
||||
byte[] onDiskBlock = null;
|
||||
|
||||
HFileBlock b = null;
|
||||
boolean fastPath = false;
|
||||
boolean readHdrOnly = false;
|
||||
if (onDiskSizeWithHeader > 0) {
|
||||
fastPath = true;
|
||||
// We know the total on-disk size. Read the entire block into memory,
|
||||
// then parse the header. This code path is used when
|
||||
// doing a random read operation relying on the block index, as well as
|
||||
|
@ -1669,6 +1721,7 @@ public class HFileBlock implements Cacheable {
|
|||
// Unfortunately, we still have to do a separate read operation to
|
||||
// read the header.
|
||||
if (headerBuf == null) {
|
||||
readHdrOnly = true;
|
||||
// From the header, determine the on-disk size of the given hfile
|
||||
// block, and read the remaining data, thereby incurring two read
|
||||
// operations. This might happen when we are doing the first read
|
||||
|
@ -1681,12 +1734,12 @@ public class HFileBlock implements Cacheable {
|
|||
}
|
||||
// TODO: FIX!!! Expensive parse just to get a length
|
||||
b = new HFileBlock(headerBuf, fileContext.isUseHBaseChecksum());
|
||||
// onDiskBlock is whole block + header + checksums then extra hdrSize to read next header
|
||||
onDiskBlock = new byte[b.getOnDiskSizeWithHeader() + hdrSize];
|
||||
// headerBuf is HBB
|
||||
// headerBuf is HBB. Copy hdr into onDiskBlock
|
||||
System.arraycopy(headerBuf.array(), headerBuf.arrayOffset(), onDiskBlock, 0, hdrSize);
|
||||
nextBlockOnDiskSize =
|
||||
readAtOffset(is, onDiskBlock, hdrSize, b.getOnDiskSizeWithHeader()
|
||||
- hdrSize, true, offset + hdrSize, pread);
|
||||
nextBlockOnDiskSize = readAtOffset(is, onDiskBlock, hdrSize,
|
||||
b.getOnDiskSizeWithHeader() - hdrSize, true, offset + hdrSize, pread);
|
||||
onDiskSizeWithHeader = b.onDiskSizeWithoutHeader + hdrSize;
|
||||
}
|
||||
|
||||
|
@ -1716,13 +1769,19 @@ public class HFileBlock implements Cacheable {
|
|||
b.offset = offset;
|
||||
b.fileContext.setIncludesTags(this.fileContext.isIncludesTags());
|
||||
b.fileContext.setIncludesMvcc(this.fileContext.isIncludesMvcc());
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("Read preReadHeader=" + preReadHeader + ", fastPath=" + fastPath +
|
||||
", readHdrOnly=" + readHdrOnly + ", " + b);
|
||||
}
|
||||
return b;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setIncludesMemstoreTS(boolean includesMemstoreTS) {
|
||||
this.fileContext.setIncludesMvcc(includesMemstoreTS);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setDataBlockEncoder(HFileDataBlockEncoder encoder) {
|
||||
encodedBlockDecodingCtx = encoder.newDataBlockDecodingContext(this.fileContext);
|
||||
}
|
||||
|
@ -1772,11 +1831,13 @@ public class HFileBlock implements Cacheable {
|
|||
|
||||
@Override
|
||||
public void serialize(ByteBuffer destination) {
|
||||
this.buf.get(destination, 0, getSerializedLength()
|
||||
- EXTRA_SERIALIZATION_SPACE);
|
||||
this.buf.get(destination, 0, getSerializedLength() - EXTRA_SERIALIZATION_SPACE);
|
||||
serializeExtraInfo(destination);
|
||||
}
|
||||
|
||||
/**
|
||||
* Write out the content of EXTRA_SERIALIZATION_SPACE. Public so can be accessed by BucketCache.
|
||||
*/
|
||||
public void serializeExtraInfo(ByteBuffer destination) {
|
||||
destination.put(this.fileContext.isUseHBaseChecksum() ? (byte) 1 : (byte) 0);
|
||||
destination.putLong(this.offset);
|
||||
|
@ -1862,7 +1923,7 @@ public class HFileBlock implements Cacheable {
|
|||
}
|
||||
|
||||
/**
|
||||
* Calcuate the number of bytes required to store all the checksums
|
||||
* Calculate the number of bytes required to store all the checksums
|
||||
* for this block. Each checksum value is a 4 byte integer.
|
||||
*/
|
||||
int totalChecksumBytes() {
|
||||
|
@ -1888,16 +1949,14 @@ public class HFileBlock implements Cacheable {
|
|||
* Maps a minor version to the size of the header.
|
||||
*/
|
||||
public static int headerSize(boolean usesHBaseChecksum) {
|
||||
if (usesHBaseChecksum) {
|
||||
return HConstants.HFILEBLOCK_HEADER_SIZE;
|
||||
}
|
||||
return HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM;
|
||||
return usesHBaseChecksum?
|
||||
HConstants.HFILEBLOCK_HEADER_SIZE: HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM;
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the appropriate DUMMY_HEADER for the minor version
|
||||
*/
|
||||
public byte[] getDummyHeaderForVersion() {
|
||||
byte[] getDummyHeaderForVersion() {
|
||||
return getDummyHeaderForVersion(this.fileContext.isUseHBaseChecksum());
|
||||
}
|
||||
|
||||
|
@ -1905,17 +1964,14 @@ public class HFileBlock implements Cacheable {
|
|||
* Return the appropriate DUMMY_HEADER for the minor version
|
||||
*/
|
||||
static private byte[] getDummyHeaderForVersion(boolean usesHBaseChecksum) {
|
||||
if (usesHBaseChecksum) {
|
||||
return HConstants.HFILEBLOCK_DUMMY_HEADER;
|
||||
}
|
||||
return DUMMY_HEADER_NO_CHECKSUM;
|
||||
return usesHBaseChecksum? HConstants.HFILEBLOCK_DUMMY_HEADER: DUMMY_HEADER_NO_CHECKSUM;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the HFileContext used to create this HFileBlock. Not necessary the
|
||||
* fileContext for the file from which this block's data was originally read.
|
||||
*/
|
||||
public HFileContext getHFileContext() {
|
||||
HFileContext getHFileContext() {
|
||||
return this.fileContext;
|
||||
}
|
||||
|
||||
|
@ -1927,7 +1983,7 @@ public class HFileBlock implements Cacheable {
|
|||
/**
|
||||
* @return true if this block is backed by a shared memory area(such as that of a BucketCache).
|
||||
*/
|
||||
public boolean usesSharedMemory() {
|
||||
boolean usesSharedMemory() {
|
||||
return this.memType == MemoryType.SHARED;
|
||||
}
|
||||
|
||||
|
|
|
@ -1491,13 +1491,16 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
|
|||
HFileBlock cachedBlock = getCachedBlock(cacheKey, cacheBlock, useLock, isCompaction,
|
||||
updateCacheMetrics, expectedBlockType, expectedDataBlockEncoding);
|
||||
if (cachedBlock != null) {
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("From Cache " + cachedBlock);
|
||||
}
|
||||
if (Trace.isTracing()) {
|
||||
traceScope.getSpan().addTimelineAnnotation("blockCacheHit");
|
||||
}
|
||||
assert cachedBlock.isUnpacked() : "Packed block leak.";
|
||||
if (cachedBlock.getBlockType().isData()) {
|
||||
if (updateCacheMetrics) {
|
||||
HFile.dataBlockReadCnt.increment();
|
||||
HFile.DATABLOCK_READ_COUNT.increment();
|
||||
}
|
||||
// Validate encoding type for data blocks. We include encoding
|
||||
// type in the cache key, and we expect it to match on a cache hit.
|
||||
|
@ -1537,7 +1540,7 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
|
|||
}
|
||||
|
||||
if (updateCacheMetrics && hfileBlock.getBlockType().isData()) {
|
||||
HFile.dataBlockReadCnt.increment();
|
||||
HFile.DATABLOCK_READ_COUNT.increment();
|
||||
}
|
||||
|
||||
return unpacked;
|
||||
|
|
|
@ -18,6 +18,7 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.io.hfile;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
|
||||
|
@ -38,7 +39,7 @@ import org.apache.hadoop.hbase.Cell;
|
|||
* getValue.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public interface HFileScanner extends Shipper {
|
||||
public interface HFileScanner extends Shipper, Closeable {
|
||||
/**
|
||||
* SeekTo or just before the passed <code>cell</code>. Examine the return
|
||||
* code to figure whether we found the cell or not.
|
||||
|
@ -154,4 +155,4 @@ public interface HFileScanner extends Shipper {
|
|||
* Close this HFile scanner and do necessary cleanup.
|
||||
*/
|
||||
void close();
|
||||
}
|
||||
}
|
|
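With HFileScanner now extending Closeable, callers can lean on try-with-resources; a minimal sketch, assuming an HFile.Reader named reader:

    try (HFileScanner scanner = reader.getScanner(false, true)) {
      if (scanner.seekTo()) {            // position at the first cell, if any
        do {
          Cell cell = scanner.getCell(); // ... process the current cell ...
        } while (scanner.next());
      }
    } // close() is invoked automatically here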
@ -57,6 +57,8 @@ import org.apache.hadoop.io.Writable;
|
|||
public class HFileWriterImpl implements HFile.Writer {
|
||||
private static final Log LOG = LogFactory.getLog(HFileWriterImpl.class);
|
||||
|
||||
private static final long UNSET = -1;
|
||||
|
||||
/** The Cell previously appended. Becomes the last cell in the file.*/
|
||||
protected Cell lastCell = null;
|
||||
|
||||
|
@ -129,16 +131,16 @@ public class HFileWriterImpl implements HFile.Writer {
|
|||
private List<InlineBlockWriter> inlineBlockWriters = new ArrayList<InlineBlockWriter>();
|
||||
|
||||
/** block writer */
|
||||
protected HFileBlock.Writer fsBlockWriter;
|
||||
protected HFileBlock.Writer blockWriter;
|
||||
|
||||
private HFileBlockIndex.BlockIndexWriter dataBlockIndexWriter;
|
||||
private HFileBlockIndex.BlockIndexWriter metaBlockIndexWriter;
|
||||
|
||||
/** The offset of the first data block or -1 if the file is empty. */
|
||||
private long firstDataBlockOffset = -1;
|
||||
private long firstDataBlockOffset = UNSET;
|
||||
|
||||
/** The offset of the last data block or 0 if the file is empty. */
|
||||
protected long lastDataBlockOffset;
|
||||
protected long lastDataBlockOffset = UNSET;
|
||||
|
||||
/**
|
||||
* The last(stop) Cell of the previous data block.
|
||||
|
@ -164,8 +166,7 @@ public class HFileWriterImpl implements HFile.Writer {
|
|||
} else {
|
||||
this.blockEncoder = NoOpDataBlockEncoder.INSTANCE;
|
||||
}
|
||||
this.comparator = comparator != null ? comparator
|
||||
: CellComparator.COMPARATOR;
|
||||
this.comparator = comparator != null? comparator: CellComparator.COMPARATOR;
|
||||
|
||||
closeOutputStream = path != null;
|
||||
this.cacheConf = cacheConf;
|
||||
|
@ -273,15 +274,15 @@ public class HFileWriterImpl implements HFile.Writer {
|
|||
|
||||
/** Additional initialization steps */
|
||||
protected void finishInit(final Configuration conf) {
|
||||
if (fsBlockWriter != null) {
|
||||
if (blockWriter != null) {
|
||||
throw new IllegalStateException("finishInit called twice");
|
||||
}
|
||||
|
||||
fsBlockWriter = new HFileBlock.Writer(blockEncoder, hFileContext);
|
||||
blockWriter = new HFileBlock.Writer(blockEncoder, hFileContext);
|
||||
|
||||
// Data block index writer
|
||||
boolean cacheIndexesOnWrite = cacheConf.shouldCacheIndexesOnWrite();
|
||||
dataBlockIndexWriter = new HFileBlockIndex.BlockIndexWriter(fsBlockWriter,
|
||||
dataBlockIndexWriter = new HFileBlockIndex.BlockIndexWriter(blockWriter,
|
||||
cacheIndexesOnWrite ? cacheConf : null,
|
||||
cacheIndexesOnWrite ? name : null);
|
||||
dataBlockIndexWriter.setMaxChunkSize(
|
||||
|
@ -299,29 +300,29 @@ public class HFileWriterImpl implements HFile.Writer {
|
|||
* @throws IOException
|
||||
*/
|
||||
protected void checkBlockBoundary() throws IOException {
|
||||
if (fsBlockWriter.blockSizeWritten() < hFileContext.getBlocksize()) return;
|
||||
if (blockWriter.blockSizeWritten() < hFileContext.getBlocksize()) return;
|
||||
finishBlock();
|
||||
writeInlineBlocks(false);
|
||||
newBlock();
|
||||
}
|
||||
|
||||
/** Clean up the current data block */
|
||||
/** Clean up the data block that is currently being written.*/
|
||||
private void finishBlock() throws IOException {
|
||||
if (!fsBlockWriter.isWriting() || fsBlockWriter.blockSizeWritten() == 0) return;
|
||||
+if (!blockWriter.isWriting() || blockWriter.blockSizeWritten() == 0) return;

-// Update the first data block offset for scanning.
-if (firstDataBlockOffset == -1) {
+// Update the first data block offset if UNSET; used scanning.
+if (firstDataBlockOffset == UNSET) {
 firstDataBlockOffset = outputStream.getPos();
 }
-// Update the last data block offset
+// Update the last data block offset each time through here.
 lastDataBlockOffset = outputStream.getPos();
-fsBlockWriter.writeHeaderAndData(outputStream);
-int onDiskSize = fsBlockWriter.getOnDiskSizeWithHeader();
+blockWriter.writeHeaderAndData(outputStream);
+int onDiskSize = blockWriter.getOnDiskSizeWithHeader();
 Cell indexEntry =
 getMidpoint(this.comparator, lastCellOfPreviousBlock, firstCellInBlock);
 dataBlockIndexWriter.addEntry(CellUtil.getCellKeySerializedAsKeyValueKey(indexEntry),
 lastDataBlockOffset, onDiskSize);
-totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader();
+totalUncompressedBytes += blockWriter.getUncompressedSizeWithHeader();
 if (cacheConf.shouldCacheDataOnWrite()) {
 doCacheOnWrite(lastDataBlockOffset);
 }

@@ -461,12 +462,12 @@ public class HFileWriterImpl implements HFile.Writer {
 while (ibw.shouldWriteBlock(closing)) {
 long offset = outputStream.getPos();
 boolean cacheThisBlock = ibw.getCacheOnWrite();
-ibw.writeInlineBlock(fsBlockWriter.startWriting(
+ibw.writeInlineBlock(blockWriter.startWriting(
 ibw.getInlineBlockType()));
-fsBlockWriter.writeHeaderAndData(outputStream);
-ibw.blockWritten(offset, fsBlockWriter.getOnDiskSizeWithHeader(),
-fsBlockWriter.getUncompressedSizeWithoutHeader());
-totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader();
+blockWriter.writeHeaderAndData(outputStream);
+ibw.blockWritten(offset, blockWriter.getOnDiskSizeWithHeader(),
+blockWriter.getUncompressedSizeWithoutHeader());
+totalUncompressedBytes += blockWriter.getUncompressedSizeWithHeader();

 if (cacheThisBlock) {
 doCacheOnWrite(offset);

@@ -481,7 +482,7 @@ public class HFileWriterImpl implements HFile.Writer {
 * the cache key.
 */
 private void doCacheOnWrite(long offset) {
-HFileBlock cacheFormatBlock = fsBlockWriter.getBlockForCaching(cacheConf);
+HFileBlock cacheFormatBlock = blockWriter.getBlockForCaching(cacheConf);
 cacheConf.getBlockCache().cacheBlock(new BlockCacheKey(name, offset), cacheFormatBlock);
 }

@@ -492,7 +493,7 @@ public class HFileWriterImpl implements HFile.Writer {
 */
 protected void newBlock() throws IOException {
 // This is where the next block begins.
-fsBlockWriter.startWriting(BlockType.DATA);
+blockWriter.startWriting(BlockType.DATA);
 firstCellInBlock = null;
 if (lastCell != null) {
 lastCellOfPreviousBlock = lastCell;

@@ -547,15 +548,15 @@ public class HFileWriterImpl implements HFile.Writer {
 // store the beginning offset
 long offset = outputStream.getPos();
 // write the metadata content
-DataOutputStream dos = fsBlockWriter.startWriting(BlockType.META);
+DataOutputStream dos = blockWriter.startWriting(BlockType.META);
 metaData.get(i).write(dos);

-fsBlockWriter.writeHeaderAndData(outputStream);
-totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader();
+blockWriter.writeHeaderAndData(outputStream);
+totalUncompressedBytes += blockWriter.getUncompressedSizeWithHeader();

 // Add the new meta block to the meta index.
 metaBlockIndexWriter.addEntry(metaNames.get(i), offset,
-fsBlockWriter.getOnDiskSizeWithHeader());
+blockWriter.getOnDiskSizeWithHeader());
 }
 }

@@ -572,10 +573,10 @@ public class HFileWriterImpl implements HFile.Writer {
 trailer.setLoadOnOpenOffset(rootIndexOffset);

 // Meta block index.
-metaBlockIndexWriter.writeSingleLevelIndex(fsBlockWriter.startWriting(
+metaBlockIndexWriter.writeSingleLevelIndex(blockWriter.startWriting(
 BlockType.ROOT_INDEX), "meta");
-fsBlockWriter.writeHeaderAndData(outputStream);
-totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader();
+blockWriter.writeHeaderAndData(outputStream);
+totalUncompressedBytes += blockWriter.getUncompressedSizeWithHeader();

 if (this.hFileContext.isIncludesMvcc()) {
 appendFileInfo(MAX_MEMSTORE_TS_KEY, Bytes.toBytes(maxMemstoreTS));

@@ -583,14 +584,14 @@ public class HFileWriterImpl implements HFile.Writer {
 }

 // File info
-writeFileInfo(trailer, fsBlockWriter.startWriting(BlockType.FILE_INFO));
-fsBlockWriter.writeHeaderAndData(outputStream);
-totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader();
+writeFileInfo(trailer, blockWriter.startWriting(BlockType.FILE_INFO));
+blockWriter.writeHeaderAndData(outputStream);
+totalUncompressedBytes += blockWriter.getUncompressedSizeWithHeader();

 // Load-on-open data supplied by higher levels, e.g. Bloom filters.
 for (BlockWritable w : additionalLoadOnOpenData){
-fsBlockWriter.writeBlock(w, outputStream);
-totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader();
+blockWriter.writeBlock(w, outputStream);
+totalUncompressedBytes += blockWriter.getUncompressedSizeWithHeader();
 }

 // Now finish off the trailer.

@@ -605,7 +606,7 @@ public class HFileWriterImpl implements HFile.Writer {

 finishClose(trailer);

-fsBlockWriter.release();
+blockWriter.release();
 }

 @Override

@@ -670,11 +671,11 @@ public class HFileWriterImpl implements HFile.Writer {
 checkBlockBoundary();
 }

-if (!fsBlockWriter.isWriting()) {
+if (!blockWriter.isWriting()) {
 newBlock();
 }

-fsBlockWriter.write(cell);
+blockWriter.write(cell);

 totalKeyLength += CellUtil.estimatedSerializedSizeOfKey(cell);
 totalValueLength += cell.getValueLength();

@@ -686,7 +687,7 @@ public class HFileWriterImpl implements HFile.Writer {
 firstCellInBlock = cell;
 }

-// TODO: What if cell is 10MB and we write infrequently? We hold on to cell here indefinetly?
+// TODO: What if cell is 10MB and we write infrequently? We hold on to cell here indefinitely?
 lastCell = cell;
 entryCount++;
 this.maxMemstoreTS = Math.max(this.maxMemstoreTS, cell.getSequenceId());

@@ -269,9 +269,10 @@ public final class BucketAllocator {
 }
 }

-// Default block size is 64K, so we choose more sizes near 64K, you'd better
+// Default block size in hbase is 64K, so we choose more sizes near 64K, you'd better
 // reset it according to your cluster's block size distribution
 // TODO Support the view of block size distribution statistics
+// TODO: Why we add the extra 1024 bytes? Slop?
 private static final int DEFAULT_BUCKET_SIZES[] = { 4 * 1024 + 1024, 8 * 1024 + 1024,
 16 * 1024 + 1024, 32 * 1024 + 1024, 40 * 1024 + 1024, 48 * 1024 + 1024,
 56 * 1024 + 1024, 64 * 1024 + 1024, 96 * 1024 + 1024, 128 * 1024 + 1024,

@@ -289,6 +290,9 @@ public final class BucketAllocator {
 return null;
 }

+/**
+ * So, what is the minimum amount of items we'll tolerate in a single bucket?
+ */
 static public final int FEWEST_ITEMS_IN_BUCKET = 4;

 private final int[] bucketSizes;

@@ -308,9 +312,8 @@ public final class BucketAllocator {
 this.bucketCapacity = FEWEST_ITEMS_IN_BUCKET * bigItemSize;
 buckets = new Bucket[(int) (availableSpace / bucketCapacity)];
 if (buckets.length < this.bucketSizes.length)
-throw new BucketAllocatorException(
-"Bucket allocator size too small - must have room for at least "
-+ this.bucketSizes.length + " buckets");
+throw new BucketAllocatorException("Bucket allocator size too small (" + buckets.length +
+"); must have room for at least " + this.bucketSizes.length + " buckets");
 bucketSizeInfos = new BucketSizeInfo[this.bucketSizes.length];
 for (int i = 0; i < this.bucketSizes.length; ++i) {
 bucketSizeInfos[i] = new BucketSizeInfo(i);

@@ -321,6 +324,12 @@ public final class BucketAllocator {
 .instantiateBucket(buckets[i]);
 }
 this.totalSize = ((long) buckets.length) * bucketCapacity;
+if (LOG.isInfoEnabled()) {
+LOG.info("Cache totalSize=" + this.totalSize + ", buckets=" + this.buckets.length +
+", bucket capacity=" + this.bucketCapacity +
+"=(" + FEWEST_ITEMS_IN_BUCKET + "*" + this.bigItemSize + ")=" +
+"(FEWEST_ITEMS_IN_BUCKET*(largest configured bucketcache size))");
+}
 }

 /**

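Note: the INFO line added above reports how BucketAllocator arrives at its bucket count: bucket capacity is FEWEST_ITEMS_IN_BUCKET times the largest configured bucket size, and the available space is divided by that capacity. The following minimal arithmetic sketch is not part of the patch; the class name is invented for illustration and the 133120-byte figure is simply the value quoted in the TRACE output reproduced in TestHFileBackedByBucketCache further down.

// Illustrative sketch only: mirrors the arithmetic behind the LOG.info added above.
public class BucketSizingSketch {
  static final int FEWEST_ITEMS_IN_BUCKET = 4; // same constant BucketAllocator declares

  public static void main(String[] args) {
    long availableSpace = 1024L * 1024L; // 1 MB cache, the minimum the new test configures
    int bigItemSize = 133120;            // largest (and only) bucket size, per the quoted TRACE log
    long bucketCapacity = (long) FEWEST_ITEMS_IN_BUCKET * bigItemSize; // 532480
    int bucketCount = (int) (availableSpace / bucketCapacity);         // 1
    long totalSize = (long) bucketCount * bucketCapacity;              // 532480
    System.out.println("buckets=" + bucketCount + ", bucket capacity=" + bucketCapacity
        + ", totalSize=" + totalSize);
  }
}

That single-bucket outcome is exactly what the new test relies on when it configures a lone 125 KB bucket size inside a 1 MB cache.
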
@@ -78,7 +78,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;

 /**
 * BucketCache uses {@link BucketAllocator} to allocate/free blocks, and uses
-* {@link BucketCache#ramCache} and {@link BucketCache#backingMap} in order to
+* BucketCache#ramCache and BucketCache#backingMap in order to
 * determine if a given element is in the cache. The bucket cache can use on-heap or
 * off-heap memory {@link ByteBufferIOEngine} or in a file {@link FileIOEngine} to
 * store/read the block data.

@@ -87,7 +87,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
 * {@link org.apache.hadoop.hbase.io.hfile.LruBlockCache}
 *
 * <p>BucketCache can be used as mainly a block cache (see
-* {@link org.apache.hadoop.hbase.io.hfile.CombinedBlockCache}), combined with
+* {@link org.apache.hadoop.hbase.io.hfile.CombinedBlockCache}), combined with
 * LruBlockCache to decrease CMS GC and heap fragmentation.
 *
 * <p>It also can be used as a secondary cache (e.g. using a file on ssd/fusionio to store

@@ -349,6 +349,7 @@ public class BucketCache implements BlockCache, HeapSize {
 */
 public void cacheBlockWithWait(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory,
 boolean wait) {
+if (LOG.isTraceEnabled()) LOG.trace("Caching key=" + cacheKey + ", item=" + cachedItem);
 if (!cacheEnabled) {
 return;
 }

@@ -422,6 +423,9 @@ public class BucketCache implements BlockCache, HeapSize {
 // TODO : change this area - should be removed after server cells and
 // 12295 are available
 int len = bucketEntry.getLength();
+if (LOG.isTraceEnabled()) {
+LOG.trace("Read offset=" + bucketEntry.offset() + ", len=" + len);
+}
 Cacheable cachedBlock = ioEngine.read(bucketEntry.offset(), len,
 bucketEntry.deserializerReference(this.deserialiserMap));
 long timeTaken = System.nanoTime() - start;

@@ -628,7 +632,9 @@ public class BucketCache implements BlockCache, HeapSize {
 */
 private void freeSpace(final String why) {
 // Ensure only one freeSpace progress at a time
-if (!freeSpaceLock.tryLock()) return;
+if (!freeSpaceLock.tryLock()) {
+return;
+}
 try {
 freeInProgress = true;
 long bytesToFreeWithoutExtra = 0;

@@ -657,7 +663,7 @@ public class BucketCache implements BlockCache, HeapSize {
 return;
 }
 long currentSize = bucketAllocator.getUsedSize();
-long totalSize=bucketAllocator.getTotalSize();
+long totalSize = bucketAllocator.getTotalSize();
 if (LOG.isDebugEnabled() && msgBuffer != null) {
 LOG.debug("Free started because \"" + why + "\"; " + msgBuffer.toString() +
 " of current used=" + StringUtils.byteDesc(currentSize) + ", actual cacheSize=" +

@@ -864,7 +870,7 @@ public class BucketCache implements BlockCache, HeapSize {
 }
 }

-// Make sure data pages are written are on media before we update maps.
+// Make sure data pages are written on media before we update maps.
 try {
 ioEngine.sync();
 } catch (IOException ioex) {

@@ -938,9 +944,9 @@ public class BucketCache implements BlockCache, HeapSize {
 FileOutputStream fos = null;
 ObjectOutputStream oos = null;
 try {
-if (!ioEngine.isPersistent())
-throw new IOException(
-"Attempt to persist non-persistent cache mappings!");
+if (!ioEngine.isPersistent()) {
+throw new IOException("Attempt to persist non-persistent cache mappings!");
+}
 fos = new FileOutputStream(persistencePath, false);
 oos = new ObjectOutputStream(fos);
 oos.writeLong(cacheCapacity);

@@ -1020,19 +1026,17 @@ public class BucketCache implements BlockCache, HeapSize {
 }

 /**
-* Used to shut down the cache -or- turn it off in the case of something
-* broken.
+* Used to shut down the cache -or- turn it off in the case of something broken.
 */
 private void disableCache() {
-if (!cacheEnabled)
-return;
+if (!cacheEnabled) return;
 cacheEnabled = false;
 ioEngine.shutdown();
 this.scheduleThreadPool.shutdown();
-for (int i = 0; i < writerThreads.length; ++i)
-writerThreads[i].interrupt();
+for (int i = 0; i < writerThreads.length; ++i) writerThreads[i].interrupt();
 this.ramCache.clear();
 if (!ioEngine.isPersistent() || persistencePath == null) {
+// If persistent ioengine and a path, we will serialize out the backingMap.
 this.backingMap.clear();
 }
 }

@@ -1327,6 +1331,9 @@ public class BucketCache implements BlockCache, HeapSize {
 len == sliceBuf.limit() + block.headerSize() + HFileBlock.EXTRA_SERIALIZATION_SPACE;
 ByteBuffer extraInfoBuffer = ByteBuffer.allocate(HFileBlock.EXTRA_SERIALIZATION_SPACE);
 block.serializeExtraInfo(extraInfoBuffer);
+if (LOG.isTraceEnabled()) {
+LOG.trace("Write offset=" + offset + ", len=" + len);
+}
 ioEngine.write(sliceBuf, offset);
 ioEngine.write(extraInfoBuffer, offset + len - HFileBlock.EXTRA_SERIALIZATION_SPACE);
 } else {

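Note: the hunks above add guarded LOG.trace calls on the cache, read, and write paths. To actually see that output the relevant loggers have to be dropped to TRACE; the new test further down does it by casting the commons-logging Log to Log4JLogger, but a plain log4j 1.x call does the same job. The snippet below is a usage sketch under that assumption (log4j 1.x on the classpath), not part of the patch.

import org.apache.log4j.Level;
import org.apache.log4j.Logger;

public class EnableHFileBlockTraceLogging {
  public static void main(String[] args) {
    // Drop the classes this patch instruments to TRACE before exercising the cache.
    Logger.getLogger("org.apache.hadoop.hbase.io.hfile.bucket.BucketCache").setLevel(Level.TRACE);
    Logger.getLogger("org.apache.hadoop.hbase.io.hfile.bucket.BucketAllocator").setLevel(Level.TRACE);
    Logger.getLogger("org.apache.hadoop.hbase.io.hfile.HFileBlock").setLevel(Level.TRACE);
    Logger.getLogger("org.apache.hadoop.hbase.io.hfile.HFileReaderImpl").setLevel(Level.TRACE);
  }
}

The same effect can be had with equivalent log4j.properties entries for those logger names.
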
@@ -32,6 +32,7 @@ import java.util.Random;

 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.ArrayBackedTag;
 import org.apache.hadoop.hbase.CategoryBasedTimeout;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellComparator;

@@ -42,7 +43,6 @@ import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValue.Type;
 import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.Tag;
-import org.apache.hadoop.hbase.ArrayBackedTag;
 import org.apache.hadoop.hbase.codec.prefixtree.PrefixTreeSeeker;
 import org.apache.hadoop.hbase.io.ByteArrayOutputStream;
 import org.apache.hadoop.hbase.io.compress.Compression;

@@ -80,6 +80,7 @@ public class TestDataBlockEncoders {

 private static int ENCODED_DATA_OFFSET = HConstants.HFILEBLOCK_HEADER_SIZE
 + DataBlockEncoding.ID_SIZE;
+static final byte[] HFILEBLOCK_DUMMY_HEADER = new byte[HConstants.HFILEBLOCK_HEADER_SIZE];

 private RedundantKVGenerator generator = new RedundantKVGenerator();
 private Random randomizer = new Random(42l);

@@ -109,11 +110,9 @@ public class TestDataBlockEncoders {
 .withIncludesTags(includesTags)
 .withCompression(algo).build();
 if (encoder != null) {
-return encoder.newDataBlockEncodingContext(encoding,
-HConstants.HFILEBLOCK_DUMMY_HEADER, meta);
+return encoder.newDataBlockEncodingContext(encoding, HFILEBLOCK_DUMMY_HEADER, meta);
 } else {
-return new HFileBlockDefaultEncodingContext(encoding,
-HConstants.HFILEBLOCK_DUMMY_HEADER, meta);
+return new HFileBlockDefaultEncodingContext(encoding, HFILEBLOCK_DUMMY_HEADER, meta);
 }
 }

@@ -249,7 +248,7 @@ public class TestDataBlockEncoders {
 HFileBlockEncodingContext encodingContext, boolean useOffheapData) throws IOException {
 DataBlockEncoder encoder = encoding.getEncoder();
 ByteArrayOutputStream baos = new ByteArrayOutputStream();
-baos.write(HConstants.HFILEBLOCK_DUMMY_HEADER);
+baos.write(HFILEBLOCK_DUMMY_HEADER);
 DataOutputStream dos = new DataOutputStream(baos);
 encoder.startBlockEncoding(encodingContext, dos);
 for (KeyValue kv : kvs) {

@@ -386,10 +385,10 @@ public class TestDataBlockEncoders {
 continue;
 }
 HFileBlockEncodingContext encodingContext = new HFileBlockDefaultEncodingContext(encoding,
-HConstants.HFILEBLOCK_DUMMY_HEADER, fileContext);
+HFILEBLOCK_DUMMY_HEADER, fileContext);

 ByteArrayOutputStream baos = new ByteArrayOutputStream();
-baos.write(HConstants.HFILEBLOCK_DUMMY_HEADER);
+baos.write(HFILEBLOCK_DUMMY_HEADER);
 DataOutputStream dos = new DataOutputStream(baos);
 encoder.startBlockEncoding(encodingContext, dos);
 for (KeyValue kv : kvList) {

@@ -46,7 +46,7 @@ import org.junit.runners.Parameterized.Parameters;
 @Category({IOTests.class, SmallTests.class})
 @RunWith(Parameterized.class)
 public class TestSeekToBlockWithEncoders {

+static final byte[] HFILEBLOCK_DUMMY_HEADER = new byte[HConstants.HFILEBLOCK_HEADER_SIZE];
 private final boolean useOffheapData;

 @Parameters

@@ -281,7 +281,7 @@ public class TestSeekToBlockWithEncoders {
 .withIncludesMvcc(false).withIncludesTags(false)
 .withCompression(Compression.Algorithm.NONE).build();
 HFileBlockEncodingContext encodingContext = encoder.newDataBlockEncodingContext(encoding,
-HConstants.HFILEBLOCK_DUMMY_HEADER, meta);
+HFILEBLOCK_DUMMY_HEADER, meta);
 ByteBuffer encodedBuffer = TestDataBlockEncoders.encodeKeyValues(encoding, kvs,
 encodingContext, this.useOffheapData);
 DataBlockEncoder.EncodedSeeker seeker = encoder.createSeeker(CellComparator.COMPARATOR,

@@ -42,12 +42,12 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.testclassification.IOTests;
-import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.apache.hadoop.hbase.fs.HFileSystem;
 import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
 import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.hadoop.hbase.nio.ByteBuff;
+import org.apache.hadoop.hbase.testclassification.IOTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.apache.hadoop.hbase.util.ChecksumType;
 import org.junit.Before;
 import org.junit.Test;

@@ -97,7 +97,7 @@ public class TestForceCacheImportantBlocks {
 public void setup() {
 // Make sure we make a new one each time.
 CacheConfig.GLOBAL_BLOCK_CACHE_INSTANCE = null;
-HFile.dataBlockReadCnt.set(0);
+HFile.DATABLOCK_READ_COUNT.set(0);
 }

 @Test

@@ -114,12 +114,12 @@ public class TestForceCacheImportantBlocks {
 CacheStats stats = cache.getStats();
 writeTestData(region);
 assertEquals(0, stats.getHitCount());
-assertEquals(0, HFile.dataBlockReadCnt.get());
+assertEquals(0, HFile.DATABLOCK_READ_COUNT.get());
 // Do a single get, take count of caches. If we are NOT caching DATA blocks, the miss
 // count should go up. Otherwise, all should be cached and the miss count should not rise.
 region.get(new Get(Bytes.toBytes("row" + 0)));
 assertTrue(stats.getHitCount() > 0);
-assertTrue(HFile.dataBlockReadCnt.get() > 0);
+assertTrue(HFile.DATABLOCK_READ_COUNT.get() > 0);
 long missCount = stats.getMissCount();
 region.get(new Get(Bytes.toBytes("row" + 0)));
 if (this.cfCacheEnabled) assertEquals(missCount, stats.getMissCount());

@@ -0,0 +1,231 @@
/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.io.hfile;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CategoryBasedTimeout;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketAllocator;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.junit.rules.TestRule;

/**
 * Test for file-backed BucketCache.
 */
@Category({IOTests.class, SmallTests.class})
public class TestHFileBackedByBucketCache {
  private static final Log LOG = LogFactory.getLog(TestHFileBackedByBucketCache.class);
  @Rule public TestName name = new TestName();
  @Rule public final TestRule timeout = CategoryBasedTimeout.builder().withTimeout(this.getClass()).
      withLookingForStuckThread(true).build();
  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
  private static final int ROW_LENGTH = 4;
  private Configuration conf;
  private FileSystem fs;

  // MATH! SIZING FOR THE TEST!
  // Set bucketcache to be smallest size possible which is 1MB. We do that in the test
  // @Before <code>before</code> method. Into out 1MB cache, have it so only one bucket. If
  // bucketsize is set to 125kb in size, we will have one bucket in our 1MB bucketcache. It is
  // cryptic how this comes about but basically comes down to
  // {@link BucketAllocator#FEWEST_ITEMS_IN_BUCKET} being '4'... so 4 * 125 = just over 500k or so
  // which makes for one bucket only in 1M which you can see from TRACE logging:
  //
  // Cache totalSize=532480, buckets=1, bucket capacity=
  // 532480=(4*133120)=(FEWEST_ITEMS_IN_BUCKET*(largest configured bucketcache size))
  //
  // Now into this one big bucket, we write hfileblocks....Each hfileblock has two keys because
  // first is under the BLOCKSIZE of 64k and then the second puts us over the 64k...
  // so two Cells per block...
  /**
   * Default size.
   */
  private static final int BLOCKSIZE = 64 * 1024;

  /**
   * Bucket sizes get multiplied by 4 for actual bucket size.
   * See {@link BucketAllocator#FEWEST_ITEMS_IN_BUCKET}.
   */
  private static final int BUCKETSIZE = 125 * 1024;

  /**
   * Make it so one Cell is just under a BLOCKSIZE. The second Cell puts us over the BLOCKSIZE
   * so we have two Cells per HFilBlock.
   */
  private static final int VALUE_SIZE = 33 * 1024;

  @Before
  public void before() throws IOException {
    // Do setup of a bucketcache that has one bucket only. Enable trace-level logging for
    // key classes.
    this.conf = TEST_UTIL.getConfiguration();
    this.fs = FileSystem.get(conf);

    // Set BucketCache and HFileBlock to log at trace level.
    setTraceLevel(BucketCache.class);
    setTraceLevel(HFileBlock.class);
    setTraceLevel(HFileReaderImpl.class);
    setTraceLevel(BucketAllocator.class);
  }

  // Assumes log4j logging.
  private static void setTraceLevel(final Class<?> clazz) {
    Log testlog = LogFactory.getLog(clazz.getName());
    ((org.apache.commons.logging.impl.Log4JLogger)testlog).getLogger().
        setLevel(org.apache.log4j.Level.TRACE);
  }

  /**
   * Test that bucketcache is caching and that the persist of in-memory map works
   * @throws IOException
   */
  @Test
  public void testBucketCacheCachesAndPersists() throws IOException {
    // Set up a bucket cache. Set up one that will persist by passing a
    // hbase.bucketcache.persistent.path value to store the in-memory map of what is out in
    // the file-backed bucketcache. Set bucketcache to have one size only, BUCKETSIZE.
    // See "MATH! SIZING FOR THE TEST!" note above around declaration of BUCKETSIZE
    String bucketCacheDataFile =
        (new Path(TEST_UTIL.getDataTestDir(), "bucketcache.data")).toString();
    (new File(bucketCacheDataFile)).getParentFile().mkdirs();
    this.conf.set("hbase.bucketcache.ioengine", "file:" + bucketCacheDataFile);
    this.conf.set("hbase.bucketcache.persistent.path", bucketCacheDataFile + ".map");
    this.conf.setStrings("hbase.bucketcache.bucket.sizes", Integer.toString(BUCKETSIZE));
    // This is minimum bucketcache size.... 1MB.
    this.conf.setInt("hbase.bucketcache.size", 1);
    CacheConfig cacheConfig = new CacheConfig(conf);
    Path hfilePath = new Path(TEST_UTIL.getDataTestDir(), this.name.getMethodName());
    // Write 8 entries which should make for four hfileBlocks.
    final int count = 8;
    final int hfileBlockCount = 4;
    List<Cell> writtenCells = writeFile(hfilePath, Compression.Algorithm.NONE, cacheConfig, count);
    CacheStats stats = cacheConfig.getBlockCache().getStats();
    List<Cell> readCells = readFile(hfilePath, cacheConfig);
    assertTrue(!writtenCells.isEmpty());
    assertEquals(writtenCells.size(), readCells.size());
    assertEquals(hfileBlockCount, stats.getMissCount());
    assertEquals(1, stats.getHitCount()); // readFile will read first block is from cache.

    // Now, close out the cache and then reopen and verify that cache still has our blocks.
    // Assert that persistence works.
    cacheConfig.getBlockCache().shutdown();
    // Need to clear the global cache else the new CacheConfig won't create a bucketcache but
    // just reuse the old one.
    CacheConfig.GLOBAL_BLOCK_CACHE_INSTANCE = null;
    cacheConfig = new CacheConfig(conf);
    stats = cacheConfig.getBlockCache().getStats();
    assertEquals(0, stats.getHitCachingCount());
    readCells = readFile(hfilePath, cacheConfig);
    // readFile will read all hfileblocs in the file, hfileBlockCount, and then one more, so + 1.
    assertEquals(hfileBlockCount + 1, stats.getHitCachingCount());
  }

  /**
   * Write a file with <code>count</code> entries.
   * @return The Cells written to the file.
   * @throws IOException
   */
  private List<Cell> writeFile(final Path hfilePath, final Compression.Algorithm compressAlgo,
      final CacheConfig cacheConfig, final int count)
  throws IOException {
    List<Cell> cells = new ArrayList<Cell>(count);
    HFileContext context =
        new HFileContextBuilder().withBlockSize(BLOCKSIZE).withCompression(compressAlgo).build();
    try (HFile.Writer writer = new HFile.WriterFactory(conf, cacheConfig).
        withPath(fs, hfilePath).
        withFileContext(context).
        withComparator(CellComparator.COMPARATOR).
        create()) {
      byte [] valueBytes = new byte [VALUE_SIZE];
      for (int i = 0; i < valueBytes.length; i++) valueBytes[i] = '0';
      for (int i = 0; i < count; ++i) {
        byte[] keyBytes = format(i);
        KeyValue keyValue = new KeyValue(keyBytes, HConstants.CATALOG_FAMILY, keyBytes,
            HConstants.LATEST_TIMESTAMP, valueBytes);
        writer.append(keyValue);
        cells.add(keyValue);
      }
    }
    return cells;
  }

  /**
   * Read the whole file, then read the first block so we get something from cache for sure.
   * So... there are TOTAL_BLOCKS_IN_FILE read + 1. See math at head of this class.
   * @return The Cells read from the file.
   */
  private List<Cell> readFile(final Path hfilePath, final CacheConfig cacheConfig)
  throws IOException {
    List<Cell> cells = new ArrayList<Cell>();
    try (HFile.Reader reader = HFile.createReader(this.fs, hfilePath, cacheConfig, this.conf);
        HFileScanner scanner = reader.getScanner(true, true)) {
      scanner.seekTo();
      do {
        cells.add(scanner.getCell());
        LOG.info(scanner.getKey());
      } while (scanner.next());
      // Do a random seek just so we see a block coming from cache.
      scanner.seekTo(reader.getFirstKey());
      scanner.next();
      LOG.info(scanner.getCell());
    }
    return cells;
  }

  /*
   * Format passed integer.
   * @param number
   * @return Returns zero-prefixed ROW_LENGTH-byte wide decimal version of passed
   * number (Does absolute in case number is negative).
   */
  private static byte [] format(final int number) {
    byte [] b = new byte[ROW_LENGTH];
    int d = Math.abs(number);
    for (int i = b.length - 1; i >= 0; i--) {
      b[i] = (byte)((d % 10) + '0');
      d /= 10;
    }
    return b;
  }
}

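Note: the new test above is self-contained; to run just it outside of Maven, a plain JUnit 4 driver is enough. The runner class below is a usage sketch, not part of the patch, and assumes the hbase-server test classpath is already assembled.

import org.junit.runner.JUnitCore;
import org.junit.runner.Result;

public class RunBucketCacheTest {
  public static void main(String[] args) {
    // Runs TestHFileBackedByBucketCache with the stock JUnit 4 runner.
    Result result = JUnitCore.runClasses(
        org.apache.hadoop.hbase.io.hfile.TestHFileBackedByBucketCache.class);
    System.out.println("ran=" + result.getRunCount() + ", failures=" + result.getFailureCount());
  }
}
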
@@ -553,7 +553,7 @@ public class TestHFileBlock {
 for (int i = 0; i < NUM_TEST_BLOCKS; ++i) {
 if (!pread) {
 assertEquals(is.getPos(), curOffset + (i == 0 ? 0 :
-HConstants.HFILEBLOCK_HEADER_SIZE));
+HConstants.HFILEBLOCK_HEADER_SIZE));
 }

 assertEquals(expectedOffsets.get(i).longValue(), curOffset);

@@ -347,8 +347,7 @@ public class TestHFileBlockCompatibility {
 // These constants are as they were in minorVersion 0.
 private static final int HEADER_SIZE = HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM;
 private static final boolean DONT_FILL_HEADER = HFileBlock.DONT_FILL_HEADER;
-private static final byte[] DUMMY_HEADER =
-HFileBlock.DUMMY_HEADER_NO_CHECKSUM;
+private static final byte[] DUMMY_HEADER = HFileBlock.DUMMY_HEADER_NO_CHECKSUM;

 private enum State {
 INIT,

@@ -196,7 +196,7 @@ public class TestBlocksRead {
 }

 private static long getBlkAccessCount(byte[] cf) {
-return HFile.dataBlockReadCnt.get();
+return HFile.DATABLOCK_READ_COUNT.get();
 }

 private static long getBlkCount() {