From 8ace5bbfcea01e02c5661f75fe9458e04fa3b60f Mon Sep 17 00:00:00 2001 From: stack Date: Mon, 29 Feb 2016 22:56:47 -0800 Subject: [PATCH] HBASE-15366 Add doc, trace-level logging, and test around hfileblock M hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileContext.java Make it emit its toString in format that matches the way we log elsewhere M hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java Capitalize statics. M hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java Verify and cleanup documentation of hfileblock format at head of class. Explain what 'EXTRA_SERIALIZATION_SPACE' is all about. Connect how we serialize and deserialize... done in different places and one way when pulling from HDFS and another when pulling from cache (TO BE FIXED). Shut down a load of public access. M hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java Add trace-level logging M hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileScanner.java Make it Closeable --- .../org/apache/hadoop/hbase/HConstants.java | 4 +- .../io/encoding/BufferedDataBlockEncoder.java | 10 +- .../hbase/io/encoding/DataBlockEncoder.java | 3 +- .../hadoop/hbase/io/hfile/HFileContext.java | 26 +- .../hadoop/hbase/io/hfile/CacheConfig.java | 10 +- .../apache/hadoop/hbase/io/hfile/HFile.java | 13 +- .../hadoop/hbase/io/hfile/HFileBlock.java | 366 ++++++++++-------- .../hbase/io/hfile/HFileReaderImpl.java | 7 +- .../hadoop/hbase/io/hfile/HFileScanner.java | 5 +- .../hbase/io/hfile/HFileWriterImpl.java | 81 ++-- .../io/hfile/bucket/BucketAllocator.java | 17 +- .../hbase/io/hfile/bucket/BucketCache.java | 35 +- .../io/encoding/TestDataBlockEncoders.java | 15 +- .../encoding/TestSeekToBlockWithEncoders.java | 4 +- .../hadoop/hbase/io/hfile/TestChecksum.java | 4 +- .../hfile/TestForceCacheImportantBlocks.java | 6 +- .../hfile/TestHFileBackedByBucketCache.java | 231 +++++++++++ .../hadoop/hbase/io/hfile/TestHFileBlock.java | 2 +- .../io/hfile/TestHFileBlockCompatibility.java | 3 +- .../hbase/regionserver/TestBlocksRead.java | 2 +- 20 files changed, 581 insertions(+), 263 deletions(-) create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBackedByBucketCache.java diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java index a02d89a4bad..0c6244f720f 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -65,7 +65,9 @@ public final class HConstants { public static final byte[] RPC_HEADER = new byte[] { 'H', 'B', 'a', 's' }; public static final byte RPC_CURRENT_VERSION = 0; - // HFileBlock constants. + // HFileBlock constants. TODO!!!! THESE DEFINES BELONG IN HFILEBLOCK, NOT UP HERE. + // Needed down in hbase-common though by encoders but these encoders should not be dealing + // in the internals of hfileblocks. Fix encapsulation. 
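+  // For quick reference, a rough breakdown of the two header sizes defined below (assuming
+  // MAGIC_LENGTH is 8, the length of the block-type magic; field names follow the header-field
+  // javadocs in HFileBlock):
+  //   no-checksum header = 8 (magic) + 4 (onDiskSizeWithoutHeader)
+  //                        + 4 (uncompressedSizeWithoutHeader) + 8 (prevBlockOffset) = 24 bytes
+  //   checksum header    = 24 + 1 (checksumType) + 4 (bytesPerChecksum)
+  //                        + 4 (onDiskDataSizeWithHeader) = 33 bytes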
/** The size data structures with minor version is 0 */ public static final int HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM = MAGIC_LENGTH + 2 * Bytes.SIZEOF_INT diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java index 817b1a79476..d873f7e7770 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java @@ -23,12 +23,12 @@ import java.io.OutputStream; import java.nio.ByteBuffer; import org.apache.hadoop.hbase.ByteBufferedCell; +import org.apache.hadoop.hbase.ByteBufferedKeyOnlyKeyValue; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.ByteBufferedKeyOnlyKeyValue; import org.apache.hadoop.hbase.KeyValue.Type; import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.SettableSequenceId; @@ -52,7 +52,9 @@ import org.apache.hadoop.io.WritableUtils; */ @InterfaceAudience.Private abstract class BufferedDataBlockEncoder implements DataBlockEncoder { - + /** + * TODO: This datablockencoder is dealing in internals of hfileblocks. Purge reference to HFBs + */ private static int INITIAL_KEY_BUFFER_SIZE = 512; @Override @@ -1140,8 +1142,8 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder { BufferedDataBlockEncodingState state = (BufferedDataBlockEncodingState) encodingCtx .getEncodingState(); // Write the unencodedDataSizeWritten (with header size) - Bytes.putInt(uncompressedBytesWithHeader, HConstants.HFILEBLOCK_HEADER_SIZE - + DataBlockEncoding.ID_SIZE, state.unencodedDataSizeWritten + Bytes.putInt(uncompressedBytesWithHeader, + HConstants.HFILEBLOCK_HEADER_SIZE + DataBlockEncoding.ID_SIZE, state.unencodedDataSizeWritten ); if (encodingCtx.getDataBlockEncoding() != DataBlockEncoding.NONE) { encodingCtx.postEncoding(BlockType.ENCODED_DATA); diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DataBlockEncoder.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DataBlockEncoder.java index 397855a7edc..4adb2121c9c 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DataBlockEncoder.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DataBlockEncoder.java @@ -39,7 +39,8 @@ import org.apache.hadoop.hbase.nio.ByteBuff; */ @InterfaceAudience.Private public interface DataBlockEncoder { - +// TODO: This Interface should be deprecated and replaced. It presumes hfile and carnal knowledge of +// Cell internals. It was done for a different time. Remove. Purge. /** * Starts encoding for a block of KeyValues. 
Call * {@link #endBlockEncoding(HFileBlockEncodingContext, DataOutputStream, byte[])} to finish diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileContext.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileContext.java index 99451464e17..909391aa535 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileContext.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileContext.java @@ -218,22 +218,22 @@ public class HFileContext implements HeapSize, Cloneable { @Override public String toString() { StringBuilder sb = new StringBuilder(); - sb.append("HFileContext ["); - sb.append(" usesHBaseChecksum="); sb.append(usesHBaseChecksum); - sb.append(" checksumType="); sb.append(checksumType); - sb.append(" bytesPerChecksum="); sb.append(bytesPerChecksum); - sb.append(" blocksize="); sb.append(blocksize); - sb.append(" encoding="); sb.append(encoding); - sb.append(" includesMvcc="); sb.append(includesMvcc); - sb.append(" includesTags="); sb.append(includesTags); - sb.append(" compressAlgo="); sb.append(compressAlgo); - sb.append(" compressTags="); sb.append(compressTags); - sb.append(" cryptoContext=[ "); sb.append(cryptoContext); sb.append(" ]"); + sb.append("["); + sb.append("usesHBaseChecksum="); sb.append(usesHBaseChecksum); + sb.append(", checksumType="); sb.append(checksumType); + sb.append(", bytesPerChecksum="); sb.append(bytesPerChecksum); + sb.append(", blocksize="); sb.append(blocksize); + sb.append(", encoding="); sb.append(encoding); + sb.append(", includesMvcc="); sb.append(includesMvcc); + sb.append(", includesTags="); sb.append(includesTags); + sb.append(", compressAlgo="); sb.append(compressAlgo); + sb.append(", compressTags="); sb.append(compressTags); + sb.append(", cryptoContext=["); sb.append(cryptoContext); sb.append("]"); if (hfileName != null) { - sb.append(" name="); + sb.append(", name="); sb.append(hfileName); } - sb.append(" ]"); + sb.append("]"); return sb.toString(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java index d6bdec0bb54..6fe3927107c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java @@ -82,8 +82,14 @@ public class CacheConfig { */ /** - * If the chosen ioengine can persist its state across restarts, the path to the file to - * persist to. + * If the chosen ioengine can persist its state across restarts, the path to the file to persist + * to. This file is NOT the data file. It is a file into which we will serialize the map of + * what is in the data file. For example, if you pass the following argument as + * BUCKET_CACHE_IOENGINE_KEY ("hbase.bucketcache.ioengine"), + * file:/tmp/bucketcache.data , then we will write the bucketcache data to the file + * /tmp/bucketcache.data but the metadata on where the data is in the supplied file + * is an in-memory map that needs to be persisted across restarts. Where to store this + * in-memory state is what you supply here: e.g. /tmp/bucketcache.map. 
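+   * <p>For example (illustrative paths only, not defaults), you might set:
+   * <pre>
+   *   hbase.bucketcache.ioengine=file:/tmp/bucketcache.data
+   *   hbase.bucketcache.persistent.path=/tmp/bucketcache.map
+   * </pre>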
*/ public static final String BUCKET_CACHE_PERSISTENT_PATH_KEY = "hbase.bucketcache.persistent.path"; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java index e9fa05ceee9..8582dbe9f2b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java @@ -178,19 +178,20 @@ public class HFile { * The number of bytes per checksum. */ public static final int DEFAULT_BYTES_PER_CHECKSUM = 16 * 1024; - // For measuring number of checksum failures - static final Counter checksumFailures = new Counter(); - // for test purpose - public static final Counter dataBlockReadCnt = new Counter(); + // For measuring number of checksum failures + static final Counter CHECKSUM_FAILURES = new Counter(); + + // For tests. Gets incremented when we read a block whether from HDFS or from Cache. + public static final Counter DATABLOCK_READ_COUNT = new Counter(); /** * Number of checksum verification failures. It also * clears the counter. */ public static final long getChecksumFailuresCount() { - long count = checksumFailures.get(); - checksumFailures.set(0); + long count = CHECKSUM_FAILURES.get(); + CHECKSUM_FAILURES.set(0); return count; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java index e2f524c9e24..6268f2eb405 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java @@ -26,6 +26,8 @@ import java.nio.ByteBuffer; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.Path; @@ -54,87 +56,121 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; /** - * Reading {@link HFile} version 1 and 2 blocks, and writing version 2 blocks. - * + *
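+ * A rough sketch of the version 2 block layout (names follow the header-field javadocs
+ * further down in this class; the checksum fields are present only when HBase checksums
+ * are in use):
+ * <pre>
+ *   header:    block-type magic (8 bytes)
+ *              onDiskSizeWithoutHeader (int)
+ *              uncompressedSizeWithoutHeader (int)
+ *              prevBlockOffset (long)
+ *              checksumType (1 byte), bytesPerChecksum (int), onDiskDataSizeWithHeader (int)
+ *   data:      the (possibly compressed/encrypted) block payload
+ *   checksums: trailing checksum values, CHECKSUM_SIZE (4) bytes each
+ * </pre>
+ *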

Be aware that when we read from HDFS, we overread pulling in the next blocks' header too. + * We do this to save having to do two seeks to read an HFileBlock; a seek to read the header + * to figure lengths, etc., and then another seek to pull in the data. */ @InterfaceAudience.Private public class HFileBlock implements Cacheable { + private static final Log LOG = LogFactory.getLog(HFileBlock.class); /** - * On a checksum failure on a Reader, these many suceeding read - * requests switch back to using hdfs checksums before auto-reenabling - * hbase checksum verification. + * On a checksum failure, do these many succeeding read requests using hdfs checksums before + * auto-reenabling hbase checksum verification. */ static final int CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD = 3; + private static int UNSET = -1; public static final boolean FILL_HEADER = true; public static final boolean DONT_FILL_HEADER = false; - /** - * The size of block header when blockType is {@link BlockType#ENCODED_DATA}. - * This extends normal header by adding the id of encoder. - */ - public static final int ENCODED_HEADER_SIZE = HConstants.HFILEBLOCK_HEADER_SIZE - + DataBlockEncoding.ID_SIZE; - - static final byte[] DUMMY_HEADER_NO_CHECKSUM = - new byte[HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM]; - // How to get the estimate correctly? if it is a singleBB? public static final int MULTI_BYTE_BUFFER_HEAP_SIZE = (int)ClassSize.estimateBase(MultiByteBuff.class, false); - // meta.usesHBaseChecksum+offset+nextBlockOnDiskSizeWithHeader - public static final int EXTRA_SERIALIZATION_SPACE = Bytes.SIZEOF_BYTE + Bytes.SIZEOF_INT - + Bytes.SIZEOF_LONG; + /** + * See #blockDeserializer method for more info. + * 13 bytes of extra stuff stuck on the end of the HFileBlock that we pull in from HDFS (note, + * when we read from HDFS, we pull in an HFileBlock AND the header of the next block if one). + * The 13 bytes are: usesHBaseChecksum (1 byte) + offset of this block (long) + + * nextBlockOnDiskSizeWithHeader (int). + */ + public static final int EXTRA_SERIALIZATION_SPACE = + Bytes.SIZEOF_BYTE + Bytes.SIZEOF_INT + Bytes.SIZEOF_LONG; /** * Each checksum value is an integer that can be stored in 4 bytes. */ static final int CHECKSUM_SIZE = Bytes.SIZEOF_INT; + static final byte[] DUMMY_HEADER_NO_CHECKSUM = + new byte[HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM]; + + /** + * Used deserializing blocks from Cache. + * + * Serializing to cache is a little hard to follow. See Writer#finishBlock for where it is done. + * When we start to append to a new HFileBlock, + * we skip over where the header should go before we start adding Cells. When the block is + * done, we'll then go back and fill in the header and the checksum tail. Be aware that what + * gets serialized into the blockcache is a byte array that contains an HFileBlock followed by + * its checksums and then the header of the next HFileBlock (needed to help navigate), followed + * again by an extra 13 bytes of meta info needed when time to recreate the HFileBlock from cache. + * + * ++++++++++++++ + * + HFileBlock + + * ++++++++++++++ + * + Checksums + + * ++++++++++++++ + * + NextHeader + + * ++++++++++++++ + * + ExtraMeta! + + * ++++++++++++++ + * + * TODO: Fix it so we do NOT put the NextHeader into blockcache. It is not necessary. 
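+ *
+ * For reference, the trailing 'ExtraMeta' section is the EXTRA_SERIALIZATION_SPACE, 13 bytes
+ * laid out as described in that constant's javadoc above:
+ * <pre>
+ *   1 byte  usesHBaseChecksum flag
+ *   8 bytes offset of this block in its file (long)
+ *   4 bytes on-disk size of the next block, header included (int)
+ * </pre>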
+ */ static final CacheableDeserializer blockDeserializer = new CacheableDeserializer() { public HFileBlock deserialize(ByteBuff buf, boolean reuse, MemoryType memType) - throws IOException { + throws IOException { + // Rewind to just before the EXTRA_SERIALIZATION_SPACE. buf.limit(buf.limit() - HFileBlock.EXTRA_SERIALIZATION_SPACE).rewind(); + // Get a new buffer to pass the deserialized HFileBlock for it to 'own'. ByteBuff newByteBuffer; if (reuse) { newByteBuffer = buf.slice(); } else { - // Used only in tests int len = buf.limit(); newByteBuffer = new SingleByteBuff(ByteBuffer.allocate(len)); newByteBuffer.put(0, buf, buf.position(), len); } + // Read out the EXTRA_SERIALIZATION_SPACE content and shove into our HFileBlock. buf.position(buf.limit()); buf.limit(buf.limit() + HFileBlock.EXTRA_SERIALIZATION_SPACE); boolean usesChecksum = buf.get() == (byte)1; @@ -158,6 +194,7 @@ public class HFileBlock implements Cacheable { return deserialize(b, false, MemoryType.EXCLUSIVE); } }; + private static final int deserializerIdentifier; static { deserializerIdentifier = CacheableDeserializerIdManager @@ -167,18 +204,28 @@ public class HFileBlock implements Cacheable { /** Type of block. Header field 0. */ private BlockType blockType; - /** Size on disk excluding header, including checksum. Header field 1. */ + /** + * Size on disk excluding header, including checksum. Header field 1. + * @see Writer#putHeader(byte[], int, int, int, int) + */ private int onDiskSizeWithoutHeader; - /** Size of pure data. Does not include header or checksums. Header field 2. */ + /** + * Size of pure data. Does not include header or checksums. Header field 2. + * @see Writer#putHeader(byte[], int, int, int, int) + */ private final int uncompressedSizeWithoutHeader; - /** The offset of the previous block on disk. Header field 3. */ + /** + * The offset of the previous block on disk. Header field 3. + * @see Writer#putHeader(byte[], int, int, int, int) + */ private final long prevBlockOffset; /** * Size on disk of header + data. Excludes checksum. Header field 6, * OR calculated from {@link #onDiskSizeWithoutHeader} when using HDFS checksum. + * @see Writer#putHeader(byte[], int, int, int, int) */ private final int onDiskDataSizeWithHeader; @@ -192,20 +239,20 @@ public class HFileBlock implements Cacheable { * The offset of this block in the file. Populated by the reader for * convenience of access. This offset is not part of the block header. */ - private long offset = -1; + private long offset = UNSET; /** * The on-disk size of the next block, including the header, obtained by * peeking into the first {@link HConstants#HFILEBLOCK_HEADER_SIZE} bytes of the next block's * header, or -1 if unknown. */ - private int nextBlockOnDiskSizeWithHeader = -1; + private int nextBlockOnDiskSizeWithHeader = UNSET; private MemoryType memType = MemoryType.EXCLUSIVE; /** * Creates a new {@link HFile} block from the given fields. This constructor - * is mostly used when the block data has already been read and uncompressed, + * is used when the block data has already been read and uncompressed, * and is sitting in a byte buffer. * * @param blockType the type of this block, see {@link BlockType} @@ -213,8 +260,8 @@ public class HFileBlock implements Cacheable { * @param uncompressedSizeWithoutHeader see {@link #uncompressedSizeWithoutHeader} * @param prevBlockOffset see {@link #prevBlockOffset} * @param buf block header ({@link HConstants#HFILEBLOCK_HEADER_SIZE} bytes) followed by - * uncompressed data. 
This - * @param fillHeader when true, parse {@code buf} and override the first 4 header fields. + * uncompressed data. + * @param fillHeader when true, write the first 4 header fields into passed buffer. * @param offset the file offset the block was read from * @param onDiskDataSizeWithHeader see {@link #onDiskDataSizeWithHeader} * @param fileContext HFile meta data @@ -230,8 +277,9 @@ public class HFileBlock implements Cacheable { this.offset = offset; this.onDiskDataSizeWithHeader = onDiskDataSizeWithHeader; this.fileContext = fileContext; - if (fillHeader) + if (fillHeader) { overwriteHeader(); + } this.buf.rewind(); } @@ -292,8 +340,8 @@ public class HFileBlock implements Cacheable { } else { contextBuilder.withChecksumType(ChecksumType.NULL); contextBuilder.withBytesPerCheckSum(0); - this.onDiskDataSizeWithHeader = onDiskSizeWithoutHeader + - HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM; + this.onDiskDataSizeWithHeader = + onDiskSizeWithoutHeader + HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM; } this.fileContext = contextBuilder.build(); this.memType = memType; @@ -324,14 +372,14 @@ public class HFileBlock implements Cacheable { /** * @return the on-disk size of the data part + checksum (header excluded). */ - public int getOnDiskSizeWithoutHeader() { + int getOnDiskSizeWithoutHeader() { return onDiskSizeWithoutHeader; } /** * @return the uncompressed size of data part (header and checksum excluded). */ - public int getUncompressedSizeWithoutHeader() { + int getUncompressedSizeWithoutHeader() { return uncompressedSizeWithoutHeader; } @@ -339,7 +387,7 @@ public class HFileBlock implements Cacheable { * @return the offset of the previous block of the same type in the file, or * -1 if unknown */ - public long getPrevBlockOffset() { + long getPrevBlockOffset() { return prevBlockOffset; } @@ -381,7 +429,7 @@ public class HFileBlock implements Cacheable { * * @return the buffer of this block for read-only operations */ - public ByteBuff getBufferReadOnly() { + ByteBuff getBufferReadOnly() { ByteBuff dup = this.buf.duplicate(); dup.limit(buf.limit() - totalChecksumBytes()); return dup.slice(); @@ -473,20 +521,20 @@ public class HFileBlock implements Cacheable { @Override public String toString() { StringBuilder sb = new StringBuilder() - .append("HFileBlock [") - .append(" fileOffset=").append(offset) - .append(" headerSize()=").append(headerSize()) - .append(" blockType=").append(blockType) - .append(" onDiskSizeWithoutHeader=").append(onDiskSizeWithoutHeader) - .append(" uncompressedSizeWithoutHeader=").append(uncompressedSizeWithoutHeader) - .append(" prevBlockOffset=").append(prevBlockOffset) - .append(" isUseHBaseChecksum()=").append(fileContext.isUseHBaseChecksum()); + .append("[") + .append("blockType=").append(blockType) + .append(", fileOffset=").append(offset) + .append(", headerSize=").append(headerSize()) + .append(", onDiskSizeWithoutHeader=").append(onDiskSizeWithoutHeader) + .append(", uncompressedSizeWithoutHeader=").append(uncompressedSizeWithoutHeader) + .append(", prevBlockOffset=").append(prevBlockOffset) + .append(", isUseHBaseChecksum=").append(fileContext.isUseHBaseChecksum()); if (fileContext.isUseHBaseChecksum()) { - sb.append(" checksumType=").append(ChecksumType.codeToType(this.buf.get(24))) - .append(" bytesPerChecksum=").append(this.buf.getInt(24 + 1)) - .append(" onDiskDataSizeWithHeader=").append(onDiskDataSizeWithHeader); + sb.append(", checksumType=").append(ChecksumType.codeToType(this.buf.get(24))) + .append(", 
bytesPerChecksum=").append(this.buf.getInt(24 + 1)) + .append(", onDiskDataSizeWithHeader=").append(onDiskDataSizeWithHeader); } else { - sb.append(" onDiskDataSizeWithHeader=").append(onDiskDataSizeWithHeader) + sb.append(", onDiskDataSizeWithHeader=").append(onDiskDataSizeWithHeader) .append("(").append(onDiskSizeWithoutHeader) .append("+").append(HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM).append(")"); } @@ -501,13 +549,13 @@ public class HFileBlock implements Cacheable { bufWithoutHeader.get(dataBeginBytes); dataBegin = Bytes.toStringBinary(dataBeginBytes); } - sb.append(" getOnDiskSizeWithHeader()=").append(getOnDiskSizeWithHeader()) - .append(" totalChecksumBytes()=").append(totalChecksumBytes()) - .append(" isUnpacked()=").append(isUnpacked()) - .append(" buf=[ ").append(buf).append(" ]") - .append(" dataBeginsWith=").append(dataBegin) - .append(" fileContext=").append(fileContext) - .append(" ]"); + sb.append(", getOnDiskSizeWithHeader=").append(getOnDiskSizeWithHeader()) + .append(", totalChecksumBytes=").append(totalChecksumBytes()) + .append(", isUnpacked=").append(isUnpacked()) + .append(", buf=[").append(buf).append("]") + .append(", dataBeginsWith=").append(dataBegin) + .append(", fileContext=").append(fileContext) + .append("]"); return sb.toString(); } @@ -632,19 +680,8 @@ public class HFileBlock implements Cacheable { } } - /** - * @param expectedType the expected type of this block - * @throws IOException if this block's type is different than expected - */ - public void expectType(BlockType expectedType) throws IOException { - if (blockType != expectedType) { - throw new IOException("Invalid block type: expected=" + expectedType - + ", actual=" + blockType); - } - } - /** @return the offset of this block in the file it was read from */ - public long getOffset() { + long getOffset() { if (offset < 0) { throw new IllegalStateException( "HFile block offset not initialized properly"); @@ -655,7 +692,7 @@ public class HFileBlock implements Cacheable { /** * @return a byte stream reading the data + checksum of this block */ - public DataInputStream getByteStream() { + DataInputStream getByteStream() { ByteBuff dup = this.buf.duplicate(); dup.position(this.headerSize()); return new DataInputStream(new ByteBuffInputStream(dup)); @@ -685,21 +722,20 @@ public class HFileBlock implements Cacheable { } /** - * Read from an input stream. Analogous to + * Read from an input stream at least necessaryLen and if possible, + * extraLen also if available. Analogous to * {@link IOUtils#readFully(InputStream, byte[], int, int)}, but specifies a - * number of "extra" bytes that would be desirable but not absolutely - * necessary to read. + * number of "extra" bytes to also optionally read. 
* * @param in the input stream to read from * @param buf the buffer to read into * @param bufOffset the destination offset in the buffer - * @param necessaryLen the number of bytes that are absolutely necessary to - * read + * @param necessaryLen the number of bytes that are absolutely necessary to read * @param extraLen the number of extra bytes that would be nice to read * @return true if succeeded reading the extra bytes * @throws IOException if failed to read the necessary bytes */ - public static boolean readWithExtra(InputStream in, byte[] buf, + static boolean readWithExtra(InputStream in, byte[] buf, int bufOffset, int necessaryLen, int extraLen) throws IOException { int bytesRemaining = necessaryLen + extraLen; while (bytesRemaining > 0) { @@ -723,7 +759,8 @@ public class HFileBlock implements Cacheable { } /** - * Read from an input stream. Analogous to + * Read from an input stream at least necessaryLen and if possible, + * extraLen also if available. Analogous to * {@link IOUtils#readFully(InputStream, byte[], int, int)}, but uses * positional read and specifies a number of "extra" bytes that would be * desirable but not absolutely necessary to read. @@ -776,14 +813,13 @@ public class HFileBlock implements Cacheable { *

  • Construct an {@link HFileBlock.Writer}, providing a compression algorithm. *
  • Call {@link Writer#startWriting} and get a data stream to write to. *
  • Write your data into the stream. - *
  • Call {@link Writer#writeHeaderAndData(FSDataOutputStream)} as many times as you need to. + *
  • Call Writer#writeHeaderAndData(FSDataOutputStream) as many times as you need to * store the serialized block into an external stream. *
  • Repeat to write more blocks. * *
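+ * <p>A rough usage sketch (method names as declared in this class; dataBlockEncoder,
+ * fileContext and fsOut are placeholders; error handling omitted):
+ * <pre>
+ *   HFileBlock.Writer writer = new HFileBlock.Writer(dataBlockEncoder, fileContext);
+ *   DataOutputStream dos = writer.startWriting(BlockType.DATA);
+ *   // ... write the block's serialized bytes to dos ...
+ *   writer.writeHeaderAndData(fsOut); // fsOut is the FSDataOutputStream backing the HFile
+ *   dos = writer.startWriting(BlockType.DATA); // then start on the next block
+ * </pre>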

    */ - public static class Writer { - + static class Writer { private enum State { INIT, WRITING, @@ -798,7 +834,7 @@ public class HFileBlock implements Cacheable { private HFileBlockEncodingContext dataBlockEncodingCtx; - /** block encoding context for non-data blocks */ + /** block encoding context for non-data blocks*/ private HFileBlockDefaultEncodingContext defaultBlockEncodingCtx; /** @@ -871,25 +907,26 @@ public class HFileBlock implements Cacheable { * @param dataBlockEncoder data block encoding algorithm to use */ public Writer(HFileDataBlockEncoder dataBlockEncoder, HFileContext fileContext) { - this.dataBlockEncoder = dataBlockEncoder != null - ? dataBlockEncoder : NoOpDataBlockEncoder.INSTANCE; - defaultBlockEncodingCtx = new HFileBlockDefaultEncodingContext(null, - HConstants.HFILEBLOCK_DUMMY_HEADER, fileContext); - dataBlockEncodingCtx = this.dataBlockEncoder - .newDataBlockEncodingContext(HConstants.HFILEBLOCK_DUMMY_HEADER, fileContext); - if (fileContext.getBytesPerChecksum() < HConstants.HFILEBLOCK_HEADER_SIZE) { throw new RuntimeException("Unsupported value of bytesPerChecksum. " + " Minimum is " + HConstants.HFILEBLOCK_HEADER_SIZE + " but the configured value is " + fileContext.getBytesPerChecksum()); } - + this.dataBlockEncoder = dataBlockEncoder != null? + dataBlockEncoder: NoOpDataBlockEncoder.INSTANCE; + this.dataBlockEncodingCtx = this.dataBlockEncoder. + newDataBlockEncodingContext(HConstants.HFILEBLOCK_DUMMY_HEADER, fileContext); + // TODO: This should be lazily instantiated since we usually do NOT need this default encoder + this.defaultBlockEncodingCtx = new HFileBlockDefaultEncodingContext(null, + HConstants.HFILEBLOCK_DUMMY_HEADER, fileContext); + // TODO: Set BAOS initial size. Use fileContext.getBlocksize() and add for header/checksum baosInMemory = new ByteArrayOutputStream(); - prevOffsetByType = new long[BlockType.values().length]; - for (int i = 0; i < prevOffsetByType.length; ++i) - prevOffsetByType[i] = -1; - + for (int i = 0; i < prevOffsetByType.length; ++i) { + prevOffsetByType[i] = UNSET; + } + // TODO: Why fileContext saved away when we have dataBlockEncoder and/or + // defaultDataBlockEncoder? this.fileContext = fileContext; } @@ -899,7 +936,7 @@ public class HFileBlock implements Cacheable { * @return the stream the user can write their data into * @throws IOException */ - public DataOutputStream startWriting(BlockType newBlockType) + DataOutputStream startWriting(BlockType newBlockType) throws IOException { if (state == State.BLOCK_READY && startOffset != -1) { // We had a previous block that was written to a stream at a specific @@ -929,10 +966,10 @@ public class HFileBlock implements Cacheable { * @param cell * @throws IOException */ - public void write(Cell cell) throws IOException{ + void write(Cell cell) throws IOException{ expectState(State.WRITING); - this.unencodedDataSizeWritten += this.dataBlockEncoder.encode(cell, dataBlockEncodingCtx, - this.userDataStream); + this.unencodedDataSizeWritten += + this.dataBlockEncoder.encode(cell, dataBlockEncodingCtx, this.userDataStream); } /** @@ -976,6 +1013,7 @@ public class HFileBlock implements Cacheable { } userDataStream.flush(); // This does an array copy, so it is safe to cache this byte array. + // Header is still the empty, 'dummy' header that is yet to be filled out. 
uncompressedBytesWithHeader = baosInMemory.toByteArray(); prevOffset = prevOffsetByType[blockType.getId()]; @@ -987,22 +1025,25 @@ public class HFileBlock implements Cacheable { onDiskBytesWithHeader = dataBlockEncodingCtx .compressAndEncrypt(uncompressedBytesWithHeader); } else { - onDiskBytesWithHeader = defaultBlockEncodingCtx - .compressAndEncrypt(uncompressedBytesWithHeader); + onDiskBytesWithHeader = this.defaultBlockEncodingCtx. + compressAndEncrypt(uncompressedBytesWithHeader); } + // Calculate how many bytes we need for checksum on the tail of the block. int numBytes = (int) ChecksumUtil.numBytes( onDiskBytesWithHeader.length, fileContext.getBytesPerChecksum()); - // put the header for on disk bytes + // Put the header for the on disk bytes; header currently is unfilled-out putHeader(onDiskBytesWithHeader, 0, onDiskBytesWithHeader.length + numBytes, uncompressedBytesWithHeader.length, onDiskBytesWithHeader.length); - // set the header for the uncompressed bytes (for cache-on-write) - putHeader(uncompressedBytesWithHeader, 0, + // Set the header for the uncompressed bytes (for cache-on-write) -- IFF different from + // onDiskBytesWithHeader array. + if (onDiskBytesWithHeader != uncompressedBytesWithHeader) { + putHeader(uncompressedBytesWithHeader, 0, onDiskBytesWithHeader.length + numBytes, uncompressedBytesWithHeader.length, onDiskBytesWithHeader.length); - + } onDiskChecksum = new byte[numBytes]; ChecksumUtil.generateChecksums( onDiskBytesWithHeader, 0, onDiskBytesWithHeader.length, @@ -1036,9 +1077,9 @@ public class HFileBlock implements Cacheable { * @param out * @throws IOException */ - public void writeHeaderAndData(FSDataOutputStream out) throws IOException { + void writeHeaderAndData(FSDataOutputStream out) throws IOException { long offset = out.getPos(); - if (startOffset != -1 && offset != startOffset) { + if (startOffset != UNSET && offset != startOffset) { throw new IOException("A " + blockType + " block written to a " + "stream twice, first at offset " + startOffset + ", then at " + offset); @@ -1091,7 +1132,7 @@ public class HFileBlock implements Cacheable { /** * Releases resources used by this writer. 
*/ - public void release() { + void release() { if (dataBlockEncodingCtx != null) { dataBlockEncodingCtx.close(); dataBlockEncodingCtx = null; @@ -1112,9 +1153,8 @@ public class HFileBlock implements Cacheable { */ int getOnDiskSizeWithoutHeader() { expectState(State.BLOCK_READY); - return onDiskBytesWithHeader.length - + onDiskChecksum.length - - HConstants.HFILEBLOCK_HEADER_SIZE; + return onDiskBytesWithHeader.length + + onDiskChecksum.length - HConstants.HFILEBLOCK_HEADER_SIZE; } /** @@ -1146,7 +1186,7 @@ public class HFileBlock implements Cacheable { } /** @return true if a block is being written */ - public boolean isWriting() { + boolean isWriting() { return state == State.WRITING; } @@ -1157,7 +1197,7 @@ public class HFileBlock implements Cacheable { * * @return the number of bytes written */ - public int blockSizeWritten() { + int blockSizeWritten() { if (state != State.WRITING) return 0; return this.unencodedDataSizeWritten; } @@ -1205,7 +1245,7 @@ public class HFileBlock implements Cacheable { * @param out the file system output stream * @throws IOException */ - public void writeBlock(BlockWritable bw, FSDataOutputStream out) + void writeBlock(BlockWritable bw, FSDataOutputStream out) throws IOException { bw.writeToBlock(startWriting(bw.getBlockType())); writeHeaderAndData(out); @@ -1218,7 +1258,7 @@ public class HFileBlock implements Cacheable { * version is MINOR_VERSION_WITH_CHECKSUM. This is indicated by setting a * 0 value in bytesPerChecksum. */ - public HFileBlock getBlockForCaching(CacheConfig cacheConf) { + HFileBlock getBlockForCaching(CacheConfig cacheConf) { HFileContext newContext = new HFileContextBuilder() .withBlockSize(fileContext.getBlocksize()) .withBytesPerCheckSum(0) @@ -1241,7 +1281,7 @@ public class HFileBlock implements Cacheable { } /** Something that can be written into a block. */ - public interface BlockWritable { + interface BlockWritable { /** The type of block this data should use. */ BlockType getBlockType(); @@ -1258,7 +1298,7 @@ public class HFileBlock implements Cacheable { // Block readers and writers /** An interface allowing to iterate {@link HFileBlock}s. */ - public interface BlockIterator { + interface BlockIterator { /** * Get the next block, or null if there are no more blocks to iterate. @@ -1273,7 +1313,7 @@ public class HFileBlock implements Cacheable { } /** A full-fledged reader with iteration ability. */ - public interface FSReader { + interface FSReader { /** * Reads the block at the given offset in the file with the given on-disk @@ -1321,9 +1361,15 @@ public class HFileBlock implements Cacheable { long offset = -1; byte[] header = new byte[HConstants.HFILEBLOCK_HEADER_SIZE]; final ByteBuffer buf = ByteBuffer.wrap(header, 0, HConstants.HFILEBLOCK_HEADER_SIZE); + @Override + public String toString() { + return "offset=" + this.offset + ", header=" + Bytes.toStringBinary(header); + } } - /** Reads version 2 blocks from the filesystem. */ + /** + * Reads version 2 blocks from the filesystem. 
+ */ static class FSReaderImpl implements FSReader { /** The file system stream of the underlying {@link HFile} that * does or doesn't do checksum validations in the filesystem */ @@ -1362,7 +1408,7 @@ public class HFileBlock implements Cacheable { // Cache the fileName protected String pathName; - public FSReaderImpl(FSDataInputStreamWrapper stream, long fileSize, HFileSystem hfs, Path path, + FSReaderImpl(FSDataInputStreamWrapper stream, long fileSize, HFileSystem hfs, Path path, HFileContext fileContext) throws IOException { this.fileSize = fileSize; this.hfs = hfs; @@ -1420,8 +1466,8 @@ public class HFileBlock implements Cacheable { * the on-disk size of the next block, or -1 if it could not be determined. * * @param dest destination buffer - * @param destOffset offset in the destination buffer - * @param size size of the block to be read + * @param destOffset offset into the destination buffer at where to put the bytes we read + * @param size size of read * @param peekIntoNextBlock whether to read the next block's on-disk size * @param fileOffset position in the stream to read at * @param pread whether we should do a positional read @@ -1430,12 +1476,10 @@ public class HFileBlock implements Cacheable { * -1 if it could not be determined * @throws IOException */ - protected int readAtOffset(FSDataInputStream istream, - byte[] dest, int destOffset, int size, + protected int readAtOffset(FSDataInputStream istream, byte [] dest, int destOffset, int size, boolean peekIntoNextBlock, long fileOffset, boolean pread) - throws IOException { - if (peekIntoNextBlock && - destOffset + size + hdrSize > dest.length) { + throws IOException { + if (peekIntoNextBlock && destOffset + size + hdrSize > dest.length) { // We are asked to read the next block's header as well, but there is // not enough room in the array. throw new IOException("Attempted to read " + size + " bytes and " + @@ -1522,7 +1566,7 @@ public class HFileBlock implements Cacheable { HFile.LOG.warn(msg); throw new IOException(msg); // cannot happen case here } - HFile.checksumFailures.increment(); // update metrics + HFile.CHECKSUM_FAILURES.increment(); // update metrics // If we have a checksum failure, we fall back into a mode where // the next few reads use HDFS level checksums. We aim to make the @@ -1597,6 +1641,7 @@ public class HFileBlock implements Cacheable { } int onDiskSizeWithHeader = (int) onDiskSizeWithHeaderL; + // See if we can avoid reading the header. This is desirable, because // we will not incur a backward seek operation if we have already // read this block's header as part of the previous read's look-ahead. @@ -1604,15 +1649,22 @@ public class HFileBlock implements Cacheable { // been read. // TODO: How often does this optimization fire? Has to be same thread so the thread local // is pertinent and we have to be reading next block as in a big scan. + ByteBuffer headerBuf = null; PrefetchedHeader prefetchedHeader = prefetchedHeaderForThread.get(); - ByteBuffer headerBuf = prefetchedHeader.offset == offset? prefetchedHeader.buf: null; - + boolean preReadHeader = false; + if (prefetchedHeader != null && prefetchedHeader.offset == offset) { + headerBuf = prefetchedHeader.buf; + preReadHeader = true; + } // Allocate enough space to fit the next block's header too. int nextBlockOnDiskSize = 0; byte[] onDiskBlock = null; HFileBlock b = null; + boolean fastPath = false; + boolean readHdrOnly = false; if (onDiskSizeWithHeader > 0) { + fastPath = true; // We know the total on-disk size. 
Read the entire block into memory, // then parse the header. This code path is used when // doing a random read operation relying on the block index, as well as @@ -1669,6 +1721,7 @@ public class HFileBlock implements Cacheable { // Unfortunately, we still have to do a separate read operation to // read the header. if (headerBuf == null) { + readHdrOnly = true; // From the header, determine the on-disk size of the given hfile // block, and read the remaining data, thereby incurring two read // operations. This might happen when we are doing the first read @@ -1681,12 +1734,12 @@ public class HFileBlock implements Cacheable { } // TODO: FIX!!! Expensive parse just to get a length b = new HFileBlock(headerBuf, fileContext.isUseHBaseChecksum()); + // onDiskBlock is whole block + header + checksums then extra hdrSize to read next header onDiskBlock = new byte[b.getOnDiskSizeWithHeader() + hdrSize]; - // headerBuf is HBB + // headerBuf is HBB. Copy hdr into onDiskBlock System.arraycopy(headerBuf.array(), headerBuf.arrayOffset(), onDiskBlock, 0, hdrSize); - nextBlockOnDiskSize = - readAtOffset(is, onDiskBlock, hdrSize, b.getOnDiskSizeWithHeader() - - hdrSize, true, offset + hdrSize, pread); + nextBlockOnDiskSize = readAtOffset(is, onDiskBlock, hdrSize, + b.getOnDiskSizeWithHeader() - hdrSize, true, offset + hdrSize, pread); onDiskSizeWithHeader = b.onDiskSizeWithoutHeader + hdrSize; } @@ -1716,13 +1769,19 @@ public class HFileBlock implements Cacheable { b.offset = offset; b.fileContext.setIncludesTags(this.fileContext.isIncludesTags()); b.fileContext.setIncludesMvcc(this.fileContext.isIncludesMvcc()); + if (LOG.isTraceEnabled()) { + LOG.trace("Read preReadHeader=" + preReadHeader + ", fastPath=" + fastPath + + ", readHdrOnly=" + readHdrOnly + ", " + b); + } return b; } + @Override public void setIncludesMemstoreTS(boolean includesMemstoreTS) { this.fileContext.setIncludesMvcc(includesMemstoreTS); } + @Override public void setDataBlockEncoder(HFileDataBlockEncoder encoder) { encodedBlockDecodingCtx = encoder.newDataBlockDecodingContext(this.fileContext); } @@ -1772,11 +1831,13 @@ public class HFileBlock implements Cacheable { @Override public void serialize(ByteBuffer destination) { - this.buf.get(destination, 0, getSerializedLength() - - EXTRA_SERIALIZATION_SPACE); + this.buf.get(destination, 0, getSerializedLength() - EXTRA_SERIALIZATION_SPACE); serializeExtraInfo(destination); } + /** + * Write out the content of EXTRA_SERIALIZATION_SPACE. Public so can be accessed by BucketCache. + */ public void serializeExtraInfo(ByteBuffer destination) { destination.put(this.fileContext.isUseHBaseChecksum() ? (byte) 1 : (byte) 0); destination.putLong(this.offset); @@ -1862,7 +1923,7 @@ public class HFileBlock implements Cacheable { } /** - * Calcuate the number of bytes required to store all the checksums + * Calculate the number of bytes required to store all the checksums * for this block. Each checksum value is a 4 byte integer. */ int totalChecksumBytes() { @@ -1888,16 +1949,14 @@ public class HFileBlock implements Cacheable { * Maps a minor version to the size of the header. */ public static int headerSize(boolean usesHBaseChecksum) { - if (usesHBaseChecksum) { - return HConstants.HFILEBLOCK_HEADER_SIZE; - } - return HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM; + return usesHBaseChecksum? 
+ HConstants.HFILEBLOCK_HEADER_SIZE: HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM; } /** * Return the appropriate DUMMY_HEADER for the minor version */ - public byte[] getDummyHeaderForVersion() { + byte[] getDummyHeaderForVersion() { return getDummyHeaderForVersion(this.fileContext.isUseHBaseChecksum()); } @@ -1905,17 +1964,14 @@ public class HFileBlock implements Cacheable { * Return the appropriate DUMMY_HEADER for the minor version */ static private byte[] getDummyHeaderForVersion(boolean usesHBaseChecksum) { - if (usesHBaseChecksum) { - return HConstants.HFILEBLOCK_DUMMY_HEADER; - } - return DUMMY_HEADER_NO_CHECKSUM; + return usesHBaseChecksum? HConstants.HFILEBLOCK_DUMMY_HEADER: DUMMY_HEADER_NO_CHECKSUM; } /** * @return the HFileContext used to create this HFileBlock. Not necessary the * fileContext for the file from which this block's data was originally read. */ - public HFileContext getHFileContext() { + HFileContext getHFileContext() { return this.fileContext; } @@ -1927,7 +1983,7 @@ public class HFileBlock implements Cacheable { /** * @return true if this block is backed by a shared memory area(such as that of a BucketCache). */ - public boolean usesSharedMemory() { + boolean usesSharedMemory() { return this.memType == MemoryType.SHARED; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java index 239c63d001e..331b8ba918c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java @@ -1491,13 +1491,16 @@ public class HFileReaderImpl implements HFile.Reader, Configurable { HFileBlock cachedBlock = getCachedBlock(cacheKey, cacheBlock, useLock, isCompaction, updateCacheMetrics, expectedBlockType, expectedDataBlockEncoding); if (cachedBlock != null) { + if (LOG.isTraceEnabled()) { + LOG.trace("From Cache " + cachedBlock); + } if (Trace.isTracing()) { traceScope.getSpan().addTimelineAnnotation("blockCacheHit"); } assert cachedBlock.isUnpacked() : "Packed block leak."; if (cachedBlock.getBlockType().isData()) { if (updateCacheMetrics) { - HFile.dataBlockReadCnt.increment(); + HFile.DATABLOCK_READ_COUNT.increment(); } // Validate encoding type for data blocks. We include encoding // type in the cache key, and we expect it to match on a cache hit. @@ -1537,7 +1540,7 @@ public class HFileReaderImpl implements HFile.Reader, Configurable { } if (updateCacheMetrics && hfileBlock.getBlockType().isData()) { - HFile.dataBlockReadCnt.increment(); + HFile.DATABLOCK_READ_COUNT.increment(); } return unpacked; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileScanner.java index 1d04467e79b..c67bdd4976b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileScanner.java @@ -18,6 +18,7 @@ */ package org.apache.hadoop.hbase.io.hfile; +import java.io.Closeable; import java.io.IOException; import java.nio.ByteBuffer; @@ -38,7 +39,7 @@ import org.apache.hadoop.hbase.Cell; * getValue. */ @InterfaceAudience.Private -public interface HFileScanner extends Shipper { +public interface HFileScanner extends Shipper, Closeable { /** * SeekTo or just before the passed cell. Examine the return * code to figure whether we found the cell or not. 
@@ -154,4 +155,4 @@ public interface HFileScanner extends Shipper { * Close this HFile scanner and do necessary cleanup. */ void close(); -} +} \ No newline at end of file diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java index 186d86bb790..d310d136f3d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java @@ -57,6 +57,8 @@ import org.apache.hadoop.io.Writable; public class HFileWriterImpl implements HFile.Writer { private static final Log LOG = LogFactory.getLog(HFileWriterImpl.class); + private static final long UNSET = -1; + /** The Cell previously appended. Becomes the last cell in the file.*/ protected Cell lastCell = null; @@ -129,16 +131,16 @@ public class HFileWriterImpl implements HFile.Writer { private List inlineBlockWriters = new ArrayList(); /** block writer */ - protected HFileBlock.Writer fsBlockWriter; + protected HFileBlock.Writer blockWriter; private HFileBlockIndex.BlockIndexWriter dataBlockIndexWriter; private HFileBlockIndex.BlockIndexWriter metaBlockIndexWriter; /** The offset of the first data block or -1 if the file is empty. */ - private long firstDataBlockOffset = -1; + private long firstDataBlockOffset = UNSET; /** The offset of the last data block or 0 if the file is empty. */ - protected long lastDataBlockOffset; + protected long lastDataBlockOffset = UNSET; /** * The last(stop) Cell of the previous data block. @@ -164,8 +166,7 @@ public class HFileWriterImpl implements HFile.Writer { } else { this.blockEncoder = NoOpDataBlockEncoder.INSTANCE; } - this.comparator = comparator != null ? comparator - : CellComparator.COMPARATOR; + this.comparator = comparator != null? comparator: CellComparator.COMPARATOR; closeOutputStream = path != null; this.cacheConf = cacheConf; @@ -273,15 +274,15 @@ public class HFileWriterImpl implements HFile.Writer { /** Additional initialization steps */ protected void finishInit(final Configuration conf) { - if (fsBlockWriter != null) { + if (blockWriter != null) { throw new IllegalStateException("finishInit called twice"); } - fsBlockWriter = new HFileBlock.Writer(blockEncoder, hFileContext); + blockWriter = new HFileBlock.Writer(blockEncoder, hFileContext); // Data block index writer boolean cacheIndexesOnWrite = cacheConf.shouldCacheIndexesOnWrite(); - dataBlockIndexWriter = new HFileBlockIndex.BlockIndexWriter(fsBlockWriter, + dataBlockIndexWriter = new HFileBlockIndex.BlockIndexWriter(blockWriter, cacheIndexesOnWrite ? cacheConf : null, cacheIndexesOnWrite ? name : null); dataBlockIndexWriter.setMaxChunkSize( @@ -299,29 +300,29 @@ public class HFileWriterImpl implements HFile.Writer { * @throws IOException */ protected void checkBlockBoundary() throws IOException { - if (fsBlockWriter.blockSizeWritten() < hFileContext.getBlocksize()) return; + if (blockWriter.blockSizeWritten() < hFileContext.getBlocksize()) return; finishBlock(); writeInlineBlocks(false); newBlock(); } - /** Clean up the current data block */ + /** Clean up the data block that is currently being written.*/ private void finishBlock() throws IOException { - if (!fsBlockWriter.isWriting() || fsBlockWriter.blockSizeWritten() == 0) return; + if (!blockWriter.isWriting() || blockWriter.blockSizeWritten() == 0) return; - // Update the first data block offset for scanning. 
- if (firstDataBlockOffset == -1) { + // Update the first data block offset if UNSET; used scanning. + if (firstDataBlockOffset == UNSET) { firstDataBlockOffset = outputStream.getPos(); } - // Update the last data block offset + // Update the last data block offset each time through here. lastDataBlockOffset = outputStream.getPos(); - fsBlockWriter.writeHeaderAndData(outputStream); - int onDiskSize = fsBlockWriter.getOnDiskSizeWithHeader(); + blockWriter.writeHeaderAndData(outputStream); + int onDiskSize = blockWriter.getOnDiskSizeWithHeader(); Cell indexEntry = getMidpoint(this.comparator, lastCellOfPreviousBlock, firstCellInBlock); dataBlockIndexWriter.addEntry(CellUtil.getCellKeySerializedAsKeyValueKey(indexEntry), lastDataBlockOffset, onDiskSize); - totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader(); + totalUncompressedBytes += blockWriter.getUncompressedSizeWithHeader(); if (cacheConf.shouldCacheDataOnWrite()) { doCacheOnWrite(lastDataBlockOffset); } @@ -461,12 +462,12 @@ public class HFileWriterImpl implements HFile.Writer { while (ibw.shouldWriteBlock(closing)) { long offset = outputStream.getPos(); boolean cacheThisBlock = ibw.getCacheOnWrite(); - ibw.writeInlineBlock(fsBlockWriter.startWriting( + ibw.writeInlineBlock(blockWriter.startWriting( ibw.getInlineBlockType())); - fsBlockWriter.writeHeaderAndData(outputStream); - ibw.blockWritten(offset, fsBlockWriter.getOnDiskSizeWithHeader(), - fsBlockWriter.getUncompressedSizeWithoutHeader()); - totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader(); + blockWriter.writeHeaderAndData(outputStream); + ibw.blockWritten(offset, blockWriter.getOnDiskSizeWithHeader(), + blockWriter.getUncompressedSizeWithoutHeader()); + totalUncompressedBytes += blockWriter.getUncompressedSizeWithHeader(); if (cacheThisBlock) { doCacheOnWrite(offset); @@ -481,7 +482,7 @@ public class HFileWriterImpl implements HFile.Writer { * the cache key. */ private void doCacheOnWrite(long offset) { - HFileBlock cacheFormatBlock = fsBlockWriter.getBlockForCaching(cacheConf); + HFileBlock cacheFormatBlock = blockWriter.getBlockForCaching(cacheConf); cacheConf.getBlockCache().cacheBlock(new BlockCacheKey(name, offset), cacheFormatBlock); } @@ -492,7 +493,7 @@ public class HFileWriterImpl implements HFile.Writer { */ protected void newBlock() throws IOException { // This is where the next block begins. - fsBlockWriter.startWriting(BlockType.DATA); + blockWriter.startWriting(BlockType.DATA); firstCellInBlock = null; if (lastCell != null) { lastCellOfPreviousBlock = lastCell; @@ -547,15 +548,15 @@ public class HFileWriterImpl implements HFile.Writer { // store the beginning offset long offset = outputStream.getPos(); // write the metadata content - DataOutputStream dos = fsBlockWriter.startWriting(BlockType.META); + DataOutputStream dos = blockWriter.startWriting(BlockType.META); metaData.get(i).write(dos); - fsBlockWriter.writeHeaderAndData(outputStream); - totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader(); + blockWriter.writeHeaderAndData(outputStream); + totalUncompressedBytes += blockWriter.getUncompressedSizeWithHeader(); // Add the new meta block to the meta index. metaBlockIndexWriter.addEntry(metaNames.get(i), offset, - fsBlockWriter.getOnDiskSizeWithHeader()); + blockWriter.getOnDiskSizeWithHeader()); } } @@ -572,10 +573,10 @@ public class HFileWriterImpl implements HFile.Writer { trailer.setLoadOnOpenOffset(rootIndexOffset); // Meta block index. 
- metaBlockIndexWriter.writeSingleLevelIndex(fsBlockWriter.startWriting( + metaBlockIndexWriter.writeSingleLevelIndex(blockWriter.startWriting( BlockType.ROOT_INDEX), "meta"); - fsBlockWriter.writeHeaderAndData(outputStream); - totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader(); + blockWriter.writeHeaderAndData(outputStream); + totalUncompressedBytes += blockWriter.getUncompressedSizeWithHeader(); if (this.hFileContext.isIncludesMvcc()) { appendFileInfo(MAX_MEMSTORE_TS_KEY, Bytes.toBytes(maxMemstoreTS)); @@ -583,14 +584,14 @@ public class HFileWriterImpl implements HFile.Writer { } // File info - writeFileInfo(trailer, fsBlockWriter.startWriting(BlockType.FILE_INFO)); - fsBlockWriter.writeHeaderAndData(outputStream); - totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader(); + writeFileInfo(trailer, blockWriter.startWriting(BlockType.FILE_INFO)); + blockWriter.writeHeaderAndData(outputStream); + totalUncompressedBytes += blockWriter.getUncompressedSizeWithHeader(); // Load-on-open data supplied by higher levels, e.g. Bloom filters. for (BlockWritable w : additionalLoadOnOpenData){ - fsBlockWriter.writeBlock(w, outputStream); - totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader(); + blockWriter.writeBlock(w, outputStream); + totalUncompressedBytes += blockWriter.getUncompressedSizeWithHeader(); } // Now finish off the trailer. @@ -605,7 +606,7 @@ public class HFileWriterImpl implements HFile.Writer { finishClose(trailer); - fsBlockWriter.release(); + blockWriter.release(); } @Override @@ -670,11 +671,11 @@ public class HFileWriterImpl implements HFile.Writer { checkBlockBoundary(); } - if (!fsBlockWriter.isWriting()) { + if (!blockWriter.isWriting()) { newBlock(); } - fsBlockWriter.write(cell); + blockWriter.write(cell); totalKeyLength += CellUtil.estimatedSerializedSizeOfKey(cell); totalValueLength += cell.getValueLength(); @@ -686,7 +687,7 @@ public class HFileWriterImpl implements HFile.Writer { firstCellInBlock = cell; } - // TODO: What if cell is 10MB and we write infrequently? We hold on to cell here indefinetly? + // TODO: What if cell is 10MB and we write infrequently? We hold on to cell here indefinitely? lastCell = cell; entryCount++; this.maxMemstoreTS = Math.max(this.maxMemstoreTS, cell.getSequenceId()); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketAllocator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketAllocator.java index 1e8cb7dbb44..b5cd0c35e4b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketAllocator.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketAllocator.java @@ -269,9 +269,10 @@ public final class BucketAllocator { } } - // Default block size is 64K, so we choose more sizes near 64K, you'd better + // Default block size in hbase is 64K, so we choose more sizes near 64K, you'd better // reset it according to your cluster's block size distribution // TODO Support the view of block size distribution statistics + // TODO: Why we add the extra 1024 bytes? Slop? private static final int DEFAULT_BUCKET_SIZES[] = { 4 * 1024 + 1024, 8 * 1024 + 1024, 16 * 1024 + 1024, 32 * 1024 + 1024, 40 * 1024 + 1024, 48 * 1024 + 1024, 56 * 1024 + 1024, 64 * 1024 + 1024, 96 * 1024 + 1024, 128 * 1024 + 1024, @@ -289,6 +290,9 @@ public final class BucketAllocator { return null; } + /** + * So, what is the minimum amount of items we'll tolerate in a single bucket? 
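+   * Judging by the constructor below, a bucket's capacity is sized as FEWEST_ITEMS_IN_BUCKET
+   * times the largest configured bucket (item) size, so every bucket can hold at least this
+   * many of the biggest items.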
+ */ static public final int FEWEST_ITEMS_IN_BUCKET = 4; private final int[] bucketSizes; @@ -308,9 +312,8 @@ public final class BucketAllocator { this.bucketCapacity = FEWEST_ITEMS_IN_BUCKET * bigItemSize; buckets = new Bucket[(int) (availableSpace / bucketCapacity)]; if (buckets.length < this.bucketSizes.length) - throw new BucketAllocatorException( - "Bucket allocator size too small - must have room for at least " - + this.bucketSizes.length + " buckets"); + throw new BucketAllocatorException("Bucket allocator size too small (" + buckets.length + + "); must have room for at least " + this.bucketSizes.length + " buckets"); bucketSizeInfos = new BucketSizeInfo[this.bucketSizes.length]; for (int i = 0; i < this.bucketSizes.length; ++i) { bucketSizeInfos[i] = new BucketSizeInfo(i); @@ -321,6 +324,12 @@ public final class BucketAllocator { .instantiateBucket(buckets[i]); } this.totalSize = ((long) buckets.length) * bucketCapacity; + if (LOG.isInfoEnabled()) { + LOG.info("Cache totalSize=" + this.totalSize + ", buckets=" + this.buckets.length + + ", bucket capacity=" + this.bucketCapacity + + "=(" + FEWEST_ITEMS_IN_BUCKET + "*" + this.bigItemSize + ")=" + + "(FEWEST_ITEMS_IN_BUCKET*(largest configured bucketcache size))"); + } } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java index 7436b71b3e3..66aced00b64 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java @@ -78,7 +78,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder; /** * BucketCache uses {@link BucketAllocator} to allocate/free blocks, and uses - * {@link BucketCache#ramCache} and {@link BucketCache#backingMap} in order to + * BucketCache#ramCache and BucketCache#backingMap in order to * determine if a given element is in the cache. The bucket cache can use on-heap or * off-heap memory {@link ByteBufferIOEngine} or in a file {@link FileIOEngine} to * store/read the block data. @@ -87,7 +87,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder; * {@link org.apache.hadoop.hbase.io.hfile.LruBlockCache} * *

    BucketCache can be used as mainly a block cache (see - * {@link org.apache.hadoop.hbase.io.hfile.CombinedBlockCache}), combined with + * {@link org.apache.hadoop.hbase.io.hfile.CombinedBlockCache}), combined with * LruBlockCache to decrease CMS GC and heap fragmentation. * *

    It also can be used as a secondary cache (e.g. using a file on ssd/fusionio to store @@ -349,6 +349,7 @@ public class BucketCache implements BlockCache, HeapSize { */ public void cacheBlockWithWait(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory, boolean wait) { + if (LOG.isTraceEnabled()) LOG.trace("Caching key=" + cacheKey + ", item=" + cachedItem); if (!cacheEnabled) { return; } @@ -422,6 +423,9 @@ public class BucketCache implements BlockCache, HeapSize { // TODO : change this area - should be removed after server cells and // 12295 are available int len = bucketEntry.getLength(); + if (LOG.isTraceEnabled()) { + LOG.trace("Read offset=" + bucketEntry.offset() + ", len=" + len); + } Cacheable cachedBlock = ioEngine.read(bucketEntry.offset(), len, bucketEntry.deserializerReference(this.deserialiserMap)); long timeTaken = System.nanoTime() - start; @@ -628,7 +632,9 @@ public class BucketCache implements BlockCache, HeapSize { */ private void freeSpace(final String why) { // Ensure only one freeSpace progress at a time - if (!freeSpaceLock.tryLock()) return; + if (!freeSpaceLock.tryLock()) { + return; + } try { freeInProgress = true; long bytesToFreeWithoutExtra = 0; @@ -657,7 +663,7 @@ public class BucketCache implements BlockCache, HeapSize { return; } long currentSize = bucketAllocator.getUsedSize(); - long totalSize=bucketAllocator.getTotalSize(); + long totalSize = bucketAllocator.getTotalSize(); if (LOG.isDebugEnabled() && msgBuffer != null) { LOG.debug("Free started because \"" + why + "\"; " + msgBuffer.toString() + " of current used=" + StringUtils.byteDesc(currentSize) + ", actual cacheSize=" + @@ -864,7 +870,7 @@ public class BucketCache implements BlockCache, HeapSize { } } - // Make sure data pages are written are on media before we update maps. + // Make sure data pages are written on media before we update maps. try { ioEngine.sync(); } catch (IOException ioex) { @@ -938,9 +944,9 @@ public class BucketCache implements BlockCache, HeapSize { FileOutputStream fos = null; ObjectOutputStream oos = null; try { - if (!ioEngine.isPersistent()) - throw new IOException( - "Attempt to persist non-persistent cache mappings!"); + if (!ioEngine.isPersistent()) { + throw new IOException("Attempt to persist non-persistent cache mappings!"); + } fos = new FileOutputStream(persistencePath, false); oos = new ObjectOutputStream(fos); oos.writeLong(cacheCapacity); @@ -1020,19 +1026,17 @@ public class BucketCache implements BlockCache, HeapSize { } /** - * Used to shut down the cache -or- turn it off in the case of something - * broken. + * Used to shut down the cache -or- turn it off in the case of something broken. */ private void disableCache() { - if (!cacheEnabled) - return; + if (!cacheEnabled) return; cacheEnabled = false; ioEngine.shutdown(); this.scheduleThreadPool.shutdown(); - for (int i = 0; i < writerThreads.length; ++i) - writerThreads[i].interrupt(); + for (int i = 0; i < writerThreads.length; ++i) writerThreads[i].interrupt(); this.ramCache.clear(); if (!ioEngine.isPersistent() || persistencePath == null) { + // If persistent ioengine and a path, we will serialize out the backingMap. 
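+ // Otherwise, as here (non-persistent ioengine, or no persistence path), there is nothing to
+ // reload later, so just drop the in-memory map of what was cached.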
this.backingMap.clear(); } } @@ -1327,6 +1331,9 @@ public class BucketCache implements BlockCache, HeapSize { len == sliceBuf.limit() + block.headerSize() + HFileBlock.EXTRA_SERIALIZATION_SPACE; ByteBuffer extraInfoBuffer = ByteBuffer.allocate(HFileBlock.EXTRA_SERIALIZATION_SPACE); block.serializeExtraInfo(extraInfoBuffer); + if (LOG.isTraceEnabled()) { + LOG.trace("Write offset=" + offset + ", len=" + len); + } ioEngine.write(sliceBuf, offset); ioEngine.write(extraInfoBuffer, offset + len - HFileBlock.EXTRA_SERIALIZATION_SPACE); } else { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestDataBlockEncoders.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestDataBlockEncoders.java index 1ef918c9e05..66fee6afd03 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestDataBlockEncoders.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestDataBlockEncoders.java @@ -32,6 +32,7 @@ import java.util.Random; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.ArrayBackedTag; import org.apache.hadoop.hbase.CategoryBasedTimeout; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellComparator; @@ -42,7 +43,6 @@ import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue.Type; import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.Tag; -import org.apache.hadoop.hbase.ArrayBackedTag; import org.apache.hadoop.hbase.codec.prefixtree.PrefixTreeSeeker; import org.apache.hadoop.hbase.io.ByteArrayOutputStream; import org.apache.hadoop.hbase.io.compress.Compression; @@ -80,6 +80,7 @@ public class TestDataBlockEncoders { private static int ENCODED_DATA_OFFSET = HConstants.HFILEBLOCK_HEADER_SIZE + DataBlockEncoding.ID_SIZE; + static final byte[] HFILEBLOCK_DUMMY_HEADER = new byte[HConstants.HFILEBLOCK_HEADER_SIZE]; private RedundantKVGenerator generator = new RedundantKVGenerator(); private Random randomizer = new Random(42l); @@ -109,11 +110,9 @@ public class TestDataBlockEncoders { .withIncludesTags(includesTags) .withCompression(algo).build(); if (encoder != null) { - return encoder.newDataBlockEncodingContext(encoding, - HConstants.HFILEBLOCK_DUMMY_HEADER, meta); + return encoder.newDataBlockEncodingContext(encoding, HFILEBLOCK_DUMMY_HEADER, meta); } else { - return new HFileBlockDefaultEncodingContext(encoding, - HConstants.HFILEBLOCK_DUMMY_HEADER, meta); + return new HFileBlockDefaultEncodingContext(encoding, HFILEBLOCK_DUMMY_HEADER, meta); } } @@ -249,7 +248,7 @@ public class TestDataBlockEncoders { HFileBlockEncodingContext encodingContext, boolean useOffheapData) throws IOException { DataBlockEncoder encoder = encoding.getEncoder(); ByteArrayOutputStream baos = new ByteArrayOutputStream(); - baos.write(HConstants.HFILEBLOCK_DUMMY_HEADER); + baos.write(HFILEBLOCK_DUMMY_HEADER); DataOutputStream dos = new DataOutputStream(baos); encoder.startBlockEncoding(encodingContext, dos); for (KeyValue kv : kvs) { @@ -386,10 +385,10 @@ public class TestDataBlockEncoders { continue; } HFileBlockEncodingContext encodingContext = new HFileBlockDefaultEncodingContext(encoding, - HConstants.HFILEBLOCK_DUMMY_HEADER, fileContext); + HFILEBLOCK_DUMMY_HEADER, fileContext); ByteArrayOutputStream baos = new ByteArrayOutputStream(); - baos.write(HConstants.HFILEBLOCK_DUMMY_HEADER); + baos.write(HFILEBLOCK_DUMMY_HEADER); DataOutputStream dos = new DataOutputStream(baos); 
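+ // The dummy header written above just reserves header space so the encoded bytes start at the
+ // same offset they would have inside a real HFileBlock.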
encoder.startBlockEncoding(encodingContext, dos); for (KeyValue kv : kvList) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestSeekToBlockWithEncoders.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestSeekToBlockWithEncoders.java index 783f58e8b10..21941f77237 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestSeekToBlockWithEncoders.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestSeekToBlockWithEncoders.java @@ -46,7 +46,7 @@ import org.junit.runners.Parameterized.Parameters; @Category({IOTests.class, SmallTests.class}) @RunWith(Parameterized.class) public class TestSeekToBlockWithEncoders { - + static final byte[] HFILEBLOCK_DUMMY_HEADER = new byte[HConstants.HFILEBLOCK_HEADER_SIZE]; private final boolean useOffheapData; @Parameters @@ -281,7 +281,7 @@ public class TestSeekToBlockWithEncoders { .withIncludesMvcc(false).withIncludesTags(false) .withCompression(Compression.Algorithm.NONE).build(); HFileBlockEncodingContext encodingContext = encoder.newDataBlockEncodingContext(encoding, - HConstants.HFILEBLOCK_DUMMY_HEADER, meta); + HFILEBLOCK_DUMMY_HEADER, meta); ByteBuffer encodedBuffer = TestDataBlockEncoders.encodeKeyValues(encoding, kvs, encodingContext, this.useOffheapData); DataBlockEncoder.EncodedSeeker seeker = encoder.createSeeker(CellComparator.COMPARATOR, diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestChecksum.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestChecksum.java index e8a2882ab9b..91ab8c07d7b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestChecksum.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestChecksum.java @@ -42,12 +42,12 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.testclassification.IOTests; -import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.fs.HFileSystem; import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper; import org.apache.hadoop.hbase.io.compress.Compression; import org.apache.hadoop.hbase.nio.ByteBuff; +import org.apache.hadoop.hbase.testclassification.IOTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.util.ChecksumType; import org.junit.Before; import org.junit.Test; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestForceCacheImportantBlocks.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestForceCacheImportantBlocks.java index 16353107794..68dc6259562 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestForceCacheImportantBlocks.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestForceCacheImportantBlocks.java @@ -97,7 +97,7 @@ public class TestForceCacheImportantBlocks { public void setup() { // Make sure we make a new one each time. CacheConfig.GLOBAL_BLOCK_CACHE_INSTANCE = null; - HFile.dataBlockReadCnt.set(0); + HFile.DATABLOCK_READ_COUNT.set(0); } @Test @@ -114,12 +114,12 @@ public class TestForceCacheImportantBlocks { CacheStats stats = cache.getStats(); writeTestData(region); assertEquals(0, stats.getHitCount()); - assertEquals(0, HFile.dataBlockReadCnt.get()); + assertEquals(0, HFile.DATABLOCK_READ_COUNT.get()); // Do a single get, take count of caches. 
If we are NOT caching DATA blocks, the miss // count should go up. Otherwise, all should be cached and the miss count should not rise. region.get(new Get(Bytes.toBytes("row" + 0))); assertTrue(stats.getHitCount() > 0); - assertTrue(HFile.dataBlockReadCnt.get() > 0); + assertTrue(HFile.DATABLOCK_READ_COUNT.get() > 0); long missCount = stats.getMissCount(); region.get(new Get(Bytes.toBytes("row" + 0))); if (this.cfCacheEnabled) assertEquals(missCount, stats.getMissCount()); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBackedByBucketCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBackedByBucketCache.java new file mode 100644 index 00000000000..5c2e7d607ee --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBackedByBucketCache.java @@ -0,0 +1,231 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.io.hfile; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.CategoryBasedTimeout; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellComparator; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.io.compress.Compression; +import org.apache.hadoop.hbase.io.hfile.bucket.BucketAllocator; +import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache; +import org.apache.hadoop.hbase.testclassification.IOTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; +import org.junit.rules.TestRule; + +/** + * Test for file-backed BucketCache. + */ +@Category({IOTests.class, SmallTests.class}) +public class TestHFileBackedByBucketCache { + private static final Log LOG = LogFactory.getLog(TestHFileBackedByBucketCache.class); + @Rule public TestName name = new TestName(); + @Rule public final TestRule timeout = CategoryBasedTimeout.builder().withTimeout(this.getClass()). + withLookingForStuckThread(true).build(); + private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + private static final int ROW_LENGTH = 4; + private Configuration conf; + private FileSystem fs; + + // MATH! SIZING FOR THE TEST! 
+ // Set bucketcache to be smallest size possible which is 1MB. We do that in the test + // @Before before method. Into out 1MB cache, have it so only one bucket. If + // bucketsize is set to 125kb in size, we will have one bucket in our 1MB bucketcache. It is + // cryptic how this comes about but basically comes down to + // {@link BucketAllocator#FEWEST_ITEMS_IN_BUCKET} being '4'... so 4 * 125 = just over 500k or so + // which makes for one bucket only in 1M which you can see from TRACE logging: + // + // Cache totalSize=532480, buckets=1, bucket capacity= + // 532480=(4*133120)=(FEWEST_ITEMS_IN_BUCKET*(largest configured bucketcache size)) + // + // Now into this one big bucket, we write hfileblocks....Each hfileblock has two keys because + // first is under the BLOCKSIZE of 64k and then the second puts us over the 64k... + // so two Cells per block... + /** + * Default size. + */ + private static final int BLOCKSIZE = 64 * 1024; + + /** + * Bucket sizes get multiplied by 4 for actual bucket size. + * See {@link BucketAllocator#FEWEST_ITEMS_IN_BUCKET}. + */ + private static final int BUCKETSIZE = 125 * 1024; + + /** + * Make it so one Cell is just under a BLOCKSIZE. The second Cell puts us over the BLOCKSIZE + * so we have two Cells per HFilBlock. + */ + private static final int VALUE_SIZE = 33 * 1024; + + @Before + public void before() throws IOException { + // Do setup of a bucketcache that has one bucket only. Enable trace-level logging for + // key classes. + this.conf = TEST_UTIL.getConfiguration(); + this.fs = FileSystem.get(conf); + + // Set BucketCache and HFileBlock to log at trace level. + setTraceLevel(BucketCache.class); + setTraceLevel(HFileBlock.class); + setTraceLevel(HFileReaderImpl.class); + setTraceLevel(BucketAllocator.class); + } + + // Assumes log4j logging. + private static void setTraceLevel(final Class clazz) { + Log testlog = LogFactory.getLog(clazz.getName()); + ((org.apache.commons.logging.impl.Log4JLogger)testlog).getLogger(). + setLevel(org.apache.log4j.Level.TRACE); + } + + /** + * Test that bucketcache is caching and that the persist of in-memory map works + * @throws IOException + */ + @Test + public void testBucketCacheCachesAndPersists() throws IOException { + // Set up a bucket cache. Set up one that will persist by passing a + // hbase.bucketcache.persistent.path value to store the in-memory map of what is out in + // the file-backed bucketcache. Set bucketcache to have one size only, BUCKETSIZE. + // See "MATH! SIZING FOR THE TEST!" note above around declaration of BUCKETSIZE + String bucketCacheDataFile = + (new Path(TEST_UTIL.getDataTestDir(), "bucketcache.data")).toString(); + (new File(bucketCacheDataFile)).getParentFile().mkdirs(); + this.conf.set("hbase.bucketcache.ioengine", "file:" + bucketCacheDataFile); + this.conf.set("hbase.bucketcache.persistent.path", bucketCacheDataFile + ".map"); + this.conf.setStrings("hbase.bucketcache.bucket.sizes", Integer.toString(BUCKETSIZE)); + // This is minimum bucketcache size.... 1MB. + this.conf.setInt("hbase.bucketcache.size", 1); + CacheConfig cacheConfig = new CacheConfig(conf); + Path hfilePath = new Path(TEST_UTIL.getDataTestDir(), this.name.getMethodName()); + // Write 8 entries which should make for four hfileBlocks. 
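+ // (Each hfileblock holds two Cells, per the sizing math above, so 8 entries / 2 per block = 4 blocks.)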
+ final int count = 8; + final int hfileBlockCount = 4; + List writtenCells = writeFile(hfilePath, Compression.Algorithm.NONE, cacheConfig, count); + CacheStats stats = cacheConfig.getBlockCache().getStats(); + List readCells = readFile(hfilePath, cacheConfig); + assertTrue(!writtenCells.isEmpty()); + assertEquals(writtenCells.size(), readCells.size()); + assertEquals(hfileBlockCount, stats.getMissCount()); + assertEquals(1, stats.getHitCount()); // readFile will read first block is from cache. + + // Now, close out the cache and then reopen and verify that cache still has our blocks. + // Assert that persistence works. + cacheConfig.getBlockCache().shutdown(); + // Need to clear the global cache else the new CacheConfig won't create a bucketcache but + // just reuse the old one. + CacheConfig.GLOBAL_BLOCK_CACHE_INSTANCE = null; + cacheConfig = new CacheConfig(conf); + stats = cacheConfig.getBlockCache().getStats(); + assertEquals(0, stats.getHitCachingCount()); + readCells = readFile(hfilePath, cacheConfig); + // readFile will read all hfileblocs in the file, hfileBlockCount, and then one more, so + 1. + assertEquals(hfileBlockCount + 1, stats.getHitCachingCount()); + } + + /** + * Write a file with count entries. + * @return The Cells written to the file. + * @throws IOException + */ + private List writeFile(final Path hfilePath, final Compression.Algorithm compressAlgo, + final CacheConfig cacheConfig, final int count) + throws IOException { + List cells = new ArrayList(count); + HFileContext context = + new HFileContextBuilder().withBlockSize(BLOCKSIZE).withCompression(compressAlgo).build(); + try (HFile.Writer writer = new HFile.WriterFactory(conf, cacheConfig). + withPath(fs, hfilePath). + withFileContext(context). + withComparator(CellComparator.COMPARATOR). + create()) { + byte [] valueBytes = new byte [VALUE_SIZE]; + for (int i = 0; i < valueBytes.length; i++) valueBytes[i] = '0'; + for (int i = 0; i < count; ++i) { + byte[] keyBytes = format(i); + KeyValue keyValue = new KeyValue(keyBytes, HConstants.CATALOG_FAMILY, keyBytes, + HConstants.LATEST_TIMESTAMP, valueBytes); + writer.append(keyValue); + cells.add(keyValue); + } + } + return cells; + } + + /** + * Read the whole file, then read the first block so we get something from cache for sure. + * So... there are TOTAL_BLOCKS_IN_FILE read + 1. See math at head of this class. + * @return The Cells read from the file. + */ + private List readFile(final Path hfilePath, final CacheConfig cacheConfig) + throws IOException { + List cells = new ArrayList(); + try (HFile.Reader reader = HFile.createReader(this.fs, hfilePath, cacheConfig, this.conf); + HFileScanner scanner = reader.getScanner(true, true)) { + scanner.seekTo(); + do { + cells.add(scanner.getCell()); + LOG.info(scanner.getKey()); + } while (scanner.next()); + // Do a random seek just so we see a block coming from cache. + scanner.seekTo(reader.getFirstKey()); + scanner.next(); + LOG.info(scanner.getCell()); + } + return cells; + } + + /* + * Format passed integer. + * @param number + * @return Returns zero-prefixed ROW_LENGTH-byte wide decimal version of passed + * number (Does absolute in case number is negative). 
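+ * For example, with ROW_LENGTH of 4, format(7) returns the bytes of the String "0007".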
+ */ + private static byte [] format(final int number) { + byte [] b = new byte[ROW_LENGTH]; + int d = Math.abs(number); + for (int i = b.length - 1; i >= 0; i--) { + b[i] = (byte)((d % 10) + '0'); + d /= 10; + } + return b; + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java index 4ee7f5bb825..6748efc0947 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java @@ -553,7 +553,7 @@ public class TestHFileBlock { for (int i = 0; i < NUM_TEST_BLOCKS; ++i) { if (!pread) { assertEquals(is.getPos(), curOffset + (i == 0 ? 0 : - HConstants.HFILEBLOCK_HEADER_SIZE)); + HConstants.HFILEBLOCK_HEADER_SIZE)); } assertEquals(expectedOffsets.get(i).longValue(), curOffset); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockCompatibility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockCompatibility.java index 3cfee026381..16607b9748c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockCompatibility.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockCompatibility.java @@ -347,8 +347,7 @@ public class TestHFileBlockCompatibility { // These constants are as they were in minorVersion 0. private static final int HEADER_SIZE = HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM; private static final boolean DONT_FILL_HEADER = HFileBlock.DONT_FILL_HEADER; - private static final byte[] DUMMY_HEADER = - HFileBlock.DUMMY_HEADER_NO_CHECKSUM; + private static final byte[] DUMMY_HEADER = HFileBlock.DUMMY_HEADER_NO_CHECKSUM; private enum State { INIT, diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBlocksRead.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBlocksRead.java index 2dfbee6b850..a574d25518b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBlocksRead.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBlocksRead.java @@ -196,7 +196,7 @@ public class TestBlocksRead { } private static long getBlkAccessCount(byte[] cf) { - return HFile.dataBlockReadCnt.get(); + return HFile.DATABLOCK_READ_COUNT.get(); } private static long getBlkCount() {