diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
index a02d89a4bad..0c6244f720f 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
@@ -65,7 +65,9 @@ public final class HConstants {
public static final byte[] RPC_HEADER = new byte[] { 'H', 'B', 'a', 's' };
public static final byte RPC_CURRENT_VERSION = 0;
- // HFileBlock constants.
+ // HFileBlock constants. TODO!!!! THESE DEFINES BELONG IN HFILEBLOCK, NOT UP HERE.
+ // Needed down in hbase-common though by encoders but these encoders should not be dealing
+ // in the internals of hfileblocks. Fix encapsulation.
/** The size data structures with minor version is 0 */
public static final int HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM = MAGIC_LENGTH + 2 * Bytes.SIZEOF_INT
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java
index 817b1a79476..d873f7e7770 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java
@@ -23,12 +23,12 @@ import java.io.OutputStream;
import java.nio.ByteBuffer;
import org.apache.hadoop.hbase.ByteBufferedCell;
+import org.apache.hadoop.hbase.ByteBufferedKeyOnlyKeyValue;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.ByteBufferedKeyOnlyKeyValue;
import org.apache.hadoop.hbase.KeyValue.Type;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.SettableSequenceId;
@@ -52,7 +52,9 @@ import org.apache.hadoop.io.WritableUtils;
*/
@InterfaceAudience.Private
abstract class BufferedDataBlockEncoder implements DataBlockEncoder {
-
+ /**
+ * TODO: This datablockencoder is dealing in internals of hfileblocks. Purge reference to HFBs
+ */
private static int INITIAL_KEY_BUFFER_SIZE = 512;
@Override
@@ -1140,8 +1142,8 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder {
BufferedDataBlockEncodingState state = (BufferedDataBlockEncodingState) encodingCtx
.getEncodingState();
// Write the unencodedDataSizeWritten (with header size)
- Bytes.putInt(uncompressedBytesWithHeader, HConstants.HFILEBLOCK_HEADER_SIZE
- + DataBlockEncoding.ID_SIZE, state.unencodedDataSizeWritten
+ Bytes.putInt(uncompressedBytesWithHeader,
+ HConstants.HFILEBLOCK_HEADER_SIZE + DataBlockEncoding.ID_SIZE, state.unencodedDataSizeWritten
);
if (encodingCtx.getDataBlockEncoding() != DataBlockEncoding.NONE) {
encodingCtx.postEncoding(BlockType.ENCODED_DATA);
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DataBlockEncoder.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DataBlockEncoder.java
index 397855a7edc..4adb2121c9c 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DataBlockEncoder.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DataBlockEncoder.java
@@ -39,7 +39,8 @@ import org.apache.hadoop.hbase.nio.ByteBuff;
*/
@InterfaceAudience.Private
public interface DataBlockEncoder {
-
+// TODO: This Interface should be deprecated and replaced. It presumes hfile and carnal knowledge of
+// Cell internals. It was done for a different time. Remove. Purge.
/**
* Starts encoding for a block of KeyValues. Call
* {@link #endBlockEncoding(HFileBlockEncodingContext, DataOutputStream, byte[])} to finish
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileContext.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileContext.java
index 99451464e17..909391aa535 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileContext.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileContext.java
@@ -218,22 +218,22 @@ public class HFileContext implements HeapSize, Cloneable {
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
- sb.append("HFileContext [");
- sb.append(" usesHBaseChecksum="); sb.append(usesHBaseChecksum);
- sb.append(" checksumType="); sb.append(checksumType);
- sb.append(" bytesPerChecksum="); sb.append(bytesPerChecksum);
- sb.append(" blocksize="); sb.append(blocksize);
- sb.append(" encoding="); sb.append(encoding);
- sb.append(" includesMvcc="); sb.append(includesMvcc);
- sb.append(" includesTags="); sb.append(includesTags);
- sb.append(" compressAlgo="); sb.append(compressAlgo);
- sb.append(" compressTags="); sb.append(compressTags);
- sb.append(" cryptoContext=[ "); sb.append(cryptoContext); sb.append(" ]");
+ sb.append("[");
+ sb.append("usesHBaseChecksum="); sb.append(usesHBaseChecksum);
+ sb.append(", checksumType="); sb.append(checksumType);
+ sb.append(", bytesPerChecksum="); sb.append(bytesPerChecksum);
+ sb.append(", blocksize="); sb.append(blocksize);
+ sb.append(", encoding="); sb.append(encoding);
+ sb.append(", includesMvcc="); sb.append(includesMvcc);
+ sb.append(", includesTags="); sb.append(includesTags);
+ sb.append(", compressAlgo="); sb.append(compressAlgo);
+ sb.append(", compressTags="); sb.append(compressTags);
+ sb.append(", cryptoContext=["); sb.append(cryptoContext); sb.append("]");
if (hfileName != null) {
- sb.append(" name=");
+ sb.append(", name=");
sb.append(hfileName);
}
- sb.append(" ]");
+ sb.append("]");
return sb.toString();
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java
index d6bdec0bb54..6fe3927107c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java
@@ -82,8 +82,14 @@ public class CacheConfig {
*/
/**
- * If the chosen ioengine can persist its state across restarts, the path to the file to
- * persist to.
+ * If the chosen ioengine can persist its state across restarts, the path to the file to persist
+ * to. This file is NOT the data file. It is a file into which we will serialize the map of
+ * what is in the data file. For example, if you pass the following argument as
+ * BUCKET_CACHE_IOENGINE_KEY ("hbase.bucketcache.ioengine"),
+ * <code>file:/tmp/bucketcache.data</code>, then we will write the bucketcache data to the file
+ * <code>/tmp/bucketcache.data</code> but the metadata on where the data is in the supplied file
+ * is an in-memory map that needs to be persisted across restarts. Where to store this
+ * in-memory state is what you supply here: e.g. <code>/tmp/bucketcache.map</code>.
*/
public static final String BUCKET_CACHE_PERSISTENT_PATH_KEY =
"hbase.bucketcache.persistent.path";
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
index e9fa05ceee9..8582dbe9f2b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
@@ -178,19 +178,20 @@ public class HFile {
* The number of bytes per checksum.
*/
public static final int DEFAULT_BYTES_PER_CHECKSUM = 16 * 1024;
- // For measuring number of checksum failures
- static final Counter checksumFailures = new Counter();
- // for test purpose
- public static final Counter dataBlockReadCnt = new Counter();
+ // For measuring number of checksum failures
+ static final Counter CHECKSUM_FAILURES = new Counter();
+
+ // For tests. Gets incremented when we read a block whether from HDFS or from Cache.
+ public static final Counter DATABLOCK_READ_COUNT = new Counter();
/**
* Number of checksum verification failures. It also
* clears the counter.
*/
public static final long getChecksumFailuresCount() {
- long count = checksumFailures.get();
- checksumFailures.set(0);
+ long count = CHECKSUM_FAILURES.get();
+ CHECKSUM_FAILURES.set(0);
return count;
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
index e2f524c9e24..6268f2eb405 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
@@ -26,6 +26,8 @@ import java.nio.ByteBuffer;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
@@ -54,87 +56,121 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
/**
- * Reading {@link HFile} version 1 and 2 blocks, and writing version 2 blocks.
- * <code>DATABLK*</code>
+ * Be aware that when we read from HDFS, we overread pulling in the next blocks' header too.
+ * We do this to save having to do two seeks to read an HFileBlock; a seek to read the header
+ * to figure lengths, etc., and then another seek to pull in the data.
*/
@InterfaceAudience.Private
public class HFileBlock implements Cacheable {
+ private static final Log LOG = LogFactory.getLog(HFileBlock.class);
/**
- * On a checksum failure on a Reader, these many suceeding read
- * requests switch back to using hdfs checksums before auto-reenabling
- * hbase checksum verification.
+ * On a checksum failure, do these many succeeding read requests using hdfs checksums before
+ * auto-reenabling hbase checksum verification.
*/
static final int CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD = 3;
+ private static int UNSET = -1;
public static final boolean FILL_HEADER = true;
public static final boolean DONT_FILL_HEADER = false;
- /**
- * The size of block header when blockType is {@link BlockType#ENCODED_DATA}.
- * This extends normal header by adding the id of encoder.
- */
- public static final int ENCODED_HEADER_SIZE = HConstants.HFILEBLOCK_HEADER_SIZE
- + DataBlockEncoding.ID_SIZE;
-
- static final byte[] DUMMY_HEADER_NO_CHECKSUM =
- new byte[HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM];
-
// How to get the estimate correctly? if it is a singleBB?
public static final int MULTI_BYTE_BUFFER_HEAP_SIZE =
(int)ClassSize.estimateBase(MultiByteBuff.class, false);
- // meta.usesHBaseChecksum+offset+nextBlockOnDiskSizeWithHeader
- public static final int EXTRA_SERIALIZATION_SPACE = Bytes.SIZEOF_BYTE + Bytes.SIZEOF_INT
- + Bytes.SIZEOF_LONG;
+ /**
+ * See #blockDeserializer method for more info.
+ * 13 bytes of extra stuff stuck on the end of the HFileBlock that we pull in from HDFS (note,
+ * when we read from HDFS, we pull in an HFileBlock AND the header of the next block if one).
+ * The 13 bytes are: usesHBaseChecksum (1 byte) + offset of this block (long) +
+ * nextBlockOnDiskSizeWithHeader (int).
+ */
+ public static final int EXTRA_SERIALIZATION_SPACE =
+ Bytes.SIZEOF_BYTE + Bytes.SIZEOF_INT + Bytes.SIZEOF_LONG;
/**
* Each checksum value is an integer that can be stored in 4 bytes.
*/
static final int CHECKSUM_SIZE = Bytes.SIZEOF_INT;
+ static final byte[] DUMMY_HEADER_NO_CHECKSUM =
+ new byte[HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM];
+
+ /**
+ * Used deserializing blocks from Cache.
+ *
+ * Serializing to cache is a little hard to follow. See Writer#finishBlock for where it is done.
+ * When we start to append to a new HFileBlock,
+ * we skip over where the header should go before we start adding Cells. When the block is
+ * done, we'll then go back and fill in the header and the checksum tail. Be aware that what
+ * gets serialized into the blockcache is a byte array that contains an HFileBlock followed by
+ * its checksums and then the header of the next HFileBlock (needed to help navigate), followed
+ * again by an extra 13 bytes of meta info needed when time to recreate the HFileBlock from cache.
+ *
+ * ++++++++++++++
+ * + HFileBlock +
+ * ++++++++++++++
+ * + Checksums +
+ * ++++++++++++++
+ * + NextHeader +
+ * ++++++++++++++
+ * + ExtraMeta! +
+ * ++++++++++++++
+ *
+ * TODO: Fix it so we do NOT put the NextHeader into blockcache. It is not necessary.
+ */
+ static final CacheableDeserializer<Cacheable> blockDeserializer =
*/
- public static class Writer {
-
+ static class Writer {
private enum State {
INIT,
WRITING,
@@ -798,7 +834,7 @@ public class HFileBlock implements Cacheable {
private HFileBlockEncodingContext dataBlockEncodingCtx;
- /** block encoding context for non-data blocks */
+ /** block encoding context for non-data blocks*/
private HFileBlockDefaultEncodingContext defaultBlockEncodingCtx;
/**
@@ -871,25 +907,26 @@ public class HFileBlock implements Cacheable {
* @param dataBlockEncoder data block encoding algorithm to use
*/
public Writer(HFileDataBlockEncoder dataBlockEncoder, HFileContext fileContext) {
- this.dataBlockEncoder = dataBlockEncoder != null
- ? dataBlockEncoder : NoOpDataBlockEncoder.INSTANCE;
- defaultBlockEncodingCtx = new HFileBlockDefaultEncodingContext(null,
- HConstants.HFILEBLOCK_DUMMY_HEADER, fileContext);
- dataBlockEncodingCtx = this.dataBlockEncoder
- .newDataBlockEncodingContext(HConstants.HFILEBLOCK_DUMMY_HEADER, fileContext);
-
if (fileContext.getBytesPerChecksum() < HConstants.HFILEBLOCK_HEADER_SIZE) {
throw new RuntimeException("Unsupported value of bytesPerChecksum. " +
" Minimum is " + HConstants.HFILEBLOCK_HEADER_SIZE + " but the configured value is " +
fileContext.getBytesPerChecksum());
}
-
+ this.dataBlockEncoder = dataBlockEncoder != null?
+ dataBlockEncoder: NoOpDataBlockEncoder.INSTANCE;
+ this.dataBlockEncodingCtx = this.dataBlockEncoder.
+ newDataBlockEncodingContext(HConstants.HFILEBLOCK_DUMMY_HEADER, fileContext);
+ // TODO: This should be lazily instantiated since we usually do NOT need this default encoder
+ this.defaultBlockEncodingCtx = new HFileBlockDefaultEncodingContext(null,
+ HConstants.HFILEBLOCK_DUMMY_HEADER, fileContext);
+ // TODO: Set BAOS initial size. Use fileContext.getBlocksize() and add for header/checksum
baosInMemory = new ByteArrayOutputStream();
-
prevOffsetByType = new long[BlockType.values().length];
- for (int i = 0; i < prevOffsetByType.length; ++i)
- prevOffsetByType[i] = -1;
-
+ for (int i = 0; i < prevOffsetByType.length; ++i) {
+ prevOffsetByType[i] = UNSET;
+ }
+ // TODO: Why fileContext saved away when we have dataBlockEncoder and/or
+ // defaultDataBlockEncoder?
this.fileContext = fileContext;
}
@@ -899,7 +936,7 @@ public class HFileBlock implements Cacheable {
* @return the stream the user can write their data into
* @throws IOException
*/
- public DataOutputStream startWriting(BlockType newBlockType)
+ DataOutputStream startWriting(BlockType newBlockType)
throws IOException {
if (state == State.BLOCK_READY && startOffset != -1) {
// We had a previous block that was written to a stream at a specific
@@ -929,10 +966,10 @@ public class HFileBlock implements Cacheable {
* @param cell
* @throws IOException
*/
- public void write(Cell cell) throws IOException{
+ void write(Cell cell) throws IOException{
expectState(State.WRITING);
- this.unencodedDataSizeWritten += this.dataBlockEncoder.encode(cell, dataBlockEncodingCtx,
- this.userDataStream);
+ this.unencodedDataSizeWritten +=
+ this.dataBlockEncoder.encode(cell, dataBlockEncodingCtx, this.userDataStream);
}
/**
@@ -976,6 +1013,7 @@ public class HFileBlock implements Cacheable {
}
userDataStream.flush();
// This does an array copy, so it is safe to cache this byte array.
+ // Header is still the empty, 'dummy' header that is yet to be filled out.
uncompressedBytesWithHeader = baosInMemory.toByteArray();
prevOffset = prevOffsetByType[blockType.getId()];
@@ -987,22 +1025,25 @@ public class HFileBlock implements Cacheable {
onDiskBytesWithHeader = dataBlockEncodingCtx
.compressAndEncrypt(uncompressedBytesWithHeader);
} else {
- onDiskBytesWithHeader = defaultBlockEncodingCtx
- .compressAndEncrypt(uncompressedBytesWithHeader);
+ onDiskBytesWithHeader = this.defaultBlockEncodingCtx.
+ compressAndEncrypt(uncompressedBytesWithHeader);
}
+ // Calculate how many bytes we need for checksum on the tail of the block.
int numBytes = (int) ChecksumUtil.numBytes(
onDiskBytesWithHeader.length,
fileContext.getBytesPerChecksum());
- // put the header for on disk bytes
+ // Put the header for the on disk bytes; header currently is unfilled-out
putHeader(onDiskBytesWithHeader, 0,
onDiskBytesWithHeader.length + numBytes,
uncompressedBytesWithHeader.length, onDiskBytesWithHeader.length);
- // set the header for the uncompressed bytes (for cache-on-write)
- putHeader(uncompressedBytesWithHeader, 0,
+ // Set the header for the uncompressed bytes (for cache-on-write) -- IFF different from
+ // onDiskBytesWithHeader array.
+ if (onDiskBytesWithHeader != uncompressedBytesWithHeader) {
+ putHeader(uncompressedBytesWithHeader, 0,
onDiskBytesWithHeader.length + numBytes,
uncompressedBytesWithHeader.length, onDiskBytesWithHeader.length);
-
+ }
onDiskChecksum = new byte[numBytes];
ChecksumUtil.generateChecksums(
onDiskBytesWithHeader, 0, onDiskBytesWithHeader.length,
@@ -1036,9 +1077,9 @@ public class HFileBlock implements Cacheable {
* @param out
* @throws IOException
*/
- public void writeHeaderAndData(FSDataOutputStream out) throws IOException {
+ void writeHeaderAndData(FSDataOutputStream out) throws IOException {
long offset = out.getPos();
- if (startOffset != -1 && offset != startOffset) {
+ if (startOffset != UNSET && offset != startOffset) {
throw new IOException("A " + blockType + " block written to a "
+ "stream twice, first at offset " + startOffset + ", then at "
+ offset);
@@ -1091,7 +1132,7 @@ public class HFileBlock implements Cacheable {
/**
* Releases resources used by this writer.
*/
- public void release() {
+ void release() {
if (dataBlockEncodingCtx != null) {
dataBlockEncodingCtx.close();
dataBlockEncodingCtx = null;
@@ -1112,9 +1153,8 @@ public class HFileBlock implements Cacheable {
*/
int getOnDiskSizeWithoutHeader() {
expectState(State.BLOCK_READY);
- return onDiskBytesWithHeader.length
- + onDiskChecksum.length
- - HConstants.HFILEBLOCK_HEADER_SIZE;
+ return onDiskBytesWithHeader.length +
+ onDiskChecksum.length - HConstants.HFILEBLOCK_HEADER_SIZE;
}
/**
@@ -1146,7 +1186,7 @@ public class HFileBlock implements Cacheable {
}
/** @return true if a block is being written */
- public boolean isWriting() {
+ boolean isWriting() {
return state == State.WRITING;
}
@@ -1157,7 +1197,7 @@ public class HFileBlock implements Cacheable {
*
* @return the number of bytes written
*/
- public int blockSizeWritten() {
+ int blockSizeWritten() {
if (state != State.WRITING) return 0;
return this.unencodedDataSizeWritten;
}
@@ -1205,7 +1245,7 @@ public class HFileBlock implements Cacheable {
* @param out the file system output stream
* @throws IOException
*/
- public void writeBlock(BlockWritable bw, FSDataOutputStream out)
+ void writeBlock(BlockWritable bw, FSDataOutputStream out)
throws IOException {
bw.writeToBlock(startWriting(bw.getBlockType()));
writeHeaderAndData(out);
@@ -1218,7 +1258,7 @@ public class HFileBlock implements Cacheable {
* version is MINOR_VERSION_WITH_CHECKSUM. This is indicated by setting a
* 0 value in bytesPerChecksum.
*/
- public HFileBlock getBlockForCaching(CacheConfig cacheConf) {
+ HFileBlock getBlockForCaching(CacheConfig cacheConf) {
HFileContext newContext = new HFileContextBuilder()
.withBlockSize(fileContext.getBlocksize())
.withBytesPerCheckSum(0)
@@ -1241,7 +1281,7 @@ public class HFileBlock implements Cacheable {
}
/** Something that can be written into a block. */
- public interface BlockWritable {
+ interface BlockWritable {
/** The type of block this data should use. */
BlockType getBlockType();
@@ -1258,7 +1298,7 @@ public class HFileBlock implements Cacheable {
// Block readers and writers
/** An interface allowing to iterate {@link HFileBlock}s. */
- public interface BlockIterator {
+ interface BlockIterator {
/**
* Get the next block, or null if there are no more blocks to iterate.
@@ -1273,7 +1313,7 @@ public class HFileBlock implements Cacheable {
}
/** A full-fledged reader with iteration ability. */
- public interface FSReader {
+ interface FSReader {
/**
* Reads the block at the given offset in the file with the given on-disk
@@ -1321,9 +1361,15 @@ public class HFileBlock implements Cacheable {
long offset = -1;
byte[] header = new byte[HConstants.HFILEBLOCK_HEADER_SIZE];
final ByteBuffer buf = ByteBuffer.wrap(header, 0, HConstants.HFILEBLOCK_HEADER_SIZE);
+ @Override
+ public String toString() {
+ return "offset=" + this.offset + ", header=" + Bytes.toStringBinary(header);
+ }
}
- /** Reads version 2 blocks from the filesystem. */
+ /**
+ * Reads version 2 blocks from the filesystem.
+ */
static class FSReaderImpl implements FSReader {
/** The file system stream of the underlying {@link HFile} that
* does or doesn't do checksum validations in the filesystem */
@@ -1362,7 +1408,7 @@ public class HFileBlock implements Cacheable {
// Cache the fileName
protected String pathName;
- public FSReaderImpl(FSDataInputStreamWrapper stream, long fileSize, HFileSystem hfs, Path path,
+ FSReaderImpl(FSDataInputStreamWrapper stream, long fileSize, HFileSystem hfs, Path path,
HFileContext fileContext) throws IOException {
this.fileSize = fileSize;
this.hfs = hfs;
@@ -1420,8 +1466,8 @@ public class HFileBlock implements Cacheable {
* the on-disk size of the next block, or -1 if it could not be determined.
*
* @param dest destination buffer
- * @param destOffset offset in the destination buffer
- * @param size size of the block to be read
+ * @param destOffset offset into the destination buffer at where to put the bytes we read
+ * @param size size of read
* @param peekIntoNextBlock whether to read the next block's on-disk size
* @param fileOffset position in the stream to read at
* @param pread whether we should do a positional read
@@ -1430,12 +1476,10 @@ public class HFileBlock implements Cacheable {
* -1 if it could not be determined
* @throws IOException
*/
- protected int readAtOffset(FSDataInputStream istream,
- byte[] dest, int destOffset, int size,
+ protected int readAtOffset(FSDataInputStream istream, byte [] dest, int destOffset, int size,
boolean peekIntoNextBlock, long fileOffset, boolean pread)
- throws IOException {
- if (peekIntoNextBlock &&
- destOffset + size + hdrSize > dest.length) {
+ throws IOException {
+ if (peekIntoNextBlock && destOffset + size + hdrSize > dest.length) {
// We are asked to read the next block's header as well, but there is
// not enough room in the array.
throw new IOException("Attempted to read " + size + " bytes and " +
@@ -1522,7 +1566,7 @@ public class HFileBlock implements Cacheable {
HFile.LOG.warn(msg);
throw new IOException(msg); // cannot happen case here
}
- HFile.checksumFailures.increment(); // update metrics
+ HFile.CHECKSUM_FAILURES.increment(); // update metrics
// If we have a checksum failure, we fall back into a mode where
// the next few reads use HDFS level checksums. We aim to make the
@@ -1597,6 +1641,7 @@ public class HFileBlock implements Cacheable {
}
int onDiskSizeWithHeader = (int) onDiskSizeWithHeaderL;
+
// See if we can avoid reading the header. This is desirable, because
// we will not incur a backward seek operation if we have already
// read this block's header as part of the previous read's look-ahead.
@@ -1604,15 +1649,22 @@ public class HFileBlock implements Cacheable {
// been read.
// TODO: How often does this optimization fire? Has to be same thread so the thread local
// is pertinent and we have to be reading next block as in a big scan.
+ ByteBuffer headerBuf = null;
PrefetchedHeader prefetchedHeader = prefetchedHeaderForThread.get();
- ByteBuffer headerBuf = prefetchedHeader.offset == offset? prefetchedHeader.buf: null;
-
+ boolean preReadHeader = false;
+ if (prefetchedHeader != null && prefetchedHeader.offset == offset) {
+ headerBuf = prefetchedHeader.buf;
+ preReadHeader = true;
+ }
// Allocate enough space to fit the next block's header too.
int nextBlockOnDiskSize = 0;
byte[] onDiskBlock = null;
HFileBlock b = null;
+ boolean fastPath = false;
+ boolean readHdrOnly = false;
if (onDiskSizeWithHeader > 0) {
+ fastPath = true;
// We know the total on-disk size. Read the entire block into memory,
// then parse the header. This code path is used when
// doing a random read operation relying on the block index, as well as
@@ -1669,6 +1721,7 @@ public class HFileBlock implements Cacheable {
// Unfortunately, we still have to do a separate read operation to
// read the header.
if (headerBuf == null) {
+ readHdrOnly = true;
// From the header, determine the on-disk size of the given hfile
// block, and read the remaining data, thereby incurring two read
// operations. This might happen when we are doing the first read
@@ -1681,12 +1734,12 @@ public class HFileBlock implements Cacheable {
}
// TODO: FIX!!! Expensive parse just to get a length
b = new HFileBlock(headerBuf, fileContext.isUseHBaseChecksum());
+ // onDiskBlock is whole block + header + checksums then extra hdrSize to read next header
onDiskBlock = new byte[b.getOnDiskSizeWithHeader() + hdrSize];
- // headerBuf is HBB
+ // headerBuf is HBB. Copy hdr into onDiskBlock
System.arraycopy(headerBuf.array(), headerBuf.arrayOffset(), onDiskBlock, 0, hdrSize);
- nextBlockOnDiskSize =
- readAtOffset(is, onDiskBlock, hdrSize, b.getOnDiskSizeWithHeader()
- - hdrSize, true, offset + hdrSize, pread);
+ nextBlockOnDiskSize = readAtOffset(is, onDiskBlock, hdrSize,
+ b.getOnDiskSizeWithHeader() - hdrSize, true, offset + hdrSize, pread);
onDiskSizeWithHeader = b.onDiskSizeWithoutHeader + hdrSize;
}
@@ -1716,13 +1769,19 @@ public class HFileBlock implements Cacheable {
b.offset = offset;
b.fileContext.setIncludesTags(this.fileContext.isIncludesTags());
b.fileContext.setIncludesMvcc(this.fileContext.isIncludesMvcc());
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Read preReadHeader=" + preReadHeader + ", fastPath=" + fastPath +
+ ", readHdrOnly=" + readHdrOnly + ", " + b);
+ }
return b;
}
+ @Override
public void setIncludesMemstoreTS(boolean includesMemstoreTS) {
this.fileContext.setIncludesMvcc(includesMemstoreTS);
}
+ @Override
public void setDataBlockEncoder(HFileDataBlockEncoder encoder) {
encodedBlockDecodingCtx = encoder.newDataBlockDecodingContext(this.fileContext);
}
@@ -1772,11 +1831,13 @@ public class HFileBlock implements Cacheable {
@Override
public void serialize(ByteBuffer destination) {
- this.buf.get(destination, 0, getSerializedLength()
- - EXTRA_SERIALIZATION_SPACE);
+ this.buf.get(destination, 0, getSerializedLength() - EXTRA_SERIALIZATION_SPACE);
serializeExtraInfo(destination);
}
+ /**
+ * Write out the content of EXTRA_SERIALIZATION_SPACE. Public so can be accessed by BucketCache.
+ */
public void serializeExtraInfo(ByteBuffer destination) {
destination.put(this.fileContext.isUseHBaseChecksum() ? (byte) 1 : (byte) 0);
destination.putLong(this.offset);
@@ -1862,7 +1923,7 @@ public class HFileBlock implements Cacheable {
}
/**
- * Calcuate the number of bytes required to store all the checksums
+ * Calculate the number of bytes required to store all the checksums
* for this block. Each checksum value is a 4 byte integer.
*/
int totalChecksumBytes() {
@@ -1888,16 +1949,14 @@ public class HFileBlock implements Cacheable {
* Maps a minor version to the size of the header.
*/
public static int headerSize(boolean usesHBaseChecksum) {
- if (usesHBaseChecksum) {
- return HConstants.HFILEBLOCK_HEADER_SIZE;
- }
- return HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM;
+ return usesHBaseChecksum?
+ HConstants.HFILEBLOCK_HEADER_SIZE: HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM;
}
/**
* Return the appropriate DUMMY_HEADER for the minor version
*/
- public byte[] getDummyHeaderForVersion() {
+ byte[] getDummyHeaderForVersion() {
return getDummyHeaderForVersion(this.fileContext.isUseHBaseChecksum());
}
@@ -1905,17 +1964,14 @@ public class HFileBlock implements Cacheable {
* Return the appropriate DUMMY_HEADER for the minor version
*/
static private byte[] getDummyHeaderForVersion(boolean usesHBaseChecksum) {
- if (usesHBaseChecksum) {
- return HConstants.HFILEBLOCK_DUMMY_HEADER;
- }
- return DUMMY_HEADER_NO_CHECKSUM;
+ return usesHBaseChecksum? HConstants.HFILEBLOCK_DUMMY_HEADER: DUMMY_HEADER_NO_CHECKSUM;
}
/**
* @return the HFileContext used to create this HFileBlock. Not necessary the
* fileContext for the file from which this block's data was originally read.
*/
- public HFileContext getHFileContext() {
+ HFileContext getHFileContext() {
return this.fileContext;
}
@@ -1927,7 +1983,7 @@ public class HFileBlock implements Cacheable {
/**
* @return true if this block is backed by a shared memory area(such as that of a BucketCache).
*/
- public boolean usesSharedMemory() {
+ boolean usesSharedMemory() {
return this.memType == MemoryType.SHARED;
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
index 239c63d001e..331b8ba918c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
@@ -1491,13 +1491,16 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
HFileBlock cachedBlock = getCachedBlock(cacheKey, cacheBlock, useLock, isCompaction,
updateCacheMetrics, expectedBlockType, expectedDataBlockEncoding);
if (cachedBlock != null) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("From Cache " + cachedBlock);
+ }
if (Trace.isTracing()) {
traceScope.getSpan().addTimelineAnnotation("blockCacheHit");
}
assert cachedBlock.isUnpacked() : "Packed block leak.";
if (cachedBlock.getBlockType().isData()) {
if (updateCacheMetrics) {
- HFile.dataBlockReadCnt.increment();
+ HFile.DATABLOCK_READ_COUNT.increment();
}
// Validate encoding type for data blocks. We include encoding
// type in the cache key, and we expect it to match on a cache hit.
@@ -1537,7 +1540,7 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
}
if (updateCacheMetrics && hfileBlock.getBlockType().isData()) {
- HFile.dataBlockReadCnt.increment();
+ HFile.DATABLOCK_READ_COUNT.increment();
}
return unpacked;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileScanner.java
index 1d04467e79b..c67bdd4976b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileScanner.java
@@ -18,6 +18,7 @@
*/
package org.apache.hadoop.hbase.io.hfile;
+import java.io.Closeable;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -38,7 +39,7 @@ import org.apache.hadoop.hbase.Cell;
* getValue.
*/
@InterfaceAudience.Private
-public interface HFileScanner extends Shipper {
+public interface HFileScanner extends Shipper, Closeable {
/**
* SeekTo or just before the passed BucketCache can be used as mainly a block cache (see
- * {@link org.apache.hadoop.hbase.io.hfile.CombinedBlockCache}), combined with
+ * {@link org.apache.hadoop.hbase.io.hfile.CombinedBlockCache}), combined with
* LruBlockCache to decrease CMS GC and heap fragmentation.
*
* It also can be used as a secondary cache (e.g. using a file on ssd/fusionio to store
@@ -349,6 +349,7 @@ public class BucketCache implements BlockCache, HeapSize {
*/
public void cacheBlockWithWait(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory,
boolean wait) {
+ if (LOG.isTraceEnabled()) LOG.trace("Caching key=" + cacheKey + ", item=" + cachedItem);
if (!cacheEnabled) {
return;
}
@@ -422,6 +423,9 @@ public class BucketCache implements BlockCache, HeapSize {
// TODO : change this area - should be removed after server cells and
// 12295 are available
int len = bucketEntry.getLength();
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Read offset=" + bucketEntry.offset() + ", len=" + len);
+ }
Cacheable cachedBlock = ioEngine.read(bucketEntry.offset(), len,
bucketEntry.deserializerReference(this.deserialiserMap));
long timeTaken = System.nanoTime() - start;
@@ -628,7 +632,9 @@ public class BucketCache implements BlockCache, HeapSize {
*/
private void freeSpace(final String why) {
// Ensure only one freeSpace progress at a time
- if (!freeSpaceLock.tryLock()) return;
+ if (!freeSpaceLock.tryLock()) {
+ return;
+ }
try {
freeInProgress = true;
long bytesToFreeWithoutExtra = 0;
@@ -657,7 +663,7 @@ public class BucketCache implements BlockCache, HeapSize {
return;
}
long currentSize = bucketAllocator.getUsedSize();
- long totalSize=bucketAllocator.getTotalSize();
+ long totalSize = bucketAllocator.getTotalSize();
if (LOG.isDebugEnabled() && msgBuffer != null) {
LOG.debug("Free started because \"" + why + "\"; " + msgBuffer.toString() +
" of current used=" + StringUtils.byteDesc(currentSize) + ", actual cacheSize=" +
@@ -864,7 +870,7 @@ public class BucketCache implements BlockCache, HeapSize {
}
}
- // Make sure data pages are written are on media before we update maps.
+ // Make sure data pages are written on media before we update maps.
try {
ioEngine.sync();
} catch (IOException ioex) {
@@ -938,9 +944,9 @@ public class BucketCache implements BlockCache, HeapSize {
FileOutputStream fos = null;
ObjectOutputStream oos = null;
try {
- if (!ioEngine.isPersistent())
- throw new IOException(
- "Attempt to persist non-persistent cache mappings!");
+ if (!ioEngine.isPersistent()) {
+ throw new IOException("Attempt to persist non-persistent cache mappings!");
+ }
fos = new FileOutputStream(persistencePath, false);
oos = new ObjectOutputStream(fos);
oos.writeLong(cacheCapacity);
@@ -1020,19 +1026,17 @@ public class BucketCache implements BlockCache, HeapSize {
}
/**
- * Used to shut down the cache -or- turn it off in the case of something
- * broken.
+ * Used to shut down the cache -or- turn it off in the case of something broken.
*/
private void disableCache() {
- if (!cacheEnabled)
- return;
+ if (!cacheEnabled) return;
cacheEnabled = false;
ioEngine.shutdown();
this.scheduleThreadPool.shutdown();
- for (int i = 0; i < writerThreads.length; ++i)
- writerThreads[i].interrupt();
+ for (int i = 0; i < writerThreads.length; ++i) writerThreads[i].interrupt();
this.ramCache.clear();
if (!ioEngine.isPersistent() || persistencePath == null) {
+ // If persistent ioengine and a path, we will serialize out the backingMap.
this.backingMap.clear();
}
}
@@ -1327,6 +1331,9 @@ public class BucketCache implements BlockCache, HeapSize {
len == sliceBuf.limit() + block.headerSize() + HFileBlock.EXTRA_SERIALIZATION_SPACE;
ByteBuffer extraInfoBuffer = ByteBuffer.allocate(HFileBlock.EXTRA_SERIALIZATION_SPACE);
block.serializeExtraInfo(extraInfoBuffer);
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Write offset=" + offset + ", len=" + len);
+ }
ioEngine.write(sliceBuf, offset);
ioEngine.write(extraInfoBuffer, offset + len - HFileBlock.EXTRA_SERIALIZATION_SPACE);
} else {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestDataBlockEncoders.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestDataBlockEncoders.java
index 1ef918c9e05..66fee6afd03 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestDataBlockEncoders.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestDataBlockEncoders.java
@@ -32,6 +32,7 @@ import java.util.Random;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.ArrayBackedTag;
import org.apache.hadoop.hbase.CategoryBasedTimeout;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
@@ -42,7 +43,6 @@ import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.Type;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.Tag;
-import org.apache.hadoop.hbase.ArrayBackedTag;
import org.apache.hadoop.hbase.codec.prefixtree.PrefixTreeSeeker;
import org.apache.hadoop.hbase.io.ByteArrayOutputStream;
import org.apache.hadoop.hbase.io.compress.Compression;
@@ -80,6 +80,7 @@ public class TestDataBlockEncoders {
private static int ENCODED_DATA_OFFSET = HConstants.HFILEBLOCK_HEADER_SIZE
+ DataBlockEncoding.ID_SIZE;
+ static final byte[] HFILEBLOCK_DUMMY_HEADER = new byte[HConstants.HFILEBLOCK_HEADER_SIZE];
private RedundantKVGenerator generator = new RedundantKVGenerator();
private Random randomizer = new Random(42l);
@@ -109,11 +110,9 @@ public class TestDataBlockEncoders {
.withIncludesTags(includesTags)
.withCompression(algo).build();
if (encoder != null) {
- return encoder.newDataBlockEncodingContext(encoding,
- HConstants.HFILEBLOCK_DUMMY_HEADER, meta);
+ return encoder.newDataBlockEncodingContext(encoding, HFILEBLOCK_DUMMY_HEADER, meta);
} else {
- return new HFileBlockDefaultEncodingContext(encoding,
- HConstants.HFILEBLOCK_DUMMY_HEADER, meta);
+ return new HFileBlockDefaultEncodingContext(encoding, HFILEBLOCK_DUMMY_HEADER, meta);
}
}
@@ -249,7 +248,7 @@ public class TestDataBlockEncoders {
HFileBlockEncodingContext encodingContext, boolean useOffheapData) throws IOException {
DataBlockEncoder encoder = encoding.getEncoder();
ByteArrayOutputStream baos = new ByteArrayOutputStream();
- baos.write(HConstants.HFILEBLOCK_DUMMY_HEADER);
+ baos.write(HFILEBLOCK_DUMMY_HEADER);
DataOutputStream dos = new DataOutputStream(baos);
encoder.startBlockEncoding(encodingContext, dos);
for (KeyValue kv : kvs) {
@@ -386,10 +385,10 @@ public class TestDataBlockEncoders {
continue;
}
HFileBlockEncodingContext encodingContext = new HFileBlockDefaultEncodingContext(encoding,
- HConstants.HFILEBLOCK_DUMMY_HEADER, fileContext);
+ HFILEBLOCK_DUMMY_HEADER, fileContext);
ByteArrayOutputStream baos = new ByteArrayOutputStream();
- baos.write(HConstants.HFILEBLOCK_DUMMY_HEADER);
+ baos.write(HFILEBLOCK_DUMMY_HEADER);
DataOutputStream dos = new DataOutputStream(baos);
encoder.startBlockEncoding(encodingContext, dos);
for (KeyValue kv : kvList) {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestSeekToBlockWithEncoders.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestSeekToBlockWithEncoders.java
index 783f58e8b10..21941f77237 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestSeekToBlockWithEncoders.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestSeekToBlockWithEncoders.java
@@ -46,7 +46,7 @@ import org.junit.runners.Parameterized.Parameters;
@Category({IOTests.class, SmallTests.class})
@RunWith(Parameterized.class)
public class TestSeekToBlockWithEncoders {
-
+ static final byte[] HFILEBLOCK_DUMMY_HEADER = new byte[HConstants.HFILEBLOCK_HEADER_SIZE];
private final boolean useOffheapData;
@Parameters
@@ -281,7 +281,7 @@ public class TestSeekToBlockWithEncoders {
.withIncludesMvcc(false).withIncludesTags(false)
.withCompression(Compression.Algorithm.NONE).build();
HFileBlockEncodingContext encodingContext = encoder.newDataBlockEncodingContext(encoding,
- HConstants.HFILEBLOCK_DUMMY_HEADER, meta);
+ HFILEBLOCK_DUMMY_HEADER, meta);
ByteBuffer encodedBuffer = TestDataBlockEncoders.encodeKeyValues(encoding, kvs,
encodingContext, this.useOffheapData);
DataBlockEncoder.EncodedSeeker seeker = encoder.createSeeker(CellComparator.COMPARATOR,
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestChecksum.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestChecksum.java
index e8a2882ab9b..91ab8c07d7b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestChecksum.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestChecksum.java
@@ -42,12 +42,12 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.testclassification.IOTests;
-import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.nio.ByteBuff;
+import org.apache.hadoop.hbase.testclassification.IOTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.ChecksumType;
import org.junit.Before;
import org.junit.Test;
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestForceCacheImportantBlocks.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestForceCacheImportantBlocks.java
index 16353107794..68dc6259562 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestForceCacheImportantBlocks.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestForceCacheImportantBlocks.java
@@ -97,7 +97,7 @@ public class TestForceCacheImportantBlocks {
public void setup() {
// Make sure we make a new one each time.
CacheConfig.GLOBAL_BLOCK_CACHE_INSTANCE = null;
- HFile.dataBlockReadCnt.set(0);
+ HFile.DATABLOCK_READ_COUNT.set(0);
}
@Test
@@ -114,12 +114,12 @@ public class TestForceCacheImportantBlocks {
CacheStats stats = cache.getStats();
writeTestData(region);
assertEquals(0, stats.getHitCount());
- assertEquals(0, HFile.dataBlockReadCnt.get());
+ assertEquals(0, HFile.DATABLOCK_READ_COUNT.get());
// Do a single get, take count of caches. If we are NOT caching DATA blocks, the miss
// count should go up. Otherwise, all should be cached and the miss count should not rise.
region.get(new Get(Bytes.toBytes("row" + 0)));
assertTrue(stats.getHitCount() > 0);
- assertTrue(HFile.dataBlockReadCnt.get() > 0);
+ assertTrue(HFile.DATABLOCK_READ_COUNT.get() > 0);
long missCount = stats.getMissCount();
region.get(new Get(Bytes.toBytes("row" + 0)));
if (this.cfCacheEnabled) assertEquals(missCount, stats.getMissCount());
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBackedByBucketCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBackedByBucketCache.java
new file mode 100644
index 00000000000..5c2e7d607ee
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBackedByBucketCache.java
@@ -0,0 +1,231 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.io.hfile;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.CategoryBasedTimeout;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellComparator;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.compress.Compression;
+import org.apache.hadoop.hbase.io.hfile.bucket.BucketAllocator;
+import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
+import org.apache.hadoop.hbase.testclassification.IOTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+import org.junit.rules.TestRule;
+
+/**
+ * Test for file-backed BucketCache.
+ */
+@Category({IOTests.class, SmallTests.class})
+public class TestHFileBackedByBucketCache {
+ private static final Log LOG = LogFactory.getLog(TestHFileBackedByBucketCache.class);
+ @Rule public TestName name = new TestName();
+ @Rule public final TestRule timeout = CategoryBasedTimeout.builder().withTimeout(this.getClass()).
+ withLookingForStuckThread(true).build();
+ private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+ private static final int ROW_LENGTH = 4;
+ private Configuration conf;
+ private FileSystem fs;
+
+ // MATH! SIZING FOR THE TEST!
+ // Set bucketcache to be smallest size possible which is 1MB. We do that in the test
+ // @Before necessaryLen
and if possible,
+ * extraLen
also if available. Analogous to
* {@link IOUtils#readFully(InputStream, byte[], int, int)}, but specifies a
- * number of "extra" bytes that would be desirable but not absolutely
- * necessary to read.
+ * number of "extra" bytes to also optionally read.
*
* @param in the input stream to read from
* @param buf the buffer to read into
* @param bufOffset the destination offset in the buffer
- * @param necessaryLen the number of bytes that are absolutely necessary to
- * read
+ * @param necessaryLen the number of bytes that are absolutely necessary to read
* @param extraLen the number of extra bytes that would be nice to read
* @return true if succeeded reading the extra bytes
* @throws IOException if failed to read the necessary bytes
*/
- public static boolean readWithExtra(InputStream in, byte[] buf,
+ static boolean readWithExtra(InputStream in, byte[] buf,
int bufOffset, int necessaryLen, int extraLen) throws IOException {
int bytesRemaining = necessaryLen + extraLen;
while (bytesRemaining > 0) {
@@ -723,7 +759,8 @@ public class HFileBlock implements Cacheable {
}
/**
- * Read from an input stream. Analogous to
+ * Read from an input stream at least necessaryLen
and if possible,
+ * extraLen
also if available. Analogous to
* {@link IOUtils#readFully(InputStream, byte[], int, int)}, but uses
* positional read and specifies a number of "extra" bytes that would be
* desirable but not absolutely necessary to read.
@@ -776,14 +813,13 @@ public class HFileBlock implements Cacheable {
* cell
. Examine the return
* code to figure whether we found the cell or not.
@@ -154,4 +155,4 @@ public interface HFileScanner extends Shipper {
* Close this HFile scanner and do necessary cleanup.
*/
void close();
-}
+}
\ No newline at end of file
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java
index 186d86bb790..d310d136f3d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java
@@ -57,6 +57,8 @@ import org.apache.hadoop.io.Writable;
public class HFileWriterImpl implements HFile.Writer {
private static final Log LOG = LogFactory.getLog(HFileWriterImpl.class);
+ private static final long UNSET = -1;
+
/** The Cell previously appended. Becomes the last cell in the file.*/
protected Cell lastCell = null;
@@ -129,16 +131,16 @@ public class HFileWriterImpl implements HFile.Writer {
private Listbefore
method. Into out 1MB cache, have it so only one bucket. If
+ // bucketsize is set to 125kb in size, we will have one bucket in our 1MB bucketcache. It is
+ // cryptic how this comes about but basically comes down to
+ // {@link BucketAllocator#FEWEST_ITEMS_IN_BUCKET} being '4'... so 4 * 125 = just over 500k or so
+ // which makes for one bucket only in 1M which you can see from TRACE logging:
+ //
+ // Cache totalSize=532480, buckets=1, bucket capacity=
+ // 532480=(4*133120)=(FEWEST_ITEMS_IN_BUCKET*(largest configured bucketcache size))
+ //
+ // Now into this one big bucket, we write hfileblocks....Each hfileblock has two keys because
+ // first is under the BLOCKSIZE of 64k and then the second puts us over the 64k...
+ // so two Cells per block...
+ /**
+ * Default size.
+ */
+ private static final int BLOCKSIZE = 64 * 1024;
+
+ /**
+ * Bucket sizes get multiplied by 4 for actual bucket size.
+ * See {@link BucketAllocator#FEWEST_ITEMS_IN_BUCKET}.
+ */
+ private static final int BUCKETSIZE = 125 * 1024;
+
+ /**
+ * Make it so one Cell is just under a BLOCKSIZE. The second Cell puts us over the BLOCKSIZE
+ * so we have two Cells per HFilBlock.
+ */
+ private static final int VALUE_SIZE = 33 * 1024;
+
+ @Before
+ public void before() throws IOException {
+ // Do setup of a bucketcache that has one bucket only. Enable trace-level logging for
+ // key classes.
+ this.conf = TEST_UTIL.getConfiguration();
+ this.fs = FileSystem.get(conf);
+
+ // Set BucketCache and HFileBlock to log at trace level.
+ setTraceLevel(BucketCache.class);
+ setTraceLevel(HFileBlock.class);
+ setTraceLevel(HFileReaderImpl.class);
+ setTraceLevel(BucketAllocator.class);
+ }
+
+ // Assumes log4j logging.
+ private static void setTraceLevel(final Class> clazz) {
+ Log testlog = LogFactory.getLog(clazz.getName());
+ ((org.apache.commons.logging.impl.Log4JLogger)testlog).getLogger().
+ setLevel(org.apache.log4j.Level.TRACE);
+ }
+
+ /**
+ * Test that bucketcache is caching and that the persist of in-memory map works
+ * @throws IOException
+ */
+ @Test
+ public void testBucketCacheCachesAndPersists() throws IOException {
+ // Set up a bucket cache. Set up one that will persist by passing a
+ // hbase.bucketcache.persistent.path value to store the in-memory map of what is out in
+ // the file-backed bucketcache. Set bucketcache to have one size only, BUCKETSIZE.
+ // See "MATH! SIZING FOR THE TEST!" note above around declaration of BUCKETSIZE
+ String bucketCacheDataFile =
+ (new Path(TEST_UTIL.getDataTestDir(), "bucketcache.data")).toString();
+ (new File(bucketCacheDataFile)).getParentFile().mkdirs();
+ this.conf.set("hbase.bucketcache.ioengine", "file:" + bucketCacheDataFile);
+ this.conf.set("hbase.bucketcache.persistent.path", bucketCacheDataFile + ".map");
+ this.conf.setStrings("hbase.bucketcache.bucket.sizes", Integer.toString(BUCKETSIZE));
+ // This is minimum bucketcache size.... 1MB.
+ this.conf.setInt("hbase.bucketcache.size", 1);
+ CacheConfig cacheConfig = new CacheConfig(conf);
+ Path hfilePath = new Path(TEST_UTIL.getDataTestDir(), this.name.getMethodName());
+ // Write 8 entries which should make for four hfileBlocks.
+ final int count = 8;
+ final int hfileBlockCount = 4;
+ Listcount
entries.
+ * @return The Cells written to the file.
+ * @throws IOException
+ */
+ private List