diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockType.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockType.java index 4228f57986a..32eb0b27661 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockType.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockType.java @@ -132,6 +132,10 @@ public enum BlockType { out.write(magic); } + public void write(ByteBuffer buf) { + buf.put(magic); + } + public void write(ByteBuff buf) { buf.put(magic); } diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileContextBuilder.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileContextBuilder.java index 6d3bb13b80b..a6645a624ef 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileContextBuilder.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileContextBuilder.java @@ -55,6 +55,26 @@ public class HFileContextBuilder { private String hfileName = null; + public HFileContextBuilder() {} + + /** + * Use this constructor if you want to change a few settings only in another context. + */ + public HFileContextBuilder(final HFileContext hfc) { + this.usesHBaseChecksum = hfc.isUseHBaseChecksum(); + this.includesMvcc = hfc.isIncludesMvcc(); + this.includesTags = hfc.isIncludesTags(); + this.compression = hfc.getCompression(); + this.compressTags = hfc.isCompressTags(); + this.checksumType = hfc.getChecksumType(); + this.bytesPerChecksum = hfc.getBytesPerChecksum(); + this.blocksize = hfc.getBlocksize(); + this.encoding = hfc.getDataBlockEncoding(); + this.cryptoContext = hfc.getEncryptionContext(); + this.fileCreateTime = hfc.getFileCreateTime(); + this.hfileName = hfc.getHFileName(); + } + public HFileContextBuilder withHBaseCheckSum(boolean useHBaseCheckSum) { this.usesHBaseChecksum = useHBaseCheckSum; return this; diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/ByteBuff.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/ByteBuff.java index 1e0e957d474..183a031ec8d 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/ByteBuff.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/ByteBuff.java @@ -496,6 +496,12 @@ public abstract class ByteBuff { return -(low + 1); // key not found. } + @Override + public String toString() { + return this.getClass().getSimpleName() + "[pos=" + position() + ", lim=" + limit() + + ", cap= " + capacity() + "]"; + } + public static String toStringBinary(final ByteBuff b, int off, int len) { StringBuilder result = new StringBuilder(); // Just in case we are passed a 'len' that is > buffer length... 
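The HFileContextBuilder(HFileContext) copy constructor added above is meant for cloning an existing context and changing only a few settings. A minimal sketch of that intended use follows; the withoutHBaseChecksums helper name is hypothetical, the imports assume the usual hbase-common packages, and only builder setters that appear elsewhere in this patch are used.

```java
import org.apache.hadoop.hbase.io.hfile.HFileContext;        // packages per the paths in this diff
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.util.ChecksumType;             // assumed location of ChecksumType

// Hypothetical helper: derive a context from an existing one, changing only the
// checksum-related settings; everything else (compression, encoding, etc.) is copied.
static HFileContext withoutHBaseChecksums(HFileContext existing) {
  return new HFileContextBuilder(existing)   // copies all settings from 'existing'
      .withHBaseCheckSum(false)              // setter shown in this diff
      .withChecksumType(ChecksumType.NULL)   // setters used by HFileBlock elsewhere in this diff
      .withBytesPerCheckSum(0)
      .build();
}
```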
diff --git a/hbase-external-blockcache/src/main/java/org/apache/hadoop/hbase/io/hfile/MemcachedBlockCache.java b/hbase-external-blockcache/src/main/java/org/apache/hadoop/hbase/io/hfile/MemcachedBlockCache.java index 536872ee783..ae871c4b7f5 100644 --- a/hbase-external-blockcache/src/main/java/org/apache/hadoop/hbase/io/hfile/MemcachedBlockCache.java +++ b/hbase-external-blockcache/src/main/java/org/apache/hadoop/hbase/io/hfile/MemcachedBlockCache.java @@ -260,7 +260,7 @@ public class MemcachedBlockCache implements BlockCache { public HFileBlock decode(CachedData d) { try { ByteBuff buf = new SingleByteBuff(ByteBuffer.wrap(d.getData())); - return (HFileBlock) HFileBlock.blockDeserializer.deserialize(buf, true, + return (HFileBlock) HFileBlock.BLOCK_DESERIALIZER.deserialize(buf, true, MemoryType.EXCLUSIVE); } catch (IOException e) { LOG.warn("Error deserializing data from memcached",e); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/ChecksumUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/ChecksumUtil.java index 69f4330f61a..b0b1714972f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/ChecksumUtil.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/ChecksumUtil.java @@ -91,7 +91,7 @@ public class ChecksumUtil { // If this is an older version of the block that does not have // checksums, then return false indicating that checksum verification - // did not succeed. Actually, this methiod should never be called + // did not succeed. Actually, this method should never be called // when the minorVersion is 0, thus this is a defensive check for a // cannot-happen case. Since this is a cannot-happen case, it is // better to return false to indicate a checksum validation failure. @@ -141,8 +141,7 @@ public class ChecksumUtil { * @return The number of bytes needed to store the checksum values */ static long numBytes(long datasize, int bytesPerChecksum) { - return numChunks(datasize, bytesPerChecksum) * - HFileBlock.CHECKSUM_SIZE; + return numChunks(datasize, bytesPerChecksum) * HFileBlock.CHECKSUM_SIZE; } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java index 6268f2eb405..f3402da9306 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java @@ -56,50 +56,131 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; /** - * Reads {@link HFile} version 1 and version 2 blocks but writes version 2 blocks only. - * Version 2 was introduced in hbase-0.92.0. Does read and write out to the filesystem but also - * the read and write to Cache. + * Reads {@link HFile} version 2 blocks to HFiles and via {@link Cacheable} Interface to caches. + * Version 2 was introduced in hbase-0.92.0. No longer has support for version 1 blocks since + * hbase-1.3.0. + * + *

Version 1 was the original file block. Version 2 was introduced when we changed the hbase file + * format to support multi-level block indexes and compound bloom filters (HBASE-3857). * - *

HFileBlock: Version 1

- * As of this writing, there should be no more version 1 blocks found out in the wild. Version 2 - * as introduced in hbase-0.92.0. - * In version 1 all blocks are always compressed or uncompressed, as - * specified by the {@link HFile}'s compression algorithm, with a type-specific - * magic record stored in the beginning of the compressed data (i.e. one needs - * to uncompress the compressed block to determine the block type). There is - * only a single compression algorithm setting for all blocks. Offset and size - * information from the block index are required to read a block. *

HFileBlock: Version 2

* In version 2, a block is structured as follows: * - *

Be aware that when we read from HDFS, we overread pulling in the next blocks' header too. - * We do this to save having to do two seeks to read an HFileBlock; a seek to read the header - * to figure lengths, etc., and then another seek to pull in the data. + * + *

Caching

+ * Caches cache whole blocks with trailing checksums if any. We then tag on some metadata, the + * content of BLOCK_METADATA_SPACE, which holds a flag on whether we are doing 'hbase' + * checksums and then the offset into the file, which is needed when we re-make a cache key + * when we return the block to the cache as 'done'. See {@link Cacheable#serialize(ByteBuffer)} and + * {@link Cacheable#getDeserializer()}. + * + *

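To make the caching layout just described concrete, here is a minimal round-trip sketch modeled on MemcachedBlockCache#decode earlier in this patch. The roundTripThroughCache helper is hypothetical and is assumed to live in the org.apache.hadoop.hbase.io.hfile package so that the package-private BLOCK_DESERIALIZER is visible.

```java
// Hypothetical round trip through the Cacheable contract; exceptions propagate to the caller.
static HFileBlock roundTripThroughCache(HFileBlock block) throws IOException {
  // Block bytes (plus trailing checksums if present) followed by the BLOCK_METADATA_SPACE
  // tail: hbase-checksum flag (1 byte), file offset (long), next block's on-disk size (int).
  ByteBuffer bytes = ByteBuffer.allocate(block.getSerializedLength());
  block.serialize(bytes);   // fills the buffer and flips it ready for reading
  // Rebuild the block the way a cache would, via the registered deserializer.
  return (HFileBlock) HFileBlock.BLOCK_DESERIALIZER
      .deserialize(new SingleByteBuff(bytes), true, MemoryType.EXCLUSIVE);
}
```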
TODO: Should we cache the checksums? Down in Writer#getBlockForCaching(CacheConfig), where + * we make a block to cache-on-write, there is an attempt at turning off checksums. This is not the + * only place we get blocks to cache. We will also cache the raw return from an hdfs read. In this + * case, the checksums may be present. If the cache is backed by something that doesn't do ECC, + * say an SSD, we might want to preserve checksums. For now this is an open question. + *

TODO: Over in BucketCache, we save a block allocation by doing a custom serialization. + * Be sure to change it if serialization changes in here. Could we add a method here that takes an + * IOEngine and that then serializes to it rather than expose our internals over in BucketCache? + * IOEngine is in the bucket subpackage. Pull it up? Then this class knows about bucketcache. Ugh. */ @InterfaceAudience.Private public class HFileBlock implements Cacheable { private static final Log LOG = LogFactory.getLog(HFileBlock.class); + /** Type of block. Header field 0. */ + private BlockType blockType; + + /** + * Size on disk excluding header, including checksum. Header field 1. + * @see Writer#putHeader(byte[], int, int, int, int) + */ + private int onDiskSizeWithoutHeader; + + /** + * Size of pure data. Does not include header or checksums. Header field 2. + * @see Writer#putHeader(byte[], int, int, int, int) + */ + private int uncompressedSizeWithoutHeader; + + /** + * The offset of the previous block on disk. Header field 3. + * @see Writer#putHeader(byte[], int, int, int, int) + */ + private long prevBlockOffset; + + /** + * Size on disk of header + data. Excludes checksum. Header field 6, + * OR calculated from {@link #onDiskSizeWithoutHeader} when using HDFS checksum. + * @see Writer#putHeader(byte[], int, int, int, int) + */ + private int onDiskDataSizeWithHeader; + + + /** + * The in-memory representation of the hfile block. Can be on or offheap. Can be backed by + * a single ByteBuffer or by many. Make no assumptions. + * + *

Be careful reading from this buf. Duplicate and work on the duplicate, or if + * not, be sure to reset position and limit or there will be trouble down the road. + * + *

TODO: Make this read-only once made. + * + *

We are using the ByteBuff type. ByteBuffer is not extensible yet we need to be able to have + * a ByteBuffer-like API across multiple ByteBuffers reading from a cache such as BucketCache. + * So, we have this ByteBuff type. Unfortunately, it is spread all about HFileBlock. Would be + * good if it could be confined to cache-use only, but that is hard to do. + */ + private ByteBuff buf; + + /** Meta data that holds information about the hfileblock. + */ + private HFileContext fileContext; + + /** + * The offset of this block in the file. Populated by the reader for + * convenience of access. This offset is not part of the block header. + */ + private long offset = UNSET; + + private MemoryType memType = MemoryType.EXCLUSIVE; + + /** + * The on-disk size of the next block, including the header and checksums if present, obtained by + * peeking into the first {@link HConstants#HFILEBLOCK_HEADER_SIZE} bytes of the next block's + * header, or UNSET if unknown. + * + * Blocks try to carry the size of the next block to read in this data member. They will even have + * this value when served from cache. It could save a seek in the case where we are iterating through + * a file and some of the blocks come from cache. If from cache, then having this info to hand + * will save us doing a seek to read the header so we can read the body of a block. + * TODO: see how effective this is at saving seeks. + */ + private int nextBlockOnDiskSize = UNSET; + /** * On a checksum failure, do these many succeeding read requests using hdfs checksums before * auto-reenabling hbase checksum verification. @@ -115,14 +196,18 @@ public class HFileBlock implements Cacheable { (int)ClassSize.estimateBase(MultiByteBuff.class, false); /** - * See #blockDeserializer method for more info. - * 13 bytes of extra stuff stuck on the end of the HFileBlock that we pull in from HDFS (note, + * Space for metadata on a block that gets stored along with the block when we cache it. + * There are a few bytes stuck on the end of the HFileBlock that we pull in from HDFS (note, * when we read from HDFS, we pull in an HFileBlock AND the header of the next block if one). - * The 13 bytes are: usesHBaseChecksum (1 byte) + offset of this block (long) + - * nextBlockOnDiskSizeWithHeader (int). + * 8 bytes are the offset of this block (long) in the file. The offset is important because it is + * used when we remake the CacheKey when we return the block to the cache when done. There is also + * a flag on whether checksumming is being done by hbase or not. See the class comment for a note on + * the uncertain state of checksumming of blocks that come out of cache (should we or should we not?). + * Finally, there are 4 bytes to hold the length of the next block, which can save a seek on occasion. + *

This EXTRA came in with original commit of the bucketcache, HBASE-7404. Was formerly + * known as EXTRA_SERIALIZATION_SPACE. */ - public static final int EXTRA_SERIALIZATION_SPACE = - Bytes.SIZEOF_BYTE + Bytes.SIZEOF_INT + Bytes.SIZEOF_LONG; + static final int BLOCK_METADATA_SPACE = Bytes.SIZEOF_BYTE + Bytes.SIZEOF_LONG + Bytes.SIZEOF_INT; /** * Each checksum value is an integer that can be stored in 4 bytes. @@ -135,57 +220,47 @@ public class HFileBlock implements Cacheable { /** * Used deserializing blocks from Cache. * - * Serializing to cache is a little hard to follow. See Writer#finishBlock for where it is done. - * When we start to append to a new HFileBlock, - * we skip over where the header should go before we start adding Cells. When the block is - * done, we'll then go back and fill in the header and the checksum tail. Be aware that what - * gets serialized into the blockcache is a byte array that contains an HFileBlock followed by - * its checksums and then the header of the next HFileBlock (needed to help navigate), followed - * again by an extra 13 bytes of meta info needed when time to recreate the HFileBlock from cache. - * + * * ++++++++++++++ * + HFileBlock + * ++++++++++++++ - * + Checksums + + * + Checksums + <= Optional * ++++++++++++++ - * + NextHeader + + * + Metadata! + * ++++++++++++++ - * + ExtraMeta! + - * ++++++++++++++ - * - * TODO: Fix it so we do NOT put the NextHeader into blockcache. It is not necessary. + * + * @see #serialize(ByteBuffer) */ - static final CacheableDeserializer blockDeserializer = + static final CacheableDeserializer BLOCK_DESERIALIZER = new CacheableDeserializer() { public HFileBlock deserialize(ByteBuff buf, boolean reuse, MemoryType memType) throws IOException { - // Rewind to just before the EXTRA_SERIALIZATION_SPACE. - buf.limit(buf.limit() - HFileBlock.EXTRA_SERIALIZATION_SPACE).rewind(); - // Get a new buffer to pass the deserialized HFileBlock for it to 'own'. - ByteBuff newByteBuffer; + // The buf has the file block followed by block metadata. + // Set limit to just before the BLOCK_METADATA_SPACE then rewind. + buf.limit(buf.limit() - BLOCK_METADATA_SPACE).rewind(); + // Get a new buffer to pass the HFileBlock for it to 'own'. + ByteBuff newByteBuff; if (reuse) { - newByteBuffer = buf.slice(); + newByteBuff = buf.slice(); } else { int len = buf.limit(); - newByteBuffer = new SingleByteBuff(ByteBuffer.allocate(len)); - newByteBuffer.put(0, buf, buf.position(), len); + newByteBuff = new SingleByteBuff(ByteBuffer.allocate(len)); + newByteBuff.put(0, buf, buf.position(), len); } - // Read out the EXTRA_SERIALIZATION_SPACE content and shove into our HFileBlock. + // Read out the BLOCK_METADATA_SPACE content and shove into our HFileBlock. 
buf.position(buf.limit()); - buf.limit(buf.limit() + HFileBlock.EXTRA_SERIALIZATION_SPACE); + buf.limit(buf.limit() + HFileBlock.BLOCK_METADATA_SPACE); boolean usesChecksum = buf.get() == (byte)1; - HFileBlock hFileBlock = new HFileBlock(newByteBuffer, usesChecksum, memType); - hFileBlock.offset = buf.getLong(); - hFileBlock.nextBlockOnDiskSizeWithHeader = buf.getInt(); - if (hFileBlock.hasNextBlockHeader()) { - hFileBlock.buf.limit(hFileBlock.buf.limit() - hFileBlock.headerSize()); - } + long offset = buf.getLong(); + int nextBlockOnDiskSize = buf.getInt(); + HFileBlock hFileBlock = + new HFileBlock(newByteBuff, usesChecksum, memType, offset, nextBlockOnDiskSize, null); return hFileBlock; } @Override public int getDeserialiserIdentifier() { - return deserializerIdentifier; + return DESERIALIZER_IDENTIFIER; } @Override @@ -195,65 +270,36 @@ public class HFileBlock implements Cacheable { } }; - private static final int deserializerIdentifier; + private static final int DESERIALIZER_IDENTIFIER; static { - deserializerIdentifier = CacheableDeserializerIdManager - .registerDeserializer(blockDeserializer); + DESERIALIZER_IDENTIFIER = + CacheableDeserializerIdManager.registerDeserializer(BLOCK_DESERIALIZER); } - /** Type of block. Header field 0. */ - private BlockType blockType; - /** - * Size on disk excluding header, including checksum. Header field 1. - * @see Writer#putHeader(byte[], int, int, int, int) + * Copy constructor. Creates a shallow copy of {@code that}'s buffer. */ - private int onDiskSizeWithoutHeader; - - /** - * Size of pure data. Does not include header or checksums. Header field 2. - * @see Writer#putHeader(byte[], int, int, int, int) - */ - private final int uncompressedSizeWithoutHeader; - - /** - * The offset of the previous block on disk. Header field 3. - * @see Writer#putHeader(byte[], int, int, int, int) - */ - private final long prevBlockOffset; - - /** - * Size on disk of header + data. Excludes checksum. Header field 6, - * OR calculated from {@link #onDiskSizeWithoutHeader} when using HDFS checksum. - * @see Writer#putHeader(byte[], int, int, int, int) - */ - private final int onDiskDataSizeWithHeader; - - /** The in-memory representation of the hfile block */ - private ByteBuff buf; - - /** Meta data that holds meta information on the hfileblock */ - private HFileContext fileContext; - - /** - * The offset of this block in the file. Populated by the reader for - * convenience of access. This offset is not part of the block header. - */ - private long offset = UNSET; - - /** - * The on-disk size of the next block, including the header, obtained by - * peeking into the first {@link HConstants#HFILEBLOCK_HEADER_SIZE} bytes of the next block's - * header, or -1 if unknown. - */ - private int nextBlockOnDiskSizeWithHeader = UNSET; - - private MemoryType memType = MemoryType.EXCLUSIVE; + private HFileBlock(HFileBlock that) { + this.blockType = that.blockType; + this.onDiskSizeWithoutHeader = that.onDiskSizeWithoutHeader; + this.uncompressedSizeWithoutHeader = that.uncompressedSizeWithoutHeader; + this.prevBlockOffset = that.prevBlockOffset; + this.buf = that.buf.duplicate(); + this.offset = that.offset; + this.onDiskDataSizeWithHeader = that.onDiskDataSizeWithHeader; + this.fileContext = that.fileContext; + this.nextBlockOnDiskSize = that.nextBlockOnDiskSize; + } /** * Creates a new {@link HFile} block from the given fields. This constructor * is used when the block data has already been read and uncompressed, - * and is sitting in a byte buffer. 
+ * and is sitting in a byte buffer and we want to stuff the block into cache. + * See {@link Writer#getBlockForCaching(CacheConfig)}. + * + *

TODO: The caller presumes no checksumming + * required of this block instance since going into cache; checksum already verified on + * underlying block data pulled in from filesystem. Is that correct? What if cache is SSD? * * @param blockType the type of this block, see {@link BlockType} * @param onDiskSizeWithoutHeader see {@link #onDiskSizeWithoutHeader} @@ -267,86 +313,94 @@ public class HFileBlock implements Cacheable { * @param fileContext HFile meta data */ HFileBlock(BlockType blockType, int onDiskSizeWithoutHeader, int uncompressedSizeWithoutHeader, - long prevBlockOffset, ByteBuff buf, boolean fillHeader, long offset, - int onDiskDataSizeWithHeader, HFileContext fileContext) { - this.blockType = blockType; - this.onDiskSizeWithoutHeader = onDiskSizeWithoutHeader; - this.uncompressedSizeWithoutHeader = uncompressedSizeWithoutHeader; - this.prevBlockOffset = prevBlockOffset; - this.buf = buf; - this.offset = offset; - this.onDiskDataSizeWithHeader = onDiskDataSizeWithHeader; - this.fileContext = fileContext; + long prevBlockOffset, ByteBuffer b, boolean fillHeader, long offset, + final int nextBlockOnDiskSize, int onDiskDataSizeWithHeader, HFileContext fileContext) { + init(blockType, onDiskSizeWithoutHeader, uncompressedSizeWithoutHeader, + prevBlockOffset, offset, onDiskDataSizeWithHeader, nextBlockOnDiskSize, fileContext); + this.buf = new SingleByteBuff(b); if (fillHeader) { overwriteHeader(); } this.buf.rewind(); } - HFileBlock(BlockType blockType, int onDiskSizeWithoutHeader, int uncompressedSizeWithoutHeader, - long prevBlockOffset, ByteBuffer buf, boolean fillHeader, long offset, - int onDiskDataSizeWithHeader, HFileContext fileContext) { - this(blockType, onDiskSizeWithoutHeader, uncompressedSizeWithoutHeader, prevBlockOffset, - new SingleByteBuff(buf), fillHeader, offset, onDiskDataSizeWithHeader, fileContext); - } - - /** - * Copy constructor. Creates a shallow copy of {@code that}'s buffer. - */ - HFileBlock(HFileBlock that) { - this.blockType = that.blockType; - this.onDiskSizeWithoutHeader = that.onDiskSizeWithoutHeader; - this.uncompressedSizeWithoutHeader = that.uncompressedSizeWithoutHeader; - this.prevBlockOffset = that.prevBlockOffset; - this.buf = that.buf.duplicate(); - this.offset = that.offset; - this.onDiskDataSizeWithHeader = that.onDiskDataSizeWithHeader; - this.fileContext = that.fileContext; - this.nextBlockOnDiskSizeWithHeader = that.nextBlockOnDiskSizeWithHeader; - } - - HFileBlock(ByteBuffer b, boolean usesHBaseChecksum) throws IOException { - this(new SingleByteBuff(b), usesHBaseChecksum); - } - /** * Creates a block from an existing buffer starting with a header. Rewinds * and takes ownership of the buffer. By definition of rewind, ignores the * buffer position, but if you slice the buffer beforehand, it will rewind * to that point. + * @param buf Has header, content, and trailing checksums if present. */ - HFileBlock(ByteBuff b, boolean usesHBaseChecksum) throws IOException { - this(b, usesHBaseChecksum, MemoryType.EXCLUSIVE); - } - - /** - * Creates a block from an existing buffer starting with a header. Rewinds - * and takes ownership of the buffer. By definition of rewind, ignores the - * buffer position, but if you slice the buffer beforehand, it will rewind - * to that point. 
- */ - HFileBlock(ByteBuff b, boolean usesHBaseChecksum, MemoryType memType) throws IOException { - b.rewind(); - blockType = BlockType.read(b); - onDiskSizeWithoutHeader = b.getInt(); - uncompressedSizeWithoutHeader = b.getInt(); - prevBlockOffset = b.getLong(); - HFileContextBuilder contextBuilder = new HFileContextBuilder(); - contextBuilder.withHBaseCheckSum(usesHBaseChecksum); - if (usesHBaseChecksum) { - contextBuilder.withChecksumType(ChecksumType.codeToType(b.get())); - contextBuilder.withBytesPerCheckSum(b.getInt()); - this.onDiskDataSizeWithHeader = b.getInt(); - } else { - contextBuilder.withChecksumType(ChecksumType.NULL); - contextBuilder.withBytesPerCheckSum(0); - this.onDiskDataSizeWithHeader = - onDiskSizeWithoutHeader + HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM; - } - this.fileContext = contextBuilder.build(); - this.memType = memType; - buf = b; + HFileBlock(ByteBuff buf, boolean usesHBaseChecksum, MemoryType memType, final long offset, + final int nextBlockOnDiskSize, HFileContext fileContext) throws IOException { buf.rewind(); + final BlockType blockType = BlockType.read(buf); + final int onDiskSizeWithoutHeader = buf.getInt(); + final int uncompressedSizeWithoutHeader = buf.getInt(); + final long prevBlockOffset = buf.getLong(); + byte checksumType = buf.get(); + int bytesPerChecksum = buf.getInt(); + int onDiskDataSizeWithHeader = buf.getInt(); + // This constructor is called when we deserialize a block from cache and when we read a block in + // from the fs. fileCache is null when deserialized from cache so need to make up one. + HFileContextBuilder fileContextBuilder = fileContext != null? + new HFileContextBuilder(fileContext): new HFileContextBuilder(); + fileContextBuilder.withHBaseCheckSum(usesHBaseChecksum); + if (usesHBaseChecksum) { + // Use the checksum type and bytes per checksum from header, not from filecontext. + fileContextBuilder.withChecksumType(ChecksumType.codeToType(checksumType)); + fileContextBuilder.withBytesPerCheckSum(bytesPerChecksum); + } else { + fileContextBuilder.withChecksumType(ChecksumType.NULL); + fileContextBuilder.withBytesPerCheckSum(0); + // Need to fix onDiskDataSizeWithHeader; there are not checksums after-block-data + onDiskDataSizeWithHeader = onDiskSizeWithoutHeader + headerSize(usesHBaseChecksum); + } + fileContext = fileContextBuilder.build(); + assert usesHBaseChecksum == fileContext.isUseHBaseChecksum(); + init(blockType, onDiskSizeWithoutHeader, uncompressedSizeWithoutHeader, + prevBlockOffset, offset, onDiskDataSizeWithHeader, nextBlockOnDiskSize, fileContext); + this.memType = memType; + this.offset = offset; + this.buf = buf; + this.buf.rewind(); + } + + /** + * Called from constructors. + */ + private void init(BlockType blockType, int onDiskSizeWithoutHeader, + int uncompressedSizeWithoutHeader, long prevBlockOffset, + long offset, int onDiskDataSizeWithHeader, final int nextBlockOnDiskSize, + HFileContext fileContext) { + this.blockType = blockType; + this.onDiskSizeWithoutHeader = onDiskSizeWithoutHeader; + this.uncompressedSizeWithoutHeader = uncompressedSizeWithoutHeader; + this.prevBlockOffset = prevBlockOffset; + this.offset = offset; + this.onDiskDataSizeWithHeader = onDiskDataSizeWithHeader; + this.nextBlockOnDiskSize = nextBlockOnDiskSize; + this.fileContext = fileContext; + } + + /** + * Parse total ondisk size including header and checksum. Its second field in header after + * the magic bytes. + * @param headerBuf Header ByteBuffer. Presumed exact size of header. 
+ * @return Size of the block with header included. + */ + private static int getOnDiskSizeWithHeader(final ByteBuffer headerBuf) { + // Set hbase checksum to true always calling headerSize. + return headerBuf.getInt(BlockType.MAGIC_LENGTH) + headerSize(true); + } + + /** + * @return the on-disk size of the next block (including the header size and any checksums if + * present) read by peeking into the next block's header; use as a hint when doing + * a read of the next block when scanning or running over a file. + */ + public int getNextBlockOnDiskSize() { + return nextBlockOnDiskSize; } public BlockType getBlockType() { @@ -414,49 +468,26 @@ public class HFileBlock implements Cacheable { * @return the buffer with header skipped and checksum omitted. */ public ByteBuff getBufferWithoutHeader() { - ByteBuff dup = this.buf.duplicate(); - dup.position(headerSize()); - dup.limit(buf.limit() - totalChecksumBytes()); - return dup.slice(); + ByteBuff dup = getBufferReadOnly(); + // Now set it up so Buffer spans content only -- no header or no checksums. + return dup.position(headerSize()).limit(buf.limit() - totalChecksumBytes()).slice(); } /** - * Returns the buffer this block stores internally. The clients must not - * modify the buffer object. This method has to be public because it is used + * Returns a read-only duplicate of the buffer this block stores internally ready to be read. + * Clients must not modify the buffer object though they may set position and limit on the + * returned buffer since we pass back a duplicate. This method has to be public because it is used * in {@link CompoundBloomFilter} to avoid object creation on every Bloom - * filter lookup, but has to be used with caution. Checksum data is not - * included in the returned buffer but header data is. + * filter lookup, but has to be used with caution. Buffer holds header, block content, + * and any follow-on checksums if present. * * @return the buffer of this block for read-only operations */ - ByteBuff getBufferReadOnly() { + public ByteBuff getBufferReadOnly() { + // TODO: ByteBuf does not support asReadOnlyBuffer(). Fix. ByteBuff dup = this.buf.duplicate(); - dup.limit(buf.limit() - totalChecksumBytes()); - return dup.slice(); - } - - /** - * Returns the buffer of this block, including header data. The clients must - * not modify the buffer object. This method has to be public because it is - * used in {@link org.apache.hadoop.hbase.io.hfile.bucket.BucketCache} to avoid buffer copy. - * - * @return the buffer with header and checksum included for read-only operations - */ - public ByteBuff getBufferReadOnlyWithHeader() { - ByteBuff dup = this.buf.duplicate(); - return dup.slice(); - } - - /** - * Returns a byte buffer of this block, including header data and checksum, positioned at - * the beginning of header. The underlying data array is not copied. - * - * @return the byte buffer with header and checksum included - */ - ByteBuff getBufferWithHeader() { - ByteBuff dupBuf = buf.duplicate(); - dupBuf.rewind(); - return dupBuf; + assert dup.position() == 0; + return dup; } private void sanityCheckAssertion(long valueFromBuf, long valueFromField, @@ -481,39 +512,38 @@ public class HFileBlock implements Cacheable { * valid header consistent with the fields. Assumes a packed block structure. * This function is primary for testing and debugging, and is not * thread-safe, because it alters the internal buffer pointer. + * Used by tests only. 
*/ + @VisibleForTesting void sanityCheck() throws IOException { - buf.rewind(); + // Duplicate so no side-effects + ByteBuff dup = this.buf.duplicate().rewind(); + sanityCheckAssertion(BlockType.read(dup), blockType); - sanityCheckAssertion(BlockType.read(buf), blockType); + sanityCheckAssertion(dup.getInt(), onDiskSizeWithoutHeader, "onDiskSizeWithoutHeader"); - sanityCheckAssertion(buf.getInt(), onDiskSizeWithoutHeader, - "onDiskSizeWithoutHeader"); - - sanityCheckAssertion(buf.getInt(), uncompressedSizeWithoutHeader, + sanityCheckAssertion(dup.getInt(), uncompressedSizeWithoutHeader, "uncompressedSizeWithoutHeader"); - sanityCheckAssertion(buf.getLong(), prevBlockOffset, "prevBlocKOffset"); + sanityCheckAssertion(dup.getLong(), prevBlockOffset, "prevBlockOffset"); if (this.fileContext.isUseHBaseChecksum()) { - sanityCheckAssertion(buf.get(), this.fileContext.getChecksumType().getCode(), "checksumType"); - sanityCheckAssertion(buf.getInt(), this.fileContext.getBytesPerChecksum(), + sanityCheckAssertion(dup.get(), this.fileContext.getChecksumType().getCode(), "checksumType"); + sanityCheckAssertion(dup.getInt(), this.fileContext.getBytesPerChecksum(), "bytesPerChecksum"); - sanityCheckAssertion(buf.getInt(), onDiskDataSizeWithHeader, "onDiskDataSizeWithHeader"); + sanityCheckAssertion(dup.getInt(), onDiskDataSizeWithHeader, "onDiskDataSizeWithHeader"); } int cksumBytes = totalChecksumBytes(); int expectedBufLimit = onDiskDataSizeWithHeader + cksumBytes; - if (buf.limit() != expectedBufLimit) { - throw new AssertionError("Expected buffer limit " + expectedBufLimit - + ", got " + buf.limit()); + if (dup.limit() != expectedBufLimit) { + throw new AssertionError("Expected limit " + expectedBufLimit + ", got " + dup.limit()); } // We might optionally allocate HFILEBLOCK_HEADER_SIZE more bytes to read the next // block's header, so there are two sensible values for buffer capacity. int hdrSize = headerSize(); - if (buf.capacity() != expectedBufLimit && - buf.capacity() != expectedBufLimit + hdrSize) { - throw new AssertionError("Invalid buffer capacity: " + buf.capacity() + + if (dup.capacity() != expectedBufLimit && dup.capacity() != expectedBufLimit + hdrSize) { + throw new AssertionError("Invalid buffer capacity: " + dup.capacity() + ", expected " + expectedBufLimit + " or " + (expectedBufLimit + hdrSize)); } } @@ -559,30 +589,6 @@ public class HFileBlock implements Cacheable { return sb.toString(); } - /** - * Called after reading a block with provided onDiskSizeWithHeader. - */ - private void validateOnDiskSizeWithoutHeader(int expectedOnDiskSizeWithoutHeader) - throws IOException { - if (onDiskSizeWithoutHeader != expectedOnDiskSizeWithoutHeader) { - String dataBegin = null; - if (buf.hasArray()) { - dataBegin = Bytes.toStringBinary(buf.array(), buf.arrayOffset(), Math.min(32, buf.limit())); - } else { - ByteBuff bufDup = getBufferReadOnly(); - byte[] dataBeginBytes = new byte[Math.min(32, bufDup.limit() - bufDup.position())]; - bufDup.get(dataBeginBytes); - dataBegin = Bytes.toStringBinary(dataBeginBytes); - } - String blockInfoMsg = - "Block offset: " + offset + ", data starts with: " + dataBegin; - throw new IOException("On-disk size without header provided is " - + expectedOnDiskSizeWithoutHeader + ", but block " - + "header contains " + onDiskSizeWithoutHeader + ". " + - blockInfoMsg); - } - } - /** * Retrieves the decompressed/decrypted view of this block. An encoded block remains in its * encoded structure. Internal structures are shared between instances where applicable. 
@@ -607,32 +613,9 @@ public class HFileBlock implements Cacheable { ctx.prepareDecoding(unpacked.getOnDiskSizeWithoutHeader(), unpacked.getUncompressedSizeWithoutHeader(), unpacked.getBufferWithoutHeader(), dup); - - // Preserve the next block's header bytes in the new block if we have them. - if (unpacked.hasNextBlockHeader()) { - // Both the buffers are limited till checksum bytes and avoid the next block's header. - // Below call to copyFromBufferToBuffer() will try positional read/write from/to buffers when - // any of the buffer is DBB. So we change the limit on a dup buffer. No copying just create - // new BB objects - ByteBuff inDup = this.buf.duplicate(); - inDup.limit(inDup.limit() + headerSize()); - ByteBuff outDup = unpacked.buf.duplicate(); - outDup.limit(outDup.limit() + unpacked.headerSize()); - outDup.put( - unpacked.headerSize() + unpacked.uncompressedSizeWithoutHeader - + unpacked.totalChecksumBytes(), inDup, this.onDiskDataSizeWithHeader, - unpacked.headerSize()); - } return unpacked; } - /** - * Return true when this buffer includes next block's header. - */ - private boolean hasNextBlockHeader() { - return nextBlockOnDiskSizeWithHeader > 0; - } - /** * Always allocates a new buffer of the correct size. Copies header bytes * from the existing buffer. Does not change header fields. @@ -641,8 +624,7 @@ public class HFileBlock implements Cacheable { private void allocateBuffer() { int cksumBytes = totalChecksumBytes(); int headerSize = headerSize(); - int capacityNeeded = headerSize + uncompressedSizeWithoutHeader + - cksumBytes + (hasNextBlockHeader() ? headerSize : 0); + int capacityNeeded = headerSize + uncompressedSizeWithoutHeader + cksumBytes; // TODO we need consider allocating offheap here? ByteBuffer newBuf = ByteBuffer.allocate(capacityNeeded); @@ -670,9 +652,8 @@ public class HFileBlock implements Cacheable { } /** An additional sanity-check in case no compression or encryption is being used. */ - public void assumeUncompressed() throws IOException { - if (onDiskSizeWithoutHeader != uncompressedSizeWithoutHeader + - totalChecksumBytes()) { + public void sanityCheckUncompressedSize() throws IOException { + if (onDiskSizeWithoutHeader != uncompressedSizeWithoutHeader + totalChecksumBytes()) { throw new IOException("Using no compression but " + "onDiskSizeWithoutHeader=" + onDiskSizeWithoutHeader + ", " + "uncompressedSizeWithoutHeader=" + uncompressedSizeWithoutHeader @@ -680,11 +661,14 @@ public class HFileBlock implements Cacheable { } } - /** @return the offset of this block in the file it was read from */ + /** + * Cannot be {@link #UNSET}. Must be a legitimate value. Used re-making the {@link CacheKey} when + * block is returned to the cache. + * @return the offset of this block in the file it was read from + */ long getOffset() { if (offset < 0) { - throw new IllegalStateException( - "HFile block offset not initialized properly"); + throw new IllegalStateException("HFile block offset not initialized properly"); } return offset; } @@ -744,7 +728,6 @@ public class HFileBlock implements Cacheable { // We could not read the "extra data", but that is OK. 
break; } - if (ret < 0) { throw new IOException("Premature EOF from inputStream (read " + "returned " + ret + ", was trying to read " + necessaryLen @@ -798,14 +781,6 @@ public class HFileBlock implements Cacheable { return bytesRead != necessaryLen && bytesRemaining <= 0; } - /** - * @return the on-disk size of the next block (including the header size) - * that was read by peeking into the next block's header - */ - public int getNextBlockOnDiskSizeWithHeader() { - return nextBlockOnDiskSizeWithHeader; - } - /** * Unified version 2 {@link HFile} block writer. The intended usage pattern * is as follows: @@ -838,8 +813,8 @@ public class HFileBlock implements Cacheable { private HFileBlockDefaultEncodingContext defaultBlockEncodingCtx; /** - * The stream we use to accumulate data in uncompressed format for each - * block. We reset this stream at the end of each block and reuse it. The + * The stream we use to accumulate data into a block in an uncompressed format. + * We reset this stream at the end of each block and reuse it. The * header is written as the first {@link HConstants#HFILEBLOCK_HEADER_SIZE} bytes into this * stream. */ @@ -867,7 +842,7 @@ public class HFileBlock implements Cacheable { * if compression is turned on. It also includes the checksum data that * immediately follows the block data. (header + data + checksums) */ - private byte[] onDiskBytesWithHeader; + private byte[] onDiskBlockBytesWithHeader; /** * The size of the checksum data on disk. It is used only if data is @@ -884,7 +859,7 @@ public class HFileBlock implements Cacheable { * {@link org.apache.hadoop.hbase.HConstants#HFILEBLOCK_HEADER_SIZE}. * Does not store checksums. */ - private byte[] uncompressedBytesWithHeader; + private byte[] uncompressedBlockBytesWithHeader; /** * Current block's start offset in the {@link HFile}. Set in @@ -992,18 +967,19 @@ public class HFileBlock implements Cacheable { Preconditions.checkState(state != State.INIT, "Unexpected state: " + state); - if (state == State.BLOCK_READY) + if (state == State.BLOCK_READY) { return; + } // This will set state to BLOCK_READY. finishBlock(); } /** - * An internal method that flushes the compressing stream (if using - * compression), serializes the header, and takes care of the separate - * uncompressed stream for caching on write, if applicable. Sets block - * write state to "block ready". + * Finish up writing of the block. + * Flushes the compressing stream (if using compression), fills out the header, + * does any compression/encryption of bytes to flush out to disk, and manages + * the cache on write content, if applicable. Sets block write state to "block ready". */ private void finishBlock() throws IOException { if (blockType == BlockType.DATA) { @@ -1012,41 +988,40 @@ public class HFileBlock implements Cacheable { blockType = dataBlockEncodingCtx.getBlockType(); } userDataStream.flush(); - // This does an array copy, so it is safe to cache this byte array. + // This does an array copy, so it is safe to cache this byte array when cache-on-write. // Header is still the empty, 'dummy' header that is yet to be filled out. - uncompressedBytesWithHeader = baosInMemory.toByteArray(); + uncompressedBlockBytesWithHeader = baosInMemory.toByteArray(); prevOffset = prevOffsetByType[blockType.getId()]; - // We need to set state before we can package the block up for - // cache-on-write. In a way, the block is ready, but not yet encoded or - // compressed. + // We need to set state before we can package the block up for cache-on-write. 
In a way, the + // block is ready, but not yet encoded or compressed. state = State.BLOCK_READY; if (blockType == BlockType.DATA || blockType == BlockType.ENCODED_DATA) { - onDiskBytesWithHeader = dataBlockEncodingCtx - .compressAndEncrypt(uncompressedBytesWithHeader); + onDiskBlockBytesWithHeader = dataBlockEncodingCtx. + compressAndEncrypt(uncompressedBlockBytesWithHeader); } else { - onDiskBytesWithHeader = this.defaultBlockEncodingCtx. - compressAndEncrypt(uncompressedBytesWithHeader); + onDiskBlockBytesWithHeader = defaultBlockEncodingCtx. + compressAndEncrypt(uncompressedBlockBytesWithHeader); } // Calculate how many bytes we need for checksum on the tail of the block. int numBytes = (int) ChecksumUtil.numBytes( - onDiskBytesWithHeader.length, + onDiskBlockBytesWithHeader.length, fileContext.getBytesPerChecksum()); // Put the header for the on disk bytes; header currently is unfilled-out - putHeader(onDiskBytesWithHeader, 0, - onDiskBytesWithHeader.length + numBytes, - uncompressedBytesWithHeader.length, onDiskBytesWithHeader.length); + putHeader(onDiskBlockBytesWithHeader, 0, + onDiskBlockBytesWithHeader.length + numBytes, + uncompressedBlockBytesWithHeader.length, onDiskBlockBytesWithHeader.length); // Set the header for the uncompressed bytes (for cache-on-write) -- IFF different from - // onDiskBytesWithHeader array. - if (onDiskBytesWithHeader != uncompressedBytesWithHeader) { - putHeader(uncompressedBytesWithHeader, 0, - onDiskBytesWithHeader.length + numBytes, - uncompressedBytesWithHeader.length, onDiskBytesWithHeader.length); + // onDiskBlockBytesWithHeader array. + if (onDiskBlockBytesWithHeader != uncompressedBlockBytesWithHeader) { + putHeader(uncompressedBlockBytesWithHeader, 0, + onDiskBlockBytesWithHeader.length + numBytes, + uncompressedBlockBytesWithHeader.length, onDiskBlockBytesWithHeader.length); } onDiskChecksum = new byte[numBytes]; ChecksumUtil.generateChecksums( - onDiskBytesWithHeader, 0, onDiskBytesWithHeader.length, + onDiskBlockBytesWithHeader, 0, onDiskBlockBytesWithHeader.length, onDiskChecksum, 0, fileContext.getChecksumType(), fileContext.getBytesPerChecksum()); } @@ -1101,7 +1076,7 @@ public class HFileBlock implements Cacheable { protected void finishBlockAndWriteHeaderAndData(DataOutputStream out) throws IOException { ensureBlockReady(); - out.write(onDiskBytesWithHeader); + out.write(onDiskBlockBytesWithHeader); out.write(onDiskChecksum); } @@ -1120,12 +1095,12 @@ public class HFileBlock implements Cacheable { // This is not very optimal, because we are doing an extra copy. // But this method is used only by unit tests. 
byte[] output = - new byte[onDiskBytesWithHeader.length + new byte[onDiskBlockBytesWithHeader.length + onDiskChecksum.length]; - System.arraycopy(onDiskBytesWithHeader, 0, output, 0, - onDiskBytesWithHeader.length); + System.arraycopy(onDiskBlockBytesWithHeader, 0, output, 0, + onDiskBlockBytesWithHeader.length); System.arraycopy(onDiskChecksum, 0, output, - onDiskBytesWithHeader.length, onDiskChecksum.length); + onDiskBlockBytesWithHeader.length, onDiskChecksum.length); return output; } @@ -1153,7 +1128,7 @@ public class HFileBlock implements Cacheable { */ int getOnDiskSizeWithoutHeader() { expectState(State.BLOCK_READY); - return onDiskBytesWithHeader.length + + return onDiskBlockBytesWithHeader.length + onDiskChecksum.length - HConstants.HFILEBLOCK_HEADER_SIZE; } @@ -1166,7 +1141,7 @@ public class HFileBlock implements Cacheable { */ int getOnDiskSizeWithHeader() { expectState(State.BLOCK_READY); - return onDiskBytesWithHeader.length + onDiskChecksum.length; + return onDiskBlockBytesWithHeader.length + onDiskChecksum.length; } /** @@ -1174,7 +1149,7 @@ public class HFileBlock implements Cacheable { */ int getUncompressedSizeWithoutHeader() { expectState(State.BLOCK_READY); - return uncompressedBytesWithHeader.length - HConstants.HFILEBLOCK_HEADER_SIZE; + return uncompressedBlockBytesWithHeader.length - HConstants.HFILEBLOCK_HEADER_SIZE; } /** @@ -1182,7 +1157,7 @@ public class HFileBlock implements Cacheable { */ int getUncompressedSizeWithHeader() { expectState(State.BLOCK_READY); - return uncompressedBytesWithHeader.length; + return uncompressedBlockBytesWithHeader.length; } /** @return true if a block is being written */ @@ -1212,7 +1187,7 @@ public class HFileBlock implements Cacheable { */ ByteBuffer getUncompressedBufferWithHeader() { expectState(State.BLOCK_READY); - return ByteBuffer.wrap(uncompressedBytesWithHeader); + return ByteBuffer.wrap(uncompressedBlockBytesWithHeader); } /** @@ -1225,7 +1200,7 @@ public class HFileBlock implements Cacheable { */ ByteBuffer getOnDiskBufferWithHeader() { expectState(State.BLOCK_READY); - return ByteBuffer.wrap(onDiskBytesWithHeader); + return ByteBuffer.wrap(onDiskBlockBytesWithHeader); } private void expectState(State expectedState) { @@ -1257,6 +1232,10 @@ public class HFileBlock implements Cacheable { * block does not have checksum data even though the header minor * version is MINOR_VERSION_WITH_CHECKSUM. This is indicated by setting a * 0 value in bytesPerChecksum. + * + *

TODO: Should there be an option where a cache can ask that hbase preserve block + * checksums for checking after a block comes out of the cache? Otehrwise, cache is responsible + * for blocks being wholesome (ECC memory or if file-backed, it does checksumming). */ HFileBlock getBlockForCaching(CacheConfig cacheConf) { HFileContext newContext = new HFileContextBuilder() @@ -1270,13 +1249,13 @@ public class HFileBlock implements Cacheable { .withIncludesMvcc(fileContext.isIncludesMvcc()) .withIncludesTags(fileContext.isIncludesTags()) .build(); - return new HFileBlock(blockType, getOnDiskSizeWithoutHeader(), + return new HFileBlock(blockType, getOnDiskSizeWithoutHeader(), getUncompressedSizeWithoutHeader(), prevOffset, - cacheConf.shouldCacheCompressed(blockType.getCategory()) ? + cacheConf.shouldCacheCompressed(blockType.getCategory())? getOnDiskBufferWithHeader() : getUncompressedBufferWithHeader(), - FILL_HEADER, startOffset, - onDiskBytesWithHeader.length + onDiskChecksum.length, newContext); + FILL_HEADER, startOffset, UNSET, + onDiskBlockBytesWithHeader.length + onDiskChecksum.length, newContext); } } @@ -1322,12 +1301,9 @@ public class HFileBlock implements Cacheable { * @param offset * @param onDiskSize the on-disk size of the entire block, including all * applicable headers, or -1 if unknown - * @param uncompressedSize the uncompressed size of the compressed part of - * the block, or -1 if unknown * @return the newly read block */ - HFileBlock readBlockData(long offset, long onDiskSize, - int uncompressedSize, boolean pread) throws IOException; + HFileBlock readBlockData(long offset, long onDiskSize, boolean pread) throws IOException; /** * Creates a block iterator over the given portion of the {@link HFile}. @@ -1380,6 +1356,11 @@ public class HFileBlock implements Cacheable { /** Default context used when BlockType != {@link BlockType#ENCODED_DATA}. */ private final HFileBlockDefaultDecodingContext defaultDecodingCtx; + /** + * When we read a block, we overread and pull in the next blocks header too. We will save it + * here. If moving serially through the file, we will trip over this caching of the next blocks + * header so we won't have to do explicit seek to find next blocks lengths, etc. + */ private ThreadLocal prefetchedHeaderForThread = new ThreadLocal() { @Override @@ -1443,7 +1424,7 @@ public class HFileBlock implements Cacheable { public HFileBlock nextBlock() throws IOException { if (offset >= endOffset) return null; - HFileBlock b = readBlockData(offset, -1, -1, false); + HFileBlock b = readBlockData(offset, -1, false); offset += b.getOnDiskSizeWithHeader(); return b.unpack(fileContext, owner); } @@ -1463,7 +1444,7 @@ public class HFileBlock implements Cacheable { /** * Does a positional read or a seek and read into the given buffer. Returns - * the on-disk size of the next block, or -1 if it could not be determined. + * the on-disk size of the next block, or -1 if it could not be read/determined; e.g. EOF. 
* * @param dest destination buffer * @param destOffset offset into the destination buffer at where to put the bytes we read @@ -1473,7 +1454,8 @@ public class HFileBlock implements Cacheable { * @param pread whether we should do a positional read * @param istream The input source of data * @return the on-disk size of the next block with header size included, or - * -1 if it could not be determined + * -1 if it could not be determined; if not -1, the dest INCLUDES the + * next header * @throws IOException */ protected int readAtOffset(FSDataInputStream istream, byte [] dest, int destOffset, int size, @@ -1505,16 +1487,16 @@ public class HFileBlock implements Cacheable { } // Try to read the next block header. - if (!readWithExtra(istream, dest, destOffset, size, hdrSize)) + if (!readWithExtra(istream, dest, destOffset, size, hdrSize)) { return -1; + } } finally { streamLock.unlock(); } } else { // Positional read. Better for random reads; or when the streamLock is already locked. int extraSize = peekIntoNextBlock ? hdrSize : 0; - if (!positionalReadWithExtra(istream, fileOffset, dest, destOffset, - size, extraSize)) { + if (!positionalReadWithExtra(istream, fileOffset, dest, destOffset, size, extraSize)) { return -1; } } @@ -1530,16 +1512,12 @@ public class HFileBlock implements Cacheable { * @param offset the offset in the stream to read at * @param onDiskSizeWithHeaderL the on-disk size of the block, including * the header, or -1 if unknown - * @param uncompressedSize the uncompressed size of the the block. Always - * expected to be -1. This parameter is only used in version 1. * @param pread whether to use a positional read */ @Override - public HFileBlock readBlockData(long offset, long onDiskSizeWithHeaderL, - int uncompressedSize, boolean pread) + public HFileBlock readBlockData(long offset, long onDiskSizeWithHeaderL, boolean pread) throws IOException { - - // get a copy of the current state of whether to validate + // Get a copy of the current state of whether to validate // hbase checksums or not for this read call. This is not // thread-safe but the one constaint is that if we decide // to skip hbase checksum verification then we are @@ -1548,8 +1526,7 @@ public class HFileBlock implements Cacheable { FSDataInputStream is = streamWrapper.getStream(doVerificationThruHBaseChecksum); HFileBlock blk = readBlockDataInternal(is, offset, - onDiskSizeWithHeaderL, - uncompressedSize, pread, + onDiskSizeWithHeaderL, pread, doVerificationThruHBaseChecksum); if (blk == null) { HFile.LOG.warn("HBase checksum verification failed for file " + @@ -1576,8 +1553,7 @@ public class HFileBlock implements Cacheable { // a few more than precisely this number. 
is = this.streamWrapper.fallbackToFsChecksum(CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD); doVerificationThruHBaseChecksum = false; - blk = readBlockDataInternal(is, offset, onDiskSizeWithHeaderL, - uncompressedSize, pread, + blk = readBlockDataInternal(is, offset, onDiskSizeWithHeaderL, pread, doVerificationThruHBaseChecksum); if (blk != null) { HFile.LOG.warn("HDFS checksum verification suceeded for file " + @@ -1604,176 +1580,140 @@ public class HFileBlock implements Cacheable { return blk; } + /** + * @return Check onDiskSizeWithHeaderL size is healthy and then return it as an int + * @throws IOException + */ + private static int checkAndGetSizeAsInt(final long onDiskSizeWithHeaderL, final int hdrSize) + throws IOException { + if ((onDiskSizeWithHeaderL < hdrSize && onDiskSizeWithHeaderL != -1) + || onDiskSizeWithHeaderL >= Integer.MAX_VALUE) { + throw new IOException("Invalid onDisksize=" + onDiskSizeWithHeaderL + + ": expected to be at least " + hdrSize + + " and at most " + Integer.MAX_VALUE + ", or -1"); + } + return (int)onDiskSizeWithHeaderL; + } + + /** + * Check threadlocal cache for this block's header; we usually read it on the tail of reading + * the previous block to save a seek. Otherwise, we have to do a seek to read the header before + * we can pull in the block. + * @return The cached block header or null if not found. + * @see #cacheNextBlockHeader(long, byte[], int, int) + */ + private ByteBuffer getCachedHeader(final long offset) { + PrefetchedHeader prefetchedHeader = prefetchedHeaderForThread.get(); + // PrefetchedHeader prefetchedHeader = prefetchedHeaderForThread.get(); + return prefetchedHeader != null && prefetchedHeader.offset == offset? + prefetchedHeader.buf: null; + } + + /** + * Save away the next blocks header in thread local. + * @see #getCachedHeader(long) + */ + private void cacheNextBlockHeader(final long nextBlockOffset, + final byte [] header, final int headerOffset, final int headerLength) { + PrefetchedHeader prefetchedHeader = prefetchedHeaderForThread.get(); + prefetchedHeader.offset = nextBlockOffset; + System.arraycopy(header, headerOffset, prefetchedHeader.header, 0, headerLength); + } + + /** + * Verify the passed in onDiskSizeWithHeader aligns with what is in the header else something + * is not right. + * @throws IOException + */ + private void verifyOnDiskSizeMatchesHeader(final int passedIn, final ByteBuffer headerBuf, + final long offset) + throws IOException { + // Assert size provided aligns with what is in the header + int fromHeader = getOnDiskSizeWithHeader(headerBuf); + if (passedIn != fromHeader) { + throw new IOException("Passed in onDiskSizeWithHeader=" + passedIn + " != " + fromHeader + + ", offset=" + offset + ", fileContext=" + this.fileContext); + } + } + /** * Reads a version 2 block. * * @param offset the offset in the stream to read at * @param onDiskSizeWithHeaderL the on-disk size of the block, including - * the header, or -1 if unknown - * @param uncompressedSize the uncompressed size of the the block. Always - * expected to be -1. This parameter is only used in version 1. + * the header and checksums if present or -1 if unknown * @param pread whether to use a positional read * @param verifyChecksum Whether to use HBase checksums. * If HBase checksum is switched off, then use HDFS checksum. 
* @return the HFileBlock or null if there is a HBase checksum mismatch */ private HFileBlock readBlockDataInternal(FSDataInputStream is, long offset, - long onDiskSizeWithHeaderL, int uncompressedSize, boolean pread, - boolean verifyChecksum) + long onDiskSizeWithHeaderL, boolean pread, boolean verifyChecksum) throws IOException { if (offset < 0) { throw new IOException("Invalid offset=" + offset + " trying to read " - + "block (onDiskSize=" + onDiskSizeWithHeaderL - + ", uncompressedSize=" + uncompressedSize + ")"); + + "block (onDiskSize=" + onDiskSizeWithHeaderL + ")"); } - - if (uncompressedSize != -1) { - throw new IOException("Version 2 block reader API does not need " + - "the uncompressed size parameter"); + int onDiskSizeWithHeader = checkAndGetSizeAsInt(onDiskSizeWithHeaderL, hdrSize); + ByteBuffer headerBuf = getCachedHeader(offset); + if (LOG.isTraceEnabled()) { + LOG.trace("Reading " + this.fileContext.getHFileName() + " at offset=" + offset + + ", pread=" + pread + ", verifyChecksum=" + verifyChecksum + ", cachedHeader=" + + headerBuf + ", onDiskSizeWithHeader=" + onDiskSizeWithHeader); } - - if ((onDiskSizeWithHeaderL < hdrSize && onDiskSizeWithHeaderL != -1) - || onDiskSizeWithHeaderL >= Integer.MAX_VALUE) { - throw new IOException("Invalid onDisksize=" + onDiskSizeWithHeaderL - + ": expected to be at least " + hdrSize - + " and at most " + Integer.MAX_VALUE + ", or -1 (offset=" - + offset + ", uncompressedSize=" + uncompressedSize + ")"); - } - - int onDiskSizeWithHeader = (int) onDiskSizeWithHeaderL; - - // See if we can avoid reading the header. This is desirable, because - // we will not incur a backward seek operation if we have already - // read this block's header as part of the previous read's look-ahead. - // And we also want to skip reading the header again if it has already - // been read. - // TODO: How often does this optimization fire? Has to be same thread so the thread local - // is pertinent and we have to be reading next block as in a big scan. - ByteBuffer headerBuf = null; - PrefetchedHeader prefetchedHeader = prefetchedHeaderForThread.get(); - boolean preReadHeader = false; - if (prefetchedHeader != null && prefetchedHeader.offset == offset) { - headerBuf = prefetchedHeader.buf; - preReadHeader = true; - } - // Allocate enough space to fit the next block's header too. - int nextBlockOnDiskSize = 0; - byte[] onDiskBlock = null; - - HFileBlock b = null; - boolean fastPath = false; - boolean readHdrOnly = false; - if (onDiskSizeWithHeader > 0) { - fastPath = true; - // We know the total on-disk size. Read the entire block into memory, - // then parse the header. This code path is used when - // doing a random read operation relying on the block index, as well as - // when the client knows the on-disk size from peeking into the next - // block's header (e.g. this block's header) when reading the previous - // block. This is the faster and more preferable case. - - // Size that we have to skip in case we have already read the header. - int preReadHeaderSize = headerBuf == null ? 
0 : hdrSize; - onDiskBlock = new byte[onDiskSizeWithHeader + hdrSize]; // room for this block plus the - // next block's header - nextBlockOnDiskSize = readAtOffset(is, onDiskBlock, - preReadHeaderSize, onDiskSizeWithHeader - preReadHeaderSize, - true, offset + preReadHeaderSize, pread); - if (headerBuf != null) { - // the header has been read when reading the previous block, copy - // to this block's header - // headerBuf is HBB - assert headerBuf.hasArray(); - System.arraycopy(headerBuf.array(), - headerBuf.arrayOffset(), onDiskBlock, 0, hdrSize); - } else { - headerBuf = ByteBuffer.wrap(onDiskBlock, 0, hdrSize); - } - // We know the total on-disk size but not the uncompressed size. Parse the header. - try { - // TODO: FIX!!! Expensive parse just to get a length - b = new HFileBlock(headerBuf, fileContext.isUseHBaseChecksum()); - } catch (IOException ex) { - // Seen in load testing. Provide comprehensive debug info. - throw new IOException("Failed to read compressed block at " - + offset - + ", onDiskSizeWithoutHeader=" - + onDiskSizeWithHeader - + ", preReadHeaderSize=" - + hdrSize - + ", header.length=" - + prefetchedHeader.header.length - + ", header bytes: " - + Bytes.toStringBinary(prefetchedHeader.header, 0, - hdrSize), ex); - } - // if the caller specifies a onDiskSizeWithHeader, validate it. - int onDiskSizeWithoutHeader = onDiskSizeWithHeader - hdrSize; - assert onDiskSizeWithoutHeader >= 0; - b.validateOnDiskSizeWithoutHeader(onDiskSizeWithoutHeader); - } else { - // Check headerBuf to see if we have read this block's header as part of - // reading the previous block. This is an optimization of peeking into - // the next block's header (e.g.this block's header) when reading the - // previous block. This is the faster and more preferable case. If the - // header is already there, don't read the header again. - - // Unfortunately, we still have to do a separate read operation to - // read the header. + if (onDiskSizeWithHeader <= 0) { + // We were not passed the block size. Need to get it from the header. If header was not in + // cache, need to seek to pull it in. This latter might happen when we are doing the first + // read in a series of reads or a random read, and we don't have access to the block index. + // This is costly and should happen very rarely. if (headerBuf == null) { - readHdrOnly = true; - // From the header, determine the on-disk size of the given hfile - // block, and read the remaining data, thereby incurring two read - // operations. This might happen when we are doing the first read - // in a series of reads or a random read, and we don't have access - // to the block index. This is costly and should happen very rarely. headerBuf = ByteBuffer.allocate(hdrSize); - // headerBuf is HBB - readAtOffset(is, headerBuf.array(), headerBuf.arrayOffset(), - hdrSize, false, offset, pread); + readAtOffset(is, headerBuf.array(), headerBuf.arrayOffset(), hdrSize, false, + offset, pread); } - // TODO: FIX!!! Expensive parse just to get a length - b = new HFileBlock(headerBuf, fileContext.isUseHBaseChecksum()); - // onDiskBlock is whole block + header + checksums then extra hdrSize to read next header - onDiskBlock = new byte[b.getOnDiskSizeWithHeader() + hdrSize]; - // headerBuf is HBB. Copy hdr into onDiskBlock + onDiskSizeWithHeader = getOnDiskSizeWithHeader(headerBuf); + } + int preReadHeaderSize = headerBuf == null? 0 : hdrSize; + // Allocate enough space to fit the next block's header too; saves a seek next time through. 
+ // onDiskBlock is whole block + header + checksums then extra hdrSize to read next header; + // onDiskSizeWithHeader is header, body, and any checksums if present. + // TODO: Make this ByteBuffer-based. Will make it easier to go to HDFS with BBPool (offheap). + byte[] onDiskBlock = new byte[onDiskSizeWithHeader + hdrSize]; + int nextBlockOnDiskSize = readAtOffset(is, onDiskBlock, preReadHeaderSize, + onDiskSizeWithHeader - preReadHeaderSize, true, offset + preReadHeaderSize, pread); + if (headerBuf != null) { + // The header has been read when reading the previous block OR in a distinct header-only + // read. Copy to this block's header. System.arraycopy(headerBuf.array(), headerBuf.arrayOffset(), onDiskBlock, 0, hdrSize); - nextBlockOnDiskSize = readAtOffset(is, onDiskBlock, hdrSize, - b.getOnDiskSizeWithHeader() - hdrSize, true, offset + hdrSize, pread); - onDiskSizeWithHeader = b.onDiskSizeWithoutHeader + hdrSize; + } else { + headerBuf = ByteBuffer.wrap(onDiskBlock, 0, hdrSize); } - - if (!fileContext.isCompressedOrEncrypted()) { - b.assumeUncompressed(); - } - - if (verifyChecksum && !validateBlockChecksum(b, offset, onDiskBlock, hdrSize)) { - return null; // checksum mismatch - } - + // Do a few checks before we go instantiate HFileBlock. + assert onDiskSizeWithHeader > this.hdrSize; + verifyOnDiskSizeMatchesHeader(onDiskSizeWithHeader, headerBuf, offset); // The onDiskBlock will become the headerAndDataBuffer for this block. // If nextBlockOnDiskSizeWithHeader is not zero, the onDiskBlock already - // contains the header of next block, so no need to set next - // block's header in it. - b = new HFileBlock(ByteBuffer.wrap(onDiskBlock, 0, onDiskSizeWithHeader), - this.fileContext.isUseHBaseChecksum()); - - b.nextBlockOnDiskSizeWithHeader = nextBlockOnDiskSize; - - // Set prefetched header - if (b.hasNextBlockHeader()) { - prefetchedHeader.offset = offset + b.getOnDiskSizeWithHeader(); - System.arraycopy(onDiskBlock, onDiskSizeWithHeader, prefetchedHeader.header, 0, hdrSize); + // contains the header of next block, so no need to set next block's header in it. + HFileBlock hFileBlock = + new HFileBlock(new SingleByteBuff(ByteBuffer.wrap(onDiskBlock, 0, onDiskSizeWithHeader)), + this.fileContext.isUseHBaseChecksum(), MemoryType.EXCLUSIVE, offset, + nextBlockOnDiskSize, fileContext); + // Run check on uncompressed sizings. + if (!fileContext.isCompressedOrEncrypted()) { + hFileBlock.sanityCheckUncompressed(); + } + if (verifyChecksum && !validateBlockChecksum(hFileBlock, offset, onDiskBlock, hdrSize)) { + return null; } - - b.offset = offset; - b.fileContext.setIncludesTags(this.fileContext.isIncludesTags()); - b.fileContext.setIncludesMvcc(this.fileContext.isIncludesMvcc()); if (LOG.isTraceEnabled()) { - LOG.trace("Read preReadHeader=" + preReadHeader + ", fastPath=" + fastPath + - ", readHdrOnly=" + readHdrOnly + ", " + b); + LOG.trace("Read " + hFileBlock); } - return b; + // Cache next block header if we read it for the next time through here. + if (nextBlockOnDiskSize != -1) { + cacheNextBlockHeader(offset + hFileBlock.getOnDiskSizeWithHeader(), + onDiskBlock, onDiskSizeWithHeader, hdrSize); + } + return hFileBlock; } @Override @@ -1819,42 +1759,73 @@ public class HFileBlock implements Cacheable { } } + /** An additional sanity-check in case no compression or encryption is being used. 
*/ + void sanityCheckUncompressed() throws IOException { + if (onDiskSizeWithoutHeader != uncompressedSizeWithoutHeader + + totalChecksumBytes()) { + throw new IOException("Using no compression but " + + "onDiskSizeWithoutHeader=" + onDiskSizeWithoutHeader + ", " + + "uncompressedSizeWithoutHeader=" + uncompressedSizeWithoutHeader + + ", numChecksumbytes=" + totalChecksumBytes()); + } + } + + // Cacheable implementation @Override public int getSerializedLength() { if (buf != null) { - // include extra bytes for the next header when it's available. - int extraSpace = hasNextBlockHeader() ? headerSize() : 0; - return this.buf.limit() + extraSpace + HFileBlock.EXTRA_SERIALIZATION_SPACE; + // Include extra bytes for block metadata. + return this.buf.limit() + BLOCK_METADATA_SPACE; } return 0; } + // Cacheable implementation @Override public void serialize(ByteBuffer destination) { - this.buf.get(destination, 0, getSerializedLength() - EXTRA_SERIALIZATION_SPACE); - serializeExtraInfo(destination); + // BE CAREFUL!! There is a custom version of this serialization over in BucketCache#doDrain. + // Make sure any changes in here are reflected over there. + this.buf.get(destination, 0, getSerializedLength() - BLOCK_METADATA_SPACE); + destination = addMetaData(destination); + + // Make it ready for reading. flip sets position to zero and limit to current position which + // is what we want if we do not want to serialize the block plus checksums if present plus + // metadata. + destination.flip(); } /** - * Write out the content of EXTRA_SERIALIZATION_SPACE. Public so can be accessed by BucketCache. + * For use by bucketcache. This exposes internals. */ - public void serializeExtraInfo(ByteBuffer destination) { - destination.put(this.fileContext.isUseHBaseChecksum() ? (byte) 1 : (byte) 0); - destination.putLong(this.offset); - destination.putInt(this.nextBlockOnDiskSizeWithHeader); - destination.rewind(); + public ByteBuffer getMetaData() { + ByteBuffer bb = ByteBuffer.allocate(BLOCK_METADATA_SPACE); + bb = addMetaData(bb); + bb.flip(); + return bb; } + /** + * Adds metadata at current position (position is moved forward). Does not flip or reset. + * @return The passed destination with metadata added. + */ + private ByteBuffer addMetaData(final ByteBuffer destination) { + destination.put(this.fileContext.isUseHBaseChecksum() ? (byte) 1 : (byte) 0); + destination.putLong(this.offset); + destination.putInt(this.nextBlockOnDiskSize); + return destination; + } + + // Cacheable implementation @Override public CacheableDeserializer getDeserializer() { - return HFileBlock.blockDeserializer; + return HFileBlock.BLOCK_DESERIALIZER; } @Override public int hashCode() { int result = 1; result = result * 31 + blockType.hashCode(); - result = result * 31 + nextBlockOnDiskSizeWithHeader; + result = result * 31 + nextBlockOnDiskSize; result = result * 31 + (int) (offset ^ (offset >>> 32)); result = result * 31 + onDiskSizeWithoutHeader; result = result * 31 + (int) (prevBlockOffset ^ (prevBlockOffset >>> 32)); @@ -1880,9 +1851,10 @@ public class HFileBlock implements Cacheable { if (castedComparison.blockType != this.blockType) { return false; } - if (castedComparison.nextBlockOnDiskSizeWithHeader != this.nextBlockOnDiskSizeWithHeader) { + if (castedComparison.nextBlockOnDiskSize != this.nextBlockOnDiskSize) { return false; } + // Offset is important. Needed when we have to remake cachekey when block is returned to cache. 
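For reference, the metadata record built by addMetaData()/getMetaData() above is a fixed 13 bytes: a 1-byte checksum flag, the 8-byte file offset, and the 4-byte on-disk size of the next block. A standalone java.nio sketch of that same layout; the constant and method names here are local stand-ins, not the real BLOCK_METADATA_SPACE or HFileBlock API.

import java.nio.ByteBuffer;

public class BlockCacheMetadataSketch {
  static final int METADATA_SPACE = 1 + 8 + 4; // byte + long + int = 13 bytes (assumed)

  static ByteBuffer encode(boolean usesHBaseChecksum, long offset, int nextBlockOnDiskSize) {
    ByteBuffer bb = ByteBuffer.allocate(METADATA_SPACE);
    bb.put(usesHBaseChecksum ? (byte) 1 : (byte) 0); // checksum flag
    bb.putLong(offset);                              // block offset in the hfile
    bb.putInt(nextBlockOnDiskSize);                  // next block's on-disk size, or -1
    bb.flip();                                       // ready for reading, like getMetaData()
    return bb;
  }

  public static void main(String[] args) {
    ByteBuffer md = encode(true, 123456L, 4096);
    System.out.println("usesHBaseChecksum=" + (md.get() == 1)
        + ", offset=" + md.getLong()
        + ", nextBlockOnDiskSize=" + md.getInt());
  }
}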
if (castedComparison.offset != this.offset) { return false; } @@ -1968,7 +1940,7 @@ public class HFileBlock implements Cacheable { } /** - * @return the HFileContext used to create this HFileBlock. Not necessary the + * @return This HFileBlocks fileContext which will a derivative of the * fileContext for the file from which this block's data was originally read. */ HFileContext getHFileContext() { @@ -1992,6 +1964,7 @@ public class HFileBlock implements Cacheable { * This is mostly helpful for debugging. This assumes that the block * has minor version > 0. */ + @VisibleForTesting static String toStringHeader(ByteBuff buf) throws IOException { byte[] magicBuf = new byte[Math.min(buf.limit() - buf.position(), BlockType.MAGIC_LENGTH)]; buf.get(magicBuf); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java index 9f29f9738df..506f08d0f6a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java @@ -60,7 +60,7 @@ import org.apache.hadoop.util.StringUtils; * Examples of how to use the block index writer can be found in * {@link org.apache.hadoop.hbase.io.hfile.CompoundBloomFilterWriter} and * {@link HFileWriterImpl}. Examples of how to use the reader can be - * found in {@link HFileWriterImpl} and + * found in {@link HFileReaderImpl} and * {@link org.apache.hadoop.hbase.io.hfile.TestHFileBlockIndex}. */ @InterfaceAudience.Private diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java index 8f5040ef15a..d71911f340e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java @@ -252,18 +252,20 @@ public class HFileReaderImpl implements HFile.Reader, Configurable { long end = 0; try { end = getTrailer().getLoadOnOpenDataOffset(); - HFileBlock prevBlock = null; if (LOG.isTraceEnabled()) { LOG.trace("File=" + path.toString() + ", offset=" + offset + ", end=" + end); } + // TODO: Could we use block iterator in here? Would that get stuff into the cache? + HFileBlock prevBlock = null; while (offset < end) { if (Thread.interrupted()) { break; } - long onDiskSize = -1; - if (prevBlock != null) { - onDiskSize = prevBlock.getNextBlockOnDiskSizeWithHeader(); - } + // Perhaps we got our block from cache? Unlikely as this may be, if it happens, then + // the internal-to-hfileblock thread local which holds the overread that gets the + // next header, will not have happened...so, pass in the onDiskSize gotten from the + // cached block. This 'optimization' triggers extremely rarely I'd say. + long onDiskSize = prevBlock != null? prevBlock.getNextBlockOnDiskSize(): -1; HFileBlock block = readBlock(offset, onDiskSize, true, false, false, false, null, null); // Need not update the current block. Ideally here the readBlock won't find the @@ -903,9 +905,8 @@ public class HFileReaderImpl implements HFile.Reader, Configurable { // We are reading the next block without block type validation, because // it might turn out to be a non-data block. 
- block = reader.readBlock(block.getOffset() - + block.getOnDiskSizeWithHeader(), - block.getNextBlockOnDiskSizeWithHeader(), cacheBlocks, pread, + block = reader.readBlock(block.getOffset() + block.getOnDiskSizeWithHeader(), + block.getNextBlockOnDiskSize(), cacheBlocks, pread, isCompaction, true, null, getEffectiveDataBlockEncoding()); if (block != null && !block.getBlockType().isData()) { // Findbugs: NP_NULL_ON_SOME_PATH // Whatever block we read we will be returning it unless @@ -1439,8 +1440,8 @@ public class HFileReaderImpl implements HFile.Reader, Configurable { // Cache Miss, please load. } - HFileBlock metaBlock = fsBlockReader.readBlockData(metaBlockOffset, - blockSize, -1, true).unpack(hfileContext, fsBlockReader); + HFileBlock metaBlock = fsBlockReader.readBlockData(metaBlockOffset, blockSize, true). + unpack(hfileContext, fsBlockReader); // Cache the block if (cacheBlock) { @@ -1526,8 +1527,8 @@ public class HFileReaderImpl implements HFile.Reader, Configurable { traceScope.getSpan().addTimelineAnnotation("blockCacheMiss"); } // Load block from filesystem. - HFileBlock hfileBlock = fsBlockReader.readBlockData(dataBlockOffset, onDiskBlockSize, -1, - pread); + HFileBlock hfileBlock = + fsBlockReader.readBlockData(dataBlockOffset, onDiskBlockSize, pread); validateBlockType(hfileBlock, expectedBlockType); HFileBlock unpacked = hfileBlock.unpack(hfileContext, fsBlockReader); BlockType.BlockCategory category = hfileBlock.getBlockType().getCategory(); @@ -1871,6 +1872,7 @@ public class HFileReaderImpl implements HFile.Reader, Configurable { * @return Scanner on this file. */ @Override + @VisibleForTesting public HFileScanner getScanner(boolean cacheBlocks, final boolean pread) { return getScanner(cacheBlocks, pread, false); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileScanner.java index c67bdd4976b..e0f3d7429d2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileScanner.java @@ -99,18 +99,21 @@ public interface HFileScanner extends Shipper, Closeable { * @throws IOException */ boolean seekTo() throws IOException; + /** * Scans to the next entry in the file. * @return Returns false if you are at the end otherwise true if more in file. * @throws IOException */ boolean next() throws IOException; + /** * Gets the current key in the form of a cell. You must call * {@link #seekTo(Cell)} before this method. * @return gets the current key as a Cell. */ Cell getKey(); + /** * Gets a buffer view to the current value. You must call * {@link #seekTo(Cell)} before this method. @@ -119,26 +122,35 @@ public interface HFileScanner extends Shipper, Closeable { * the position is 0, the start of the buffer view. */ ByteBuffer getValue(); + /** * @return Instance of {@link org.apache.hadoop.hbase.Cell}. */ Cell getCell(); + /** * Convenience method to get a copy of the key as a string - interpreting the * bytes as UTF8. You must call {@link #seekTo(Cell)} before this method. * @return key as a string + * @deprecated Since hbase-2.0.0 */ + @Deprecated String getKeyString(); + /** * Convenience method to get a copy of the value as a string - interpreting * the bytes as UTF8. You must call {@link #seekTo(Cell)} before this method. 
* @return value as a string + * @deprecated Since hbase-2.0.0 */ + @Deprecated String getValueString(); + /** * @return Reader that underlies this Scanner instance. */ HFile.Reader getReader(); + /** * @return True is scanner has had one of the seek calls invoked; i.e. * {@link #seekBefore(Cell)} or {@link #seekTo()} or {@link #seekTo(Cell)}. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java index 66aced00b64..69c42c34ad8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java @@ -1317,25 +1317,22 @@ public class BucketCache implements BlockCache, HeapSize { final AtomicLong realCacheSize) throws CacheFullException, IOException, BucketAllocatorException { int len = data.getSerializedLength(); - // This cacheable thing can't be serialized... + // This cacheable thing can't be serialized if (len == 0) return null; long offset = bucketAllocator.allocateBlock(len); BucketEntry bucketEntry = new BucketEntry(offset, len, accessCounter, inMemory); bucketEntry.setDeserialiserReference(data.getDeserializer(), deserialiserMap); try { if (data instanceof HFileBlock) { - HFileBlock block = (HFileBlock) data; - ByteBuff sliceBuf = block.getBufferReadOnlyWithHeader(); - sliceBuf.rewind(); - assert len == sliceBuf.limit() + HFileBlock.EXTRA_SERIALIZATION_SPACE || - len == sliceBuf.limit() + block.headerSize() + HFileBlock.EXTRA_SERIALIZATION_SPACE; - ByteBuffer extraInfoBuffer = ByteBuffer.allocate(HFileBlock.EXTRA_SERIALIZATION_SPACE); - block.serializeExtraInfo(extraInfoBuffer); + // If an instance of HFileBlock, save on some allocations. + HFileBlock block = (HFileBlock)data; + ByteBuff sliceBuf = block.getBufferReadOnly(); + ByteBuffer metadata = block.getMetaData(); if (LOG.isTraceEnabled()) { LOG.trace("Write offset=" + offset + ", len=" + len); } ioEngine.write(sliceBuf, offset); - ioEngine.write(extraInfoBuffer, offset + len - HFileBlock.EXTRA_SERIALIZATION_SPACE); + ioEngine.write(metadata, offset + len - metadata.limit()); } else { ByteBuffer bb = ByteBuffer.allocate(len); data.serialize(bb); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java index e26022e35bf..ed86a83bfab 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java @@ -18,6 +18,7 @@ */ package org.apache.hadoop.hbase.regionserver; +import java.io.Closeable; import java.io.IOException; import org.apache.hadoop.hbase.classification.InterfaceAudience; @@ -32,7 +33,7 @@ import org.apache.hadoop.hbase.client.Scan; // TODO: Change name from KeyValueScanner to CellScanner only we already have a simple CellScanner // so this should be something else altogether, a decoration on our base CellScanner. TODO. // This class shows in CPs so do it all in one swell swoop. HBase-2.0.0. -public interface KeyValueScanner extends Shipper { +public interface KeyValueScanner extends Shipper, Closeable { /** * The byte array represents for NO_NEXT_INDEXED_KEY; * The actual value is irrelevant because this is always compared by reference. 
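To picture the BucketCache#doDrain change above: the allocated slot of getSerializedLength() bytes holds the block buffer first and the small metadata record in its last bytes, so the metadata is written at offset + len - metadata.limit(). A rough standalone sketch with invented sizes; none of these names are the real BucketCache API.

import java.nio.ByteBuffer;

public class BucketSlotLayoutSketch {
  public static void main(String[] args) {
    int blockBytes = 4096;                       // serialized block: header + data + checksums (made up)
    ByteBuffer metadata = ByteBuffer.allocate(13);
    metadata.put((byte) 1).putLong(123456L).putInt(4096); // flag, offset, next block size
    metadata.flip();

    int len = blockBytes + metadata.limit();     // analogous to getSerializedLength()
    long slotOffset = 1 << 20;                   // pretend the allocator returned a slot at 1MB

    long blockWriteOffset = slotOffset;                          // block buffer goes first
    long metadataWriteOffset = slotOffset + len - metadata.limit(); // metadata fills the tail
    System.out.println("slot=[" + slotOffset + ", " + (slotOffset + len) + "): block at "
        + blockWriteOffset + ", metadata at " + metadataWriteOffset);
  }
}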
@@ -74,6 +75,7 @@ public interface KeyValueScanner extends Shipper { * The default implementation for this would be to return 0. A file having * lower sequence id will be considered to be the older one. */ + // TODO: Implement SequenceId Interface instead. long getSequenceID(); /** @@ -137,11 +139,11 @@ public interface KeyValueScanner extends Shipper { * peek KeyValue of scanner has the same row with specified Cell, * otherwise seek the scanner at the first Cell of the row which is the * previous row of specified KeyValue - * + * * @param key seek KeyValue * @return true if the scanner is at the valid KeyValue, false if such * KeyValue does not exist - * + * */ public boolean backwardSeek(Cell key) throws IOException; @@ -156,7 +158,7 @@ public interface KeyValueScanner extends Shipper { /** * Seek the scanner at the first KeyValue of last row - * + * * @return true if scanner has values left, false if the underlying data is * empty * @throws IOException @@ -169,4 +171,4 @@ public interface KeyValueScanner extends Shipper { * see HFileWriterImpl#getMidpoint, or null if not known. */ public Cell getNextIndexedKey(); -} \ No newline at end of file +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java index 61eb9b8d64a..b6164b202ab 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java @@ -1271,7 +1271,7 @@ public class StoreFile { } /** - * Warning: Do not write further code which depends on this call. Instead + * @deprecated Do not write further code which depends on this call. Instead * use getStoreFileScanner() which uses the StoreFileScanner class/interface * which is the preferred way to scan a store with higher level concepts. * @@ -1285,7 +1285,7 @@ public class StoreFile { } /** - * Warning: Do not write further code which depends on this call. Instead + * @deprecated Do not write further code which depends on this call. Instead * use getStoreFileScanner() which uses the StoreFileScanner class/interface * which is the preferred way to scan a store with higher level concepts. 
* diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java index 69671e29db8..040685d544f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java @@ -40,7 +40,6 @@ import org.apache.hadoop.hbase.io.HeapSize; import org.apache.hadoop.hbase.io.compress.Compression; import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache; import org.apache.hadoop.hbase.nio.ByteBuff; -import org.apache.hadoop.hbase.nio.SingleByteBuff; import org.apache.hadoop.hbase.util.ChecksumType; public class CacheTestUtils { @@ -66,6 +65,7 @@ public class CacheTestUtils { /*Post eviction, heapsize should be the same */ assertEquals(heapSize, ((HeapSize) toBeTested).heapSize()); } + public static void testCacheMultiThreaded(final BlockCache toBeTested, final int blockSize, final int numThreads, final int numQueries, final double passingScore) throws Exception { @@ -339,25 +339,16 @@ public class CacheTestUtils { } - private static HFileBlockPair[] generateHFileBlocks(int blockSize, - int numBlocks) { + private static HFileBlockPair[] generateHFileBlocks(int blockSize, int numBlocks) { HFileBlockPair[] returnedBlocks = new HFileBlockPair[numBlocks]; Random rand = new Random(); HashSet usedStrings = new HashSet(); for (int i = 0; i < numBlocks; i++) { - - // The buffer serialized size needs to match the size of BlockSize. So we - // declare our data size to be smaller than it by the serialization space - // required. - - SingleByteBuff cachedBuffer = new SingleByteBuff(ByteBuffer.allocate(blockSize - - HFileBlock.EXTRA_SERIALIZATION_SPACE)); + ByteBuffer cachedBuffer = ByteBuffer.allocate(blockSize); rand.nextBytes(cachedBuffer.array()); cachedBuffer.rewind(); - int onDiskSizeWithoutHeader = blockSize - - HFileBlock.EXTRA_SERIALIZATION_SPACE; - int uncompressedSizeWithoutHeader = blockSize - - HFileBlock.EXTRA_SERIALIZATION_SPACE; + int onDiskSizeWithoutHeader = blockSize; + int uncompressedSizeWithoutHeader = blockSize; long prevBlockOffset = rand.nextLong(); BlockType.DATA.write(cachedBuffer); cachedBuffer.putInt(onDiskSizeWithoutHeader); @@ -376,7 +367,7 @@ public class CacheTestUtils { onDiskSizeWithoutHeader, uncompressedSizeWithoutHeader, prevBlockOffset, cachedBuffer, HFileBlock.DONT_FILL_HEADER, blockSize, - onDiskSizeWithoutHeader + HConstants.HFILEBLOCK_HEADER_SIZE, meta); + onDiskSizeWithoutHeader + HConstants.HFILEBLOCK_HEADER_SIZE, -1, meta); String strKey; /* No conflicting keys */ @@ -395,4 +386,4 @@ public class CacheTestUtils { BlockCacheKey blockName; HFileBlock block; } -} +} \ No newline at end of file diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java index 5158e3525d6..a9d8258bf30 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java @@ -259,7 +259,6 @@ public class TestCacheOnWrite { assertTrue(testDescription, scanner.seekTo()); long offset = 0; - HFileBlock prevBlock = null; EnumMap blockCountByType = new EnumMap(BlockType.class); @@ -267,14 +266,10 @@ public class TestCacheOnWrite { List cachedBlocksOffset = new ArrayList(); Map cachedBlocks = new HashMap(); while (offset < 
reader.getTrailer().getLoadOnOpenDataOffset()) { - long onDiskSize = -1; - if (prevBlock != null) { - onDiskSize = prevBlock.getNextBlockOnDiskSizeWithHeader(); - } // Flags: don't cache the block, use pread, this is not a compaction. // Also, pass null for expected block type to avoid checking it. - HFileBlock block = reader.readBlock(offset, onDiskSize, false, true, - false, true, null, encodingInCache); + HFileBlock block = reader.readBlock(offset, -1, false, true, false, true, null, + encodingInCache); BlockCacheKey blockCacheKey = new BlockCacheKey(reader.getName(), offset); HFileBlock fromCache = (HFileBlock) blockCache.getBlock(blockCacheKey, true, false, true); @@ -307,7 +302,6 @@ public class TestCacheOnWrite { assertEquals( block.getUncompressedSizeWithoutHeader(), fromCache.getUncompressedSizeWithoutHeader()); } - prevBlock = block; offset += block.getOnDiskSizeWithHeader(); BlockType bt = block.getBlockType(); Integer count = blockCountByType.get(bt); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestChecksum.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestChecksum.java index 91ab8c07d7b..d91a811c6d8 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestChecksum.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestChecksum.java @@ -94,7 +94,7 @@ public class TestChecksum { meta = new HFileContextBuilder().withHBaseCheckSum(true).build(); HFileBlock.FSReader hbr = new HFileBlock.FSReaderImpl( is, totalSize, (HFileSystem) fs, path, meta); - HFileBlock b = hbr.readBlockData(0, -1, -1, false); + HFileBlock b = hbr.readBlockData(0, -1, false); assertEquals(b.getChecksumType(), ChecksumType.getDefaultChecksumType().getCode()); } @@ -108,12 +108,14 @@ public class TestChecksum { ChecksumType cktype = itr.next(); Path path = new Path(TEST_UTIL.getDataTestDir(), "checksum" + cktype.getName()); FSDataOutputStream os = fs.create(path); - HFileContext meta = new HFileContextBuilder() - .withChecksumType(cktype).build(); + HFileContext meta = new HFileContextBuilder(). + withChecksumType(cktype). + build(); HFileBlock.Writer hbw = new HFileBlock.Writer(null, meta); DataOutputStream dos = hbw.startWriting(BlockType.DATA); - for (int i = 0; i < 1000; ++i) + for (int i = 0; i < 1000; ++i) { dos.writeInt(i); + } hbw.writeHeaderAndData(os); int totalSize = hbw.getOnDiskSizeWithHeader(); os.close(); @@ -125,7 +127,7 @@ public class TestChecksum { meta = new HFileContextBuilder().withHBaseCheckSum(true).build(); HFileBlock.FSReader hbr = new HFileBlock.FSReaderImpl( is, totalSize, (HFileSystem) fs, path, meta); - HFileBlock b = hbr.readBlockData(0, -1, -1, false); + HFileBlock b = hbr.readBlockData(0, -1, false); ByteBuff data = b.getBufferWithoutHeader(); for (int i = 0; i < 1000; i++) { assertEquals(i, data.getInt()); @@ -188,7 +190,7 @@ public class TestChecksum { .withHBaseCheckSum(true) .build(); HFileBlock.FSReader hbr = new FSReaderImplTest(is, totalSize, fs, path, meta); - HFileBlock b = hbr.readBlockData(0, -1, -1, pread); + HFileBlock b = hbr.readBlockData(0, -1, pread); b.sanityCheck(); assertEquals(4936, b.getUncompressedSizeWithoutHeader()); assertEquals(algo == GZ ? 2173 : 4936, @@ -209,17 +211,17 @@ public class TestChecksum { // requests. Verify that this is correct. 
for (int i = 0; i < HFileBlock.CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD + 1; i++) { - b = hbr.readBlockData(0, -1, -1, pread); + b = hbr.readBlockData(0, -1, pread); assertEquals(0, HFile.getChecksumFailuresCount()); } // The next read should have hbase checksum verification reanabled, // we verify this by assertng that there was a hbase-checksum failure. - b = hbr.readBlockData(0, -1, -1, pread); + b = hbr.readBlockData(0, -1, pread); assertEquals(1, HFile.getChecksumFailuresCount()); // Since the above encountered a checksum failure, we switch // back to not checking hbase checksums. - b = hbr.readBlockData(0, -1, -1, pread); + b = hbr.readBlockData(0, -1, pread); assertEquals(0, HFile.getChecksumFailuresCount()); is.close(); @@ -230,7 +232,7 @@ public class TestChecksum { assertEquals(false, newfs.useHBaseChecksum()); is = new FSDataInputStreamWrapper(newfs, path); hbr = new FSReaderImplTest(is, totalSize, newfs, path, meta); - b = hbr.readBlockData(0, -1, -1, pread); + b = hbr.readBlockData(0, -1, pread); is.close(); b.sanityCheck(); b = b.unpack(meta, hbr); @@ -314,7 +316,7 @@ public class TestChecksum { .build(); HFileBlock.FSReader hbr = new HFileBlock.FSReaderImpl(new FSDataInputStreamWrapper( is, nochecksum), totalSize, hfs, path, meta); - HFileBlock b = hbr.readBlockData(0, -1, -1, pread); + HFileBlock b = hbr.readBlockData(0, -1, pread); is.close(); b.sanityCheck(); assertEquals(dataSize, b.getUncompressedSizeWithoutHeader()); @@ -354,5 +356,4 @@ public class TestChecksum { return false; // checksum validation failure } } -} - +} \ No newline at end of file diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java index 6748efc0947..eb87a0c7f90 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java @@ -320,7 +320,7 @@ public class TestHFileBlock { .withIncludesTags(includesTag) .withCompression(algo).build(); HFileBlock.FSReader hbr = new HFileBlock.FSReaderImpl(is, totalSize, meta); - HFileBlock b = hbr.readBlockData(0, -1, -1, pread); + HFileBlock b = hbr.readBlockData(0, -1, pread); is.close(); assertEquals(0, HFile.getChecksumFailuresCount()); @@ -334,17 +334,15 @@ public class TestHFileBlock { is = fs.open(path); hbr = new HFileBlock.FSReaderImpl(is, totalSize, meta); b = hbr.readBlockData(0, 2173 + HConstants.HFILEBLOCK_HEADER_SIZE + - b.totalChecksumBytes(), -1, pread); + b.totalChecksumBytes(), pread); assertEquals(expected, b); int wrongCompressedSize = 2172; try { b = hbr.readBlockData(0, wrongCompressedSize - + HConstants.HFILEBLOCK_HEADER_SIZE, -1, pread); + + HConstants.HFILEBLOCK_HEADER_SIZE, pread); fail("Exception expected"); } catch (IOException ex) { - String expectedPrefix = "On-disk size without header provided is " - + wrongCompressedSize + ", but block header contains " - + b.getOnDiskSizeWithoutHeader() + "."; + String expectedPrefix = "Passed in onDiskSizeWithHeader="; assertTrue("Invalid exception message: '" + ex.getMessage() + "'.\nMessage is expected to start with: '" + expectedPrefix + "'", ex.getMessage().startsWith(expectedPrefix)); @@ -424,7 +422,7 @@ public class TestHFileBlock { HFileBlock blockFromHFile, blockUnpacked; int pos = 0; for (int blockId = 0; blockId < numBlocks; ++blockId) { - blockFromHFile = hbr.readBlockData(pos, -1, -1, pread); + blockFromHFile = hbr.readBlockData(pos, -1, pread); 
assertEquals(0, HFile.getChecksumFailuresCount()); blockFromHFile.sanityCheck(); pos += blockFromHFile.getOnDiskSizeWithHeader(); @@ -560,7 +558,7 @@ public class TestHFileBlock { if (detailedLogging) { LOG.info("Reading block #" + i + " at offset " + curOffset); } - HFileBlock b = hbr.readBlockData(curOffset, -1, -1, pread); + HFileBlock b = hbr.readBlockData(curOffset, -1, pread); if (detailedLogging) { LOG.info("Block #" + i + ": " + b); } @@ -574,8 +572,7 @@ public class TestHFileBlock { // Now re-load this block knowing the on-disk size. This tests a // different branch in the loader. - HFileBlock b2 = hbr.readBlockData(curOffset, - b.getOnDiskSizeWithHeader(), -1, pread); + HFileBlock b2 = hbr.readBlockData(curOffset, b.getOnDiskSizeWithHeader(), pread); b2.sanityCheck(); assertEquals(b.getBlockType(), b2.getBlockType()); @@ -601,7 +598,7 @@ public class TestHFileBlock { b = b.unpack(meta, hbr); // b's buffer has header + data + checksum while // expectedContents have header + data only - ByteBuff bufRead = b.getBufferWithHeader(); + ByteBuff bufRead = b.getBufferReadOnly(); ByteBuffer bufExpected = expectedContents.get(i); boolean bytesAreCorrect = Bytes.compareTo(bufRead.array(), bufRead.arrayOffset(), @@ -684,7 +681,7 @@ public class TestHFileBlock { HFileBlock b; try { long onDiskSizeArg = withOnDiskSize ? expectedSize : -1; - b = hbr.readBlockData(offset, onDiskSizeArg, -1, pread); + b = hbr.readBlockData(offset, onDiskSizeArg, pread); } catch (IOException ex) { LOG.error("Error in client " + clientId + " trying to read block at " + offset + ", pread=" + pread + ", withOnDiskSize=" + @@ -719,8 +716,7 @@ public class TestHFileBlock { protected void testConcurrentReadingInternals() throws IOException, InterruptedException, ExecutionException { for (Compression.Algorithm compressAlgo : COMPRESSION_ALGORITHMS) { - Path path = - new Path(TEST_UTIL.getDataTestDir(), "concurrent_reading"); + Path path = new Path(TEST_UTIL.getDataTestDir(), "concurrent_reading"); Random rand = defaultRandom(); List offsets = new ArrayList(); List types = new ArrayList(); @@ -843,8 +839,7 @@ public class TestHFileBlock { .withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM) .withChecksumType(ChecksumType.NULL).build(); HFileBlock block = new HFileBlock(BlockType.DATA, size, size, -1, buf, - HFileBlock.FILL_HEADER, -1, - 0, meta); + HFileBlock.FILL_HEADER, -1, 0, -1, meta); long byteBufferExpectedSize = ClassSize.align(ClassSize.estimateBase( new MultiByteBuff(buf).getClass(), true) + HConstants.HFILEBLOCK_HEADER_SIZE + size); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockCompatibility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockCompatibility.java deleted file mode 100644 index 16607b9748c..00000000000 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockCompatibility.java +++ /dev/null @@ -1,750 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.io.hfile; - -import static org.apache.hadoop.hbase.io.compress.Compression.Algorithm.GZ; -import static org.apache.hadoop.hbase.io.compress.Compression.Algorithm.NONE; -import static org.junit.Assert.*; - -import java.io.ByteArrayOutputStream; -import java.io.DataOutputStream; -import java.io.IOException; -import java.io.OutputStream; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.HBaseTestingUtility; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.KeyValueUtil; -import org.apache.hadoop.hbase.fs.HFileSystem; -import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper; -import org.apache.hadoop.hbase.io.compress.Compression; -import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; -import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultEncodingContext; -import org.apache.hadoop.hbase.io.encoding.HFileBlockEncodingContext; -import org.apache.hadoop.hbase.io.hfile.HFileBlock.BlockWritable; -import org.apache.hadoop.hbase.nio.ByteBuff; -import org.apache.hadoop.hbase.nio.SingleByteBuff; -import org.apache.hadoop.hbase.testclassification.IOTests; -import org.apache.hadoop.hbase.testclassification.SmallTests; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.ChecksumType; -import org.apache.hadoop.io.WritableUtils; -import org.apache.hadoop.io.compress.Compressor; -import org.junit.Before; -import org.junit.Test; -import org.junit.experimental.categories.Category; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; -import org.junit.runners.Parameterized.Parameters; - -import com.google.common.base.Preconditions; - -/** - * This class has unit tests to prove that older versions of - * HFiles (without checksums) are compatible with current readers. 
- */ -@Category({IOTests.class, SmallTests.class}) -@RunWith(Parameterized.class) -public class TestHFileBlockCompatibility { - - private static final Log LOG = LogFactory.getLog(TestHFileBlockCompatibility.class); - private static final Compression.Algorithm[] COMPRESSION_ALGORITHMS = { - NONE, GZ }; - - private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); - private HFileSystem fs; - - private final boolean includesMemstoreTS; - private final boolean includesTag; - - public TestHFileBlockCompatibility(boolean includesMemstoreTS, boolean includesTag) { - this.includesMemstoreTS = includesMemstoreTS; - this.includesTag = includesTag; - } - - @Parameters - public static Collection parameters() { - return HBaseTestingUtility.MEMSTORETS_TAGS_PARAMETRIZED; - } - - @Before - public void setUp() throws IOException { - fs = (HFileSystem)HFileSystem.get(TEST_UTIL.getConfiguration()); - } - - public byte[] createTestV1Block(Compression.Algorithm algo) - throws IOException { - Compressor compressor = algo.getCompressor(); - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - OutputStream os = algo.createCompressionStream(baos, compressor, 0); - DataOutputStream dos = new DataOutputStream(os); - BlockType.META.write(dos); // Let's make this a meta block. - TestHFileBlock.writeTestBlockContents(dos); - dos.flush(); - algo.returnCompressor(compressor); - return baos.toByteArray(); - } - - private Writer createTestV2Block(Compression.Algorithm algo) - throws IOException { - final BlockType blockType = BlockType.DATA; - Writer hbw = new Writer(algo, null, - includesMemstoreTS, includesTag); - DataOutputStream dos = hbw.startWriting(blockType); - TestHFileBlock.writeTestBlockContents(dos); - // make sure the block is ready by calling hbw.getHeaderAndData() - hbw.getHeaderAndData(); - assertEquals(1000 * 4, hbw.getUncompressedSizeWithoutHeader()); - hbw.releaseCompressor(); - return hbw; - } - - private String createTestBlockStr(Compression.Algorithm algo, - int correctLength) throws IOException { - Writer hbw = createTestV2Block(algo); - byte[] testV2Block = hbw.getHeaderAndData(); - int osOffset = HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM + 9; - if (testV2Block.length == correctLength) { - // Force-set the "OS" field of the gzip header to 3 (Unix) to avoid - // variations across operating systems. - // See http://www.gzip.org/zlib/rfc-gzip.html for gzip format. - testV2Block[osOffset] = 3; - } - return Bytes.toStringBinary(testV2Block); - } - - @Test - public void testNoCompression() throws IOException { - assertEquals(4000, createTestV2Block(NONE).getBlockForCaching(). - getUncompressedSizeWithoutHeader()); - } - - @Test - public void testGzipCompression() throws IOException { - final String correctTestBlockStr = - "DATABLK*\\x00\\x00\\x00:\\x00\\x00\\x0F\\xA0\\xFF\\xFF\\xFF\\xFF" - + "\\xFF\\xFF\\xFF\\xFF" - // gzip-compressed block: http://www.gzip.org/zlib/rfc-gzip.html - + "\\x1F\\x8B" // gzip magic signature - + "\\x08" // Compression method: 8 = "deflate" - + "\\x00" // Flags - + "\\x00\\x00\\x00\\x00" // mtime - + "\\x00" // XFL (extra flags) - // OS (0 = FAT filesystems, 3 = Unix). However, this field - // sometimes gets set to 0 on Linux and Mac, so we reset it to 3. 
- + "\\x03" - + "\\xED\\xC3\\xC1\\x11\\x00 \\x08\\xC00DD\\xDD\\x7Fa" - + "\\xD6\\xE8\\xA3\\xB9K\\x84`\\x96Q\\xD3\\xA8\\xDB\\xA8e\\xD4c" - + "\\xD46\\xEA5\\xEA3\\xEA7\\xE7\\x00LI\\x5Cs\\xA0\\x0F\\x00\\x00"; - final int correctGzipBlockLength = 82; - - String returnedStr = createTestBlockStr(GZ, correctGzipBlockLength); - assertEquals(correctTestBlockStr, returnedStr); - } - - @Test - public void testReaderV2() throws IOException { - if(includesTag) { - TEST_UTIL.getConfiguration().setInt("hfile.format.version", 3); - } - for (Compression.Algorithm algo : COMPRESSION_ALGORITHMS) { - for (boolean pread : new boolean[] { false, true }) { - LOG.info("testReaderV2: Compression algorithm: " + algo + - ", pread=" + pread); - Path path = new Path(TEST_UTIL.getDataTestDir(), "blocks_v2_" - + algo); - FSDataOutputStream os = fs.create(path); - Writer hbw = new Writer(algo, null, - includesMemstoreTS, includesTag); - long totalSize = 0; - for (int blockId = 0; blockId < 2; ++blockId) { - DataOutputStream dos = hbw.startWriting(BlockType.DATA); - for (int i = 0; i < 1234; ++i) - dos.writeInt(i); - hbw.writeHeaderAndData(os); - totalSize += hbw.getOnDiskSizeWithHeader(); - } - os.close(); - - FSDataInputStream is = fs.open(path); - HFileContext meta = new HFileContextBuilder() - .withHBaseCheckSum(false) - .withIncludesMvcc(includesMemstoreTS) - .withIncludesTags(includesTag) - .withCompression(algo) - .build(); - HFileBlock.FSReader hbr = - new HFileBlock.FSReaderImpl(new FSDataInputStreamWrapper(is), totalSize, fs, path, meta); - HFileBlock b = hbr.readBlockData(0, -1, -1, pread); - is.close(); - - b.sanityCheck(); - assertEquals(4936, b.getUncompressedSizeWithoutHeader()); - assertEquals(algo == GZ ? 2173 : 4936, - b.getOnDiskSizeWithoutHeader() - b.totalChecksumBytes()); - HFileBlock expected = b; - - if (algo == GZ) { - is = fs.open(path); - hbr = new HFileBlock.FSReaderImpl(new FSDataInputStreamWrapper(is), totalSize, fs, path, - meta); - b = hbr.readBlockData(0, 2173 + HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM + - b.totalChecksumBytes(), -1, pread); - assertEquals(expected, b); - int wrongCompressedSize = 2172; - try { - b = hbr.readBlockData(0, wrongCompressedSize - + HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM, -1, pread); - fail("Exception expected"); - } catch (IOException ex) { - String expectedPrefix = "On-disk size without header provided is " - + wrongCompressedSize + ", but block header contains " - + b.getOnDiskSizeWithoutHeader() + "."; - assertTrue("Invalid exception message: '" + ex.getMessage() - + "'.\nMessage is expected to start with: '" + expectedPrefix - + "'", ex.getMessage().startsWith(expectedPrefix)); - } - is.close(); - } - } - } - } - - /** - * Test encoding/decoding data blocks. - * @throws IOException a bug or a problem with temporary files. - */ - @Test - public void testDataBlockEncoding() throws IOException { - if(includesTag) { - TEST_UTIL.getConfiguration().setInt("hfile.format.version", 3); - } - final int numBlocks = 5; - for (Compression.Algorithm algo : COMPRESSION_ALGORITHMS) { - for (boolean pread : new boolean[] { false, true }) { - for (DataBlockEncoding encoding : DataBlockEncoding.values()) { - LOG.info("testDataBlockEncoding algo " + algo + - " pread = " + pread + - " encoding " + encoding); - Path path = new Path(TEST_UTIL.getDataTestDir(), "blocks_v2_" - + algo + "_" + encoding.toString()); - FSDataOutputStream os = fs.create(path); - HFileDataBlockEncoder dataBlockEncoder = (encoding != DataBlockEncoding.NONE) ? 
- new HFileDataBlockEncoderImpl(encoding) : NoOpDataBlockEncoder.INSTANCE; - TestHFileBlockCompatibility.Writer hbw = - new TestHFileBlockCompatibility.Writer(algo, - dataBlockEncoder, includesMemstoreTS, includesTag); - long totalSize = 0; - final List encodedSizes = new ArrayList(); - final List encodedBlocks = new ArrayList(); - for (int blockId = 0; blockId < numBlocks; ++blockId) { - hbw.startWriting(BlockType.DATA); - TestHFileBlock.writeTestKeyValues(hbw, blockId, pread, includesTag); - hbw.writeHeaderAndData(os); - int headerLen = HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM; - byte[] encodedResultWithHeader = hbw.getUncompressedDataWithHeader(); - final int encodedSize = encodedResultWithHeader.length - headerLen; - if (encoding != DataBlockEncoding.NONE) { - // We need to account for the two-byte encoding algorithm ID that - // comes after the 24-byte block header but before encoded KVs. - headerLen += DataBlockEncoding.ID_SIZE; - } - byte[] encodedDataSection = - new byte[encodedResultWithHeader.length - headerLen]; - System.arraycopy(encodedResultWithHeader, headerLen, - encodedDataSection, 0, encodedDataSection.length); - final ByteBuffer encodedBuf = - ByteBuffer.wrap(encodedDataSection); - encodedSizes.add(encodedSize); - encodedBlocks.add(encodedBuf); - totalSize += hbw.getOnDiskSizeWithHeader(); - } - os.close(); - - FSDataInputStream is = fs.open(path); - HFileContext meta = new HFileContextBuilder() - .withHBaseCheckSum(false) - .withIncludesMvcc(includesMemstoreTS) - .withIncludesTags(includesTag) - .withCompression(algo) - .build(); - HFileBlock.FSReaderImpl hbr = new HFileBlock.FSReaderImpl(new FSDataInputStreamWrapper(is), - totalSize, fs, path, meta); - hbr.setDataBlockEncoder(dataBlockEncoder); - hbr.setIncludesMemstoreTS(includesMemstoreTS); - - HFileBlock b; - int pos = 0; - for (int blockId = 0; blockId < numBlocks; ++blockId) { - b = hbr.readBlockData(pos, -1, -1, pread); - b.sanityCheck(); - if (meta.isCompressedOrEncrypted()) { - assertFalse(b.isUnpacked()); - b = b.unpack(meta, hbr); - } - pos += b.getOnDiskSizeWithHeader(); - - assertEquals((int) encodedSizes.get(blockId), - b.getUncompressedSizeWithoutHeader()); - ByteBuff actualBuffer = b.getBufferWithoutHeader(); - if (encoding != DataBlockEncoding.NONE) { - // We expect a two-byte big-endian encoding id. - assertEquals(0, actualBuffer.get(0)); - assertEquals(encoding.getId(), actualBuffer.get(1)); - actualBuffer.position(2); - actualBuffer = actualBuffer.slice(); - } - - ByteBuffer expectedBuffer = encodedBlocks.get(blockId); - expectedBuffer.rewind(); - - // test if content matches, produce nice message - TestHFileBlock.assertBuffersEqual(new SingleByteBuff(expectedBuffer), actualBuffer, - algo, encoding, pread); - } - is.close(); - } - } - } - } - /** - * This is the version of the HFileBlock.Writer that is used to - * create V2 blocks with minor version 0. These blocks do not - * have hbase-level checksums. The code is here to test - * backward compatibility. The reason we do not inherit from - * HFileBlock.Writer is because we never ever want to change the code - * in this class but the code in HFileBlock.Writer will continually - * evolve. - */ - public static final class Writer extends HFileBlock.Writer { - - // These constants are as they were in minorVersion 0. 
- private static final int HEADER_SIZE = HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM; - private static final boolean DONT_FILL_HEADER = HFileBlock.DONT_FILL_HEADER; - private static final byte[] DUMMY_HEADER = HFileBlock.DUMMY_HEADER_NO_CHECKSUM; - - private enum State { - INIT, - WRITING, - BLOCK_READY - }; - - /** Writer state. Used to ensure the correct usage protocol. */ - private State state = State.INIT; - - /** Compression algorithm for all blocks this instance writes. */ - private final Compression.Algorithm compressAlgo; - - /** Data block encoder used for data blocks */ - private final HFileDataBlockEncoder dataBlockEncoder; - - private HFileBlockEncodingContext dataBlockEncodingCtx; - /** block encoding context for non-data blocks */ - private HFileBlockDefaultEncodingContext defaultBlockEncodingCtx; - - /** - * The stream we use to accumulate data in uncompressed format for each - * block. We reset this stream at the end of each block and reuse it. The - * header is written as the first {@link #HEADER_SIZE} bytes into this - * stream. - */ - private ByteArrayOutputStream baosInMemory; - - /** Compressor, which is also reused between consecutive blocks. */ - private Compressor compressor; - - /** - * Current block type. Set in {@link #startWriting(BlockType)}. Could be - * changed in {@link #encodeDataBlockForDisk()} from {@link BlockType#DATA} - * to {@link BlockType#ENCODED_DATA}. - */ - private BlockType blockType; - - /** - * A stream that we write uncompressed bytes to, which compresses them and - * writes them to {@link #baosInMemory}. - */ - private DataOutputStream userDataStream; - - /** - * Bytes to be written to the file system, including the header. Compressed - * if compression is turned on. - */ - private byte[] onDiskBytesWithHeader; - - /** - * Valid in the READY state. Contains the header and the uncompressed (but - * potentially encoded, if this is a data block) bytes, so the length is - * {@link #uncompressedSizeWithoutHeader} + {@link org.apache.hadoop.hbase.HConstants#HFILEBLOCK_HEADER_SIZE}. - */ - private byte[] uncompressedBytesWithHeader; - - /** - * Current block's start offset in the {@link HFile}. Set in - * {@link #writeHeaderAndData(FSDataOutputStream)}. - */ - private long startOffset; - - /** - * Offset of previous block by block type. Updated when the next block is - * started. - */ - private long[] prevOffsetByType; - - /** The offset of the previous block of the same type */ - private long prevOffset; - - private int unencodedDataSizeWritten; - - public Writer(Compression.Algorithm compressionAlgorithm, - HFileDataBlockEncoder dataBlockEncoder, boolean includesMemstoreTS, boolean includesTag) { - this(dataBlockEncoder, new HFileContextBuilder().withHBaseCheckSum(false) - .withIncludesMvcc(includesMemstoreTS).withIncludesTags(includesTag) - .withCompression(compressionAlgorithm).build()); - } - - public Writer(HFileDataBlockEncoder dataBlockEncoder, HFileContext meta) { - super(dataBlockEncoder, meta); - compressAlgo = meta.getCompression() == null ? NONE : meta.getCompression(); - this.dataBlockEncoder = dataBlockEncoder != null ? 
dataBlockEncoder - : NoOpDataBlockEncoder.INSTANCE; - defaultBlockEncodingCtx = new HFileBlockDefaultEncodingContext(null, DUMMY_HEADER, meta); - dataBlockEncodingCtx = this.dataBlockEncoder.newDataBlockEncodingContext(DUMMY_HEADER, meta); - baosInMemory = new ByteArrayOutputStream(); - - prevOffsetByType = new long[BlockType.values().length]; - for (int i = 0; i < prevOffsetByType.length; ++i) - prevOffsetByType[i] = -1; - } - - /** - * Starts writing into the block. The previous block's data is discarded. - * - * @return the stream the user can write their data into - * @throws IOException - */ - public DataOutputStream startWriting(BlockType newBlockType) - throws IOException { - if (state == State.BLOCK_READY && startOffset != -1) { - // We had a previous block that was written to a stream at a specific - // offset. Save that offset as the last offset of a block of that type. - prevOffsetByType[blockType.getId()] = startOffset; - } - - startOffset = -1; - blockType = newBlockType; - - baosInMemory.reset(); - baosInMemory.write(DUMMY_HEADER); - - state = State.WRITING; - - // We will compress it later in finishBlock() - userDataStream = new DataOutputStream(baosInMemory); - if (newBlockType == BlockType.DATA) { - this.dataBlockEncoder.startBlockEncoding(dataBlockEncodingCtx, userDataStream); - } - this.unencodedDataSizeWritten = 0; - return userDataStream; - } - - @Override - public void write(Cell c) throws IOException { - KeyValue kv = KeyValueUtil.ensureKeyValue(c); - expectState(State.WRITING); - this.dataBlockEncoder.encode(kv, dataBlockEncodingCtx, this.userDataStream); - this.unencodedDataSizeWritten += kv.getLength(); - if (dataBlockEncodingCtx.getHFileContext().isIncludesMvcc()) { - this.unencodedDataSizeWritten += WritableUtils.getVIntSize(kv.getSequenceId()); - } - } - - /** - * Returns the stream for the user to write to. The block writer takes care - * of handling compression and buffering for caching on write. Can only be - * called in the "writing" state. - * - * @return the data output stream for the user to write to - */ - DataOutputStream getUserDataStream() { - expectState(State.WRITING); - return userDataStream; - } - - /** - * Transitions the block writer from the "writing" state to the "block - * ready" state. Does nothing if a block is already finished. - */ - void ensureBlockReady() throws IOException { - Preconditions.checkState(state != State.INIT, - "Unexpected state: " + state); - - if (state == State.BLOCK_READY) - return; - - // This will set state to BLOCK_READY. - finishBlock(); - } - - /** - * An internal method that flushes the compressing stream (if using - * compression), serializes the header, and takes care of the separate - * uncompressed stream for caching on write, if applicable. Sets block - * write state to "block ready". - */ - void finishBlock() throws IOException { - if (blockType == BlockType.DATA) { - this.dataBlockEncoder.endBlockEncoding(dataBlockEncodingCtx, userDataStream, - baosInMemory.toByteArray(), blockType); - blockType = dataBlockEncodingCtx.getBlockType(); - } - userDataStream.flush(); - // This does an array copy, so it is safe to cache this byte array. - uncompressedBytesWithHeader = baosInMemory.toByteArray(); - prevOffset = prevOffsetByType[blockType.getId()]; - - // We need to set state before we can package the block up for - // cache-on-write. In a way, the block is ready, but not yet encoded or - // compressed. 
- state = State.BLOCK_READY; - if (blockType == BlockType.DATA || blockType == BlockType.ENCODED_DATA) { - onDiskBytesWithHeader = dataBlockEncodingCtx - .compressAndEncrypt(uncompressedBytesWithHeader); - } else { - onDiskBytesWithHeader = defaultBlockEncodingCtx - .compressAndEncrypt(uncompressedBytesWithHeader); - } - - // put the header for on disk bytes - putHeader(onDiskBytesWithHeader, 0, - onDiskBytesWithHeader.length, - uncompressedBytesWithHeader.length); - //set the header for the uncompressed bytes (for cache-on-write) - putHeader(uncompressedBytesWithHeader, 0, - onDiskBytesWithHeader.length, - uncompressedBytesWithHeader.length); - } - - /** - * Put the header into the given byte array at the given offset. - * @param onDiskSize size of the block on disk - * @param uncompressedSize size of the block after decompression (but - * before optional data block decoding) - */ - private void putHeader(byte[] dest, int offset, int onDiskSize, - int uncompressedSize) { - offset = blockType.put(dest, offset); - offset = Bytes.putInt(dest, offset, onDiskSize - HEADER_SIZE); - offset = Bytes.putInt(dest, offset, uncompressedSize - HEADER_SIZE); - Bytes.putLong(dest, offset, prevOffset); - } - - /** - * Similar to {@link #writeHeaderAndData(FSDataOutputStream)}, but records - * the offset of this block so that it can be referenced in the next block - * of the same type. - * - * @param out - * @throws IOException - */ - public void writeHeaderAndData(FSDataOutputStream out) throws IOException { - long offset = out.getPos(); - if (startOffset != -1 && offset != startOffset) { - throw new IOException("A " + blockType + " block written to a " - + "stream twice, first at offset " + startOffset + ", then at " - + offset); - } - startOffset = offset; - - writeHeaderAndData((DataOutputStream) out); - } - - /** - * Writes the header and the compressed data of this block (or uncompressed - * data when not using compression) into the given stream. Can be called in - * the "writing" state or in the "block ready" state. If called in the - * "writing" state, transitions the writer to the "block ready" state. - * - * @param out the output stream to write the - * @throws IOException - */ - private void writeHeaderAndData(DataOutputStream out) throws IOException { - ensureBlockReady(); - out.write(onDiskBytesWithHeader); - } - - /** - * Returns the header or the compressed data (or uncompressed data when not - * using compression) as a byte array. Can be called in the "writing" state - * or in the "block ready" state. If called in the "writing" state, - * transitions the writer to the "block ready" state. - * - * @return header and data as they would be stored on disk in a byte array - * @throws IOException - */ - public byte[] getHeaderAndData() throws IOException { - ensureBlockReady(); - return onDiskBytesWithHeader; - } - - /** - * Releases the compressor this writer uses to compress blocks into the - * compressor pool. Needs to be called before the writer is discarded. - */ - public void releaseCompressor() { - if (compressor != null) { - compressAlgo.returnCompressor(compressor); - compressor = null; - } - } - - /** - * Returns the on-disk size of the data portion of the block. This is the - * compressed size if compression is enabled. Can only be called in the - * "block ready" state. Header is not compressed, and its size is not - * included in the return value. - * - * @return the on-disk size of the block, not including the header. 
- */ - public int getOnDiskSizeWithoutHeader() { - expectState(State.BLOCK_READY); - return onDiskBytesWithHeader.length - HEADER_SIZE; - } - - /** - * Returns the on-disk size of the block. Can only be called in the - * "block ready" state. - * - * @return the on-disk size of the block ready to be written, including the - * header size - */ - public int getOnDiskSizeWithHeader() { - expectState(State.BLOCK_READY); - return onDiskBytesWithHeader.length; - } - - /** - * The uncompressed size of the block data. Does not include header size. - */ - public int getUncompressedSizeWithoutHeader() { - expectState(State.BLOCK_READY); - return uncompressedBytesWithHeader.length - HEADER_SIZE; - } - - /** - * The uncompressed size of the block data, including header size. - */ - public int getUncompressedSizeWithHeader() { - expectState(State.BLOCK_READY); - return uncompressedBytesWithHeader.length; - } - - /** @return true if a block is being written */ - public boolean isWriting() { - return state == State.WRITING; - } - - /** - * Returns the number of bytes written into the current block so far, or - * zero if not writing the block at the moment. Note that this will return - * zero in the "block ready" state as well. - * - * @return the number of bytes written - */ - public int blockSizeWritten() { - if (state != State.WRITING) - return 0; - return this.unencodedDataSizeWritten; - } - - /** - * Returns the header followed by the uncompressed data, even if using - * compression. This is needed for storing uncompressed blocks in the block - * cache. Can be called in the "writing" state or the "block ready" state. - * - * @return uncompressed block bytes for caching on write - */ - private byte[] getUncompressedDataWithHeader() { - expectState(State.BLOCK_READY); - - return uncompressedBytesWithHeader; - } - - private void expectState(State expectedState) { - if (state != expectedState) { - throw new IllegalStateException("Expected state: " + expectedState + - ", actual state: " + state); - } - } - - /** - * Similar to {@link #getUncompressedBufferWithHeader()} but returns a byte - * buffer. - * - * @return uncompressed block for caching on write in the form of a buffer - */ - public ByteBuffer getUncompressedBufferWithHeader() { - byte[] b = getUncompressedDataWithHeader(); - return ByteBuffer.wrap(b, 0, b.length); - } - - /** - * Takes the given {@link BlockWritable} instance, creates a new block of - * its appropriate type, writes the writable into this block, and flushes - * the block into the output stream. The writer is instructed not to buffer - * uncompressed bytes for cache-on-write. - * - * @param bw the block-writable object to write as a block - * @param out the file system output stream - * @throws IOException - */ - public void writeBlock(BlockWritable bw, FSDataOutputStream out) - throws IOException { - bw.writeToBlock(startWriting(bw.getBlockType())); - writeHeaderAndData(out); - } - - /** - * Creates a new HFileBlock. 
-     */
-    public HFileBlock getBlockForCaching() {
-      HFileContext meta = new HFileContextBuilder()
-          .withHBaseCheckSum(false)
-          .withChecksumType(ChecksumType.NULL)
-          .withBytesPerCheckSum(0)
-          .build();
-      return new HFileBlock(blockType, getOnDiskSizeWithoutHeader(),
-          getUncompressedSizeWithoutHeader(), prevOffset,
-          getUncompressedBufferWithHeader(), DONT_FILL_HEADER, startOffset,
-          getOnDiskSizeWithoutHeader(), meta);
-    }
-  }
-
-}
-
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockIndex.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockIndex.java
index 687d3cd2c72..470d48358e1 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockIndex.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockIndex.java
@@ -185,8 +185,7 @@ public class TestHFileBlockIndex {
       }
 
       missCount += 1;
-      prevBlock = realReader.readBlockData(offset, onDiskSize,
-          -1, pread);
+      prevBlock = realReader.readBlockData(offset, onDiskSize, pread);
       prevOffset = offset;
       prevOnDiskSize = onDiskSize;
       prevPread = pread;
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileDataBlockEncoder.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileDataBlockEncoder.java
index 6f434bb1cac..387514e980e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileDataBlockEncoder.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileDataBlockEncoder.java
@@ -92,8 +92,7 @@ public class TestHFileDataBlockEncoder {
 
     if (blockEncoder.getDataBlockEncoding() == DataBlockEncoding.NONE) {
-      assertEquals(block.getBufferWithHeader(),
-          returnedBlock.getBufferWithHeader());
+      assertEquals(block.getBufferReadOnly(), returnedBlock.getBufferReadOnly());
     } else {
       if (BlockType.ENCODED_DATA != returnedBlock.getBlockType()) {
         System.out.println(blockEncoder);
@@ -127,7 +126,7 @@ public class TestHFileDataBlockEncoder {
         .build();
     HFileBlock block = new HFileBlock(BlockType.DATA, size, size, -1, buf,
         HFileBlock.FILL_HEADER, 0,
-        0, hfileContext);
+        0, -1, hfileContext);
     HFileBlock cacheBlock = createBlockOnDisk(kvs, block, useTags);
     assertEquals(headerSize, cacheBlock.getDummyHeaderForVersion().length);
   }
@@ -198,7 +197,7 @@ public class TestHFileDataBlockEncoder {
         .build();
     HFileBlock b = new HFileBlock(BlockType.DATA, size, size, -1, buf,
         HFileBlock.FILL_HEADER, 0,
-        0, meta);
+        0, -1, meta);
     return b;
   }
 
@@ -220,7 +219,8 @@ public class TestHFileDataBlockEncoder {
     byte[] encodedBytes = baos.toByteArray();
     size = encodedBytes.length - block.getDummyHeaderForVersion().length;
     return new HFileBlock(context.getBlockType(), size, size, -1, ByteBuffer.wrap(encodedBytes),
-        HFileBlock.FILL_HEADER, 0, block.getOnDiskDataSizeWithHeader(), block.getHFileContext());
+        HFileBlock.FILL_HEADER, 0, block.getOnDiskDataSizeWithHeader(), -1,
+        block.getHFileContext());
   }
 
   private void writeBlock(List kvs, HFileContext fileContext, boolean useTags)
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileEncryption.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileEncryption.java
index ba3a344aaa8..3264558db15 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileEncryption.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileEncryption.java
@@ -99,7 +99,7 @@ public class TestHFileEncryption {
   private long readAndVerifyBlock(long pos, HFileContext ctx, HFileBlock.FSReaderImpl hbr, int size)
       throws IOException {
-    HFileBlock b = hbr.readBlockData(pos, -1, -1, false);
+    HFileBlock b = hbr.readBlockData(pos, -1, false);
     assertEquals(0, HFile.getChecksumFailuresCount());
     b.sanityCheck();
     assertFalse(b.isUnpacked());
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV3.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV3.java
index c7eb11bd46e..983ec2f2ffa 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV3.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV3.java
@@ -218,7 +218,7 @@ public class TestHFileWriterV3 {
     fsdis.seek(0);
     long curBlockPos = 0;
     while (curBlockPos <= trailer.getLastDataBlockOffset()) {
-      HFileBlock block = blockReader.readBlockData(curBlockPos, -1, -1, false)
+      HFileBlock block = blockReader.readBlockData(curBlockPos, -1, false)
         .unpack(context, blockReader);
       assertEquals(BlockType.DATA, block.getBlockType());
       ByteBuff buf = block.getBufferWithoutHeader();
@@ -279,13 +279,14 @@ public class TestHFileWriterV3 {
     while (fsdis.getPos() < trailer.getLoadOnOpenDataOffset()) {
       LOG.info("Current offset: " + fsdis.getPos() + ", scanning until " +
           trailer.getLoadOnOpenDataOffset());
-      HFileBlock block = blockReader.readBlockData(curBlockPos, -1, -1, false)
+      HFileBlock block = blockReader.readBlockData(curBlockPos, -1, false)
         .unpack(context, blockReader);
       assertEquals(BlockType.META, block.getBlockType());
       Text t = new Text();
       ByteBuff buf = block.getBufferWithoutHeader();
       if (Writables.getWritable(buf.array(), buf.arrayOffset(), buf.limit(), t) == null) {
-        throw new IOException("Failed to deserialize block " + this + " into a " + t.getClass().getSimpleName());
+        throw new IOException("Failed to deserialize block " + this +
+            " into a " + t.getClass().getSimpleName());
       }
       Text expectedText = (metaCounter == 0 ? new Text("Paris") : metaCounter == 1 ?
          new Text(
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetch.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetch.java
index 69a77bfab60..d20ba2b6c37 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetch.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetch.java
@@ -78,14 +78,8 @@ public class TestPrefetch {
     // Check that all of the data blocks were preloaded
     BlockCache blockCache = cacheConf.getBlockCache();
     long offset = 0;
-    HFileBlock prevBlock = null;
     while (offset < reader.getTrailer().getLoadOnOpenDataOffset()) {
-      long onDiskSize = -1;
-      if (prevBlock != null) {
-        onDiskSize = prevBlock.getNextBlockOnDiskSizeWithHeader();
-      }
-      HFileBlock block = reader.readBlock(offset, onDiskSize, false, true, false, true, null,
-          null);
+      HFileBlock block = reader.readBlock(offset, -1, false, true, false, true, null, null);
       BlockCacheKey blockCacheKey = new BlockCacheKey(reader.getName(), offset);
       boolean isCached = blockCache.getBlock(blockCacheKey, true, false, true) != null;
       if (block.getBlockType() == BlockType.DATA ||
@@ -93,7 +87,6 @@ public class TestPrefetch {
           block.getBlockType() == BlockType.INTERMEDIATE_INDEX) {
         assertTrue(isCached);
       }
-      prevBlock = block;
       offset += block.getOnDiskSizeWithHeader();
     }
   }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCacheOnWriteInSchema.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCacheOnWriteInSchema.java
index 0916fe6ee9a..2357befd23c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCacheOnWriteInSchema.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCacheOnWriteInSchema.java
@@ -227,15 +227,10 @@ public class TestCacheOnWriteInSchema {
       assertTrue(testDescription, scanner.seekTo());
       // Cribbed from io.hfile.TestCacheOnWrite
       long offset = 0;
-      HFileBlock prevBlock = null;
      while (offset < reader.getTrailer().getLoadOnOpenDataOffset()) {
-        long onDiskSize = -1;
-        if (prevBlock != null) {
-          onDiskSize = prevBlock.getNextBlockOnDiskSizeWithHeader();
-        }
        // Flags: don't cache the block, use pread, this is not a compaction.
        // Also, pass null for expected block type to avoid checking it.
-        HFileBlock block = reader.readBlock(offset, onDiskSize, false, true,
+        HFileBlock block = reader.readBlock(offset, -1, false, true,
          false, true, null, DataBlockEncoding.NONE);
        BlockCacheKey blockCacheKey = new BlockCacheKey(reader.getName(),
            offset);
@@ -249,7 +244,6 @@ public class TestCacheOnWriteInSchema {
            "block: " + block + "\n" +
            "blockCacheKey: " + blockCacheKey);
        }
-        prevBlock = block;
        offset += block.getOnDiskSizeWithHeader();
      }
    } finally {