HBASE-11331 [blockcache] lazy block decompression

When hbase.block.data.cachecompressed=true, DATA (and ENCODED_DATA) blocks are
cached in the BlockCache in their on-disk format. This differs from the default
behavior, which decompresses and decrypts each block before caching it.
Nick Dimiduk 2014-09-09 15:50:53 -07:00
parent 7066de6362
commit eec15bd172
19 changed files with 834 additions and 330 deletions
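
The change is driven by a single configuration flag. Below is a minimal sketch of enabling it; the single-argument CacheConfig(Configuration) constructor is assumed and not part of this diff, while the key and the shouldCacheDataCompressed() accessor come from the commit itself:

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hbase.HBaseConfiguration;
  import org.apache.hadoop.hbase.io.hfile.CacheConfig;

  public class CompressedBlockCacheExample {
    public static void main(String[] args) {
      Configuration conf = HBaseConfiguration.create();
      // Keep DATA/ENCODED_DATA blocks in the BlockCache in their on-disk form.
      conf.setBoolean(CacheConfig.CACHE_DATA_BLOCKS_COMPRESSED_KEY, true); // "hbase.block.data.cachecompressed"
      CacheConfig cacheConf = new CacheConfig(conf); // assumed constructor, not shown in this diff
      // True only when a block cache is enabled and the flag above is set.
      System.out.println("cache DATA compressed: " + cacheConf.shouldCacheDataCompressed());
    }
  }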

@ -96,6 +96,23 @@ public class HFileContext implements HeapSize, Cloneable {
this.cryptoContext = cryptoContext; this.cryptoContext = cryptoContext;
} }
/**
* @return true when on-disk blocks from this file are compressed, and/or encrypted;
* false otherwise.
*/
public boolean isCompressedOrEncrypted() {
Compression.Algorithm compressAlgo = getCompression();
boolean compressed =
compressAlgo != null
&& compressAlgo != Compression.Algorithm.NONE;
Encryption.Context cryptoContext = getEncryptionContext();
boolean encrypted = cryptoContext != null
&& cryptoContext != Encryption.Context.NONE;
return compressed || encrypted;
}
public Compression.Algorithm getCompression() { public Compression.Algorithm getCompression() {
return compressAlgo; return compressAlgo;
} }
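
HFileContext.isCompressedOrEncrypted() (added above) is the predicate that later decides whether unpack() has any work to do. A rough illustration of what it reports, assuming HFileContextBuilder's defaults and its withCompression setter, neither of which appears in this diff:

  import org.apache.hadoop.hbase.io.compress.Compression;
  import org.apache.hadoop.hbase.io.hfile.HFileContext;
  import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;

  public class HFileContextCheck {
    public static void main(String[] args) {
      // Default context: no compression, no encryption, so blocks are cache-ready as read.
      HFileContext plain = new HFileContextBuilder().build();
      System.out.println(plain.isCompressedOrEncrypted()); // expected: false

      // GZ-compressed on-disk blocks must be unpacked before the data is usable.
      HFileContext gz = new HFileContextBuilder()
          .withCompression(Compression.Algorithm.GZ)
          .build();
      System.out.println(gz.isCompressedOrEncrypted()); // expected: true
    }
  }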

@ -157,9 +157,9 @@ org.apache.hadoop.util.StringUtils;
reader is closed</td> reader is closed</td>
</tr> </tr>
<tr> <tr>
<td>Compress blocks</td> <td>Cache DATA in compressed format</td>
<td><% cacheConfig.shouldCacheCompressed() %></td> <td><% cacheConfig.shouldCacheDataCompressed() %></td>
<td>True if blocks are compressed in cache</td> <td>True if DATA blocks are cached in their compressed form</td>
</tr> </tr>
<tr> <tr>
<td>Prefetch on Open</td> <td>Prefetch on Open</td>

@ -64,11 +64,10 @@ public class CacheConfig {
"hfile.block.bloom.cacheonwrite"; "hfile.block.bloom.cacheonwrite";
/** /**
* TODO: Implement this (jgray) * Configuration key to cache data blocks in compressed and/or encrypted format.
* Configuration key to cache data blocks in compressed format.
*/ */
public static final String CACHE_DATA_BLOCKS_COMPRESSED_KEY = public static final String CACHE_DATA_BLOCKS_COMPRESSED_KEY =
"hbase.rs.blockcache.cachedatacompressed"; "hbase.block.data.cachecompressed";
/** /**
* Configuration key to evict all blocks of a given file from the block cache * Configuration key to evict all blocks of a given file from the block cache
@ -119,6 +118,14 @@ public class CacheConfig {
public static final String PREFETCH_BLOCKS_ON_OPEN_KEY = public static final String PREFETCH_BLOCKS_ON_OPEN_KEY =
"hbase.rs.prefetchblocksonopen"; "hbase.rs.prefetchblocksonopen";
/**
* The target block size used by blockcache instances. Defaults to
* {@link HConstants#DEFAULT_BLOCKSIZE}.
* TODO: this config point is completely wrong, as it's used to determine the
* target block size of BlockCache instances. Rename.
*/
public static final String BLOCKCACHE_BLOCKSIZE_KEY = "hbase.offheapcache.minblocksize";
// Defaults // Defaults
public static final boolean DEFAULT_CACHE_DATA_ON_READ = true; public static final boolean DEFAULT_CACHE_DATA_ON_READ = true;
@ -127,7 +134,7 @@ public class CacheConfig {
public static final boolean DEFAULT_CACHE_INDEXES_ON_WRITE = false; public static final boolean DEFAULT_CACHE_INDEXES_ON_WRITE = false;
public static final boolean DEFAULT_CACHE_BLOOMS_ON_WRITE = false; public static final boolean DEFAULT_CACHE_BLOOMS_ON_WRITE = false;
public static final boolean DEFAULT_EVICT_ON_CLOSE = false; public static final boolean DEFAULT_EVICT_ON_CLOSE = false;
public static final boolean DEFAULT_COMPRESSED_CACHE = false; public static final boolean DEFAULT_CACHE_DATA_COMPRESSED = false;
public static final boolean DEFAULT_PREFETCH_ON_OPEN = false; public static final boolean DEFAULT_PREFETCH_ON_OPEN = false;
/** Local reference to the block cache, null if completely disabled */ /** Local reference to the block cache, null if completely disabled */
@ -156,8 +163,8 @@ public class CacheConfig {
/** Whether blocks of a file should be evicted when the file is closed */ /** Whether blocks of a file should be evicted when the file is closed */
private boolean evictOnClose; private boolean evictOnClose;
/** Whether data blocks should be stored in compressed form in the cache */ /** Whether data blocks should be stored in compressed and/or encrypted form in the cache */
private final boolean cacheCompressed; private final boolean cacheDataCompressed;
/** Whether data blocks should be prefetched into the cache */ /** Whether data blocks should be prefetched into the cache */
private final boolean prefetchOnOpen; private final boolean prefetchOnOpen;
@ -189,7 +196,7 @@ public class CacheConfig {
DEFAULT_CACHE_BLOOMS_ON_WRITE) || family.shouldCacheBloomsOnWrite(), DEFAULT_CACHE_BLOOMS_ON_WRITE) || family.shouldCacheBloomsOnWrite(),
conf.getBoolean(EVICT_BLOCKS_ON_CLOSE_KEY, conf.getBoolean(EVICT_BLOCKS_ON_CLOSE_KEY,
DEFAULT_EVICT_ON_CLOSE) || family.shouldEvictBlocksOnClose(), DEFAULT_EVICT_ON_CLOSE) || family.shouldEvictBlocksOnClose(),
conf.getBoolean(CACHE_DATA_BLOCKS_COMPRESSED_KEY, DEFAULT_COMPRESSED_CACHE), conf.getBoolean(CACHE_DATA_BLOCKS_COMPRESSED_KEY, DEFAULT_CACHE_DATA_COMPRESSED),
conf.getBoolean(PREFETCH_BLOCKS_ON_OPEN_KEY, conf.getBoolean(PREFETCH_BLOCKS_ON_OPEN_KEY,
DEFAULT_PREFETCH_ON_OPEN) || family.shouldPrefetchBlocksOnOpen(), DEFAULT_PREFETCH_ON_OPEN) || family.shouldPrefetchBlocksOnOpen(),
conf.getBoolean(HColumnDescriptor.CACHE_DATA_IN_L1, conf.getBoolean(HColumnDescriptor.CACHE_DATA_IN_L1,
@ -208,13 +215,10 @@ public class CacheConfig {
DEFAULT_IN_MEMORY, // This is a family-level setting so can't be set DEFAULT_IN_MEMORY, // This is a family-level setting so can't be set
// strictly from conf // strictly from conf
conf.getBoolean(CACHE_BLOCKS_ON_WRITE_KEY, DEFAULT_CACHE_DATA_ON_WRITE), conf.getBoolean(CACHE_BLOCKS_ON_WRITE_KEY, DEFAULT_CACHE_DATA_ON_WRITE),
conf.getBoolean(CACHE_INDEX_BLOCKS_ON_WRITE_KEY, conf.getBoolean(CACHE_INDEX_BLOCKS_ON_WRITE_KEY, DEFAULT_CACHE_INDEXES_ON_WRITE),
DEFAULT_CACHE_INDEXES_ON_WRITE), conf.getBoolean(CACHE_BLOOM_BLOCKS_ON_WRITE_KEY, DEFAULT_CACHE_BLOOMS_ON_WRITE),
conf.getBoolean(CACHE_BLOOM_BLOCKS_ON_WRITE_KEY,
DEFAULT_CACHE_BLOOMS_ON_WRITE),
conf.getBoolean(EVICT_BLOCKS_ON_CLOSE_KEY, DEFAULT_EVICT_ON_CLOSE), conf.getBoolean(EVICT_BLOCKS_ON_CLOSE_KEY, DEFAULT_EVICT_ON_CLOSE),
conf.getBoolean(CACHE_DATA_BLOCKS_COMPRESSED_KEY, conf.getBoolean(CACHE_DATA_BLOCKS_COMPRESSED_KEY, DEFAULT_CACHE_DATA_COMPRESSED),
DEFAULT_COMPRESSED_CACHE),
conf.getBoolean(PREFETCH_BLOCKS_ON_OPEN_KEY, DEFAULT_PREFETCH_ON_OPEN), conf.getBoolean(PREFETCH_BLOCKS_ON_OPEN_KEY, DEFAULT_PREFETCH_ON_OPEN),
conf.getBoolean(HColumnDescriptor.CACHE_DATA_IN_L1, conf.getBoolean(HColumnDescriptor.CACHE_DATA_IN_L1,
HColumnDescriptor.DEFAULT_CACHE_DATA_IN_L1) HColumnDescriptor.DEFAULT_CACHE_DATA_IN_L1)
@ -232,7 +236,7 @@ public class CacheConfig {
* @param cacheIndexesOnWrite whether index blocks should be cached on write * @param cacheIndexesOnWrite whether index blocks should be cached on write
* @param cacheBloomsOnWrite whether blooms should be cached on write * @param cacheBloomsOnWrite whether blooms should be cached on write
* @param evictOnClose whether blocks should be evicted when HFile is closed * @param evictOnClose whether blocks should be evicted when HFile is closed
* @param cacheCompressed whether to store blocks as compressed in the cache * @param cacheDataCompressed whether to store blocks as compressed in the cache
* @param prefetchOnOpen whether to prefetch blocks upon open * @param prefetchOnOpen whether to prefetch blocks upon open
* @param cacheDataInL1 If more than one cache tier deployed, if true, cache this column families * @param cacheDataInL1 If more than one cache tier deployed, if true, cache this column families
* data blocks up in the L1 tier. * data blocks up in the L1 tier.
@ -241,7 +245,7 @@ public class CacheConfig {
final boolean cacheDataOnRead, final boolean inMemory, final boolean cacheDataOnRead, final boolean inMemory,
final boolean cacheDataOnWrite, final boolean cacheIndexesOnWrite, final boolean cacheDataOnWrite, final boolean cacheIndexesOnWrite,
final boolean cacheBloomsOnWrite, final boolean evictOnClose, final boolean cacheBloomsOnWrite, final boolean evictOnClose,
final boolean cacheCompressed, final boolean prefetchOnOpen, final boolean cacheDataCompressed, final boolean prefetchOnOpen,
final boolean cacheDataInL1) { final boolean cacheDataInL1) {
this.blockCache = blockCache; this.blockCache = blockCache;
this.cacheDataOnRead = cacheDataOnRead; this.cacheDataOnRead = cacheDataOnRead;
@ -250,7 +254,7 @@ public class CacheConfig {
this.cacheIndexesOnWrite = cacheIndexesOnWrite; this.cacheIndexesOnWrite = cacheIndexesOnWrite;
this.cacheBloomsOnWrite = cacheBloomsOnWrite; this.cacheBloomsOnWrite = cacheBloomsOnWrite;
this.evictOnClose = evictOnClose; this.evictOnClose = evictOnClose;
this.cacheCompressed = cacheCompressed; this.cacheDataCompressed = cacheDataCompressed;
this.prefetchOnOpen = prefetchOnOpen; this.prefetchOnOpen = prefetchOnOpen;
this.cacheDataInL1 = cacheDataInL1; this.cacheDataInL1 = cacheDataInL1;
LOG.info(this); LOG.info(this);
@ -264,7 +268,7 @@ public class CacheConfig {
this(cacheConf.blockCache, cacheConf.cacheDataOnRead, cacheConf.inMemory, this(cacheConf.blockCache, cacheConf.cacheDataOnRead, cacheConf.inMemory,
cacheConf.cacheDataOnWrite, cacheConf.cacheIndexesOnWrite, cacheConf.cacheDataOnWrite, cacheConf.cacheIndexesOnWrite,
cacheConf.cacheBloomsOnWrite, cacheConf.evictOnClose, cacheConf.cacheBloomsOnWrite, cacheConf.evictOnClose,
cacheConf.cacheCompressed, cacheConf.prefetchOnOpen, cacheConf.cacheDataCompressed, cacheConf.prefetchOnOpen,
cacheConf.cacheDataInL1); cacheConf.cacheDataInL1);
} }
@ -298,14 +302,13 @@ public class CacheConfig {
* available. * available.
*/ */
public boolean shouldCacheBlockOnRead(BlockCategory category) { public boolean shouldCacheBlockOnRead(BlockCategory category) {
boolean shouldCache = isBlockCacheEnabled() return isBlockCacheEnabled()
&& (cacheDataOnRead || && (cacheDataOnRead ||
category == BlockCategory.INDEX || category == BlockCategory.INDEX ||
category == BlockCategory.BLOOM || category == BlockCategory.BLOOM ||
(prefetchOnOpen && (prefetchOnOpen &&
(category != BlockCategory.META && (category != BlockCategory.META &&
category != BlockCategory.UNKNOWN))); category != BlockCategory.UNKNOWN)));
return shouldCache;
} }
/** /**
@ -384,10 +387,23 @@ public class CacheConfig {
} }
/** /**
* @return true if blocks should be compressed in the cache, false if not * @return true if data blocks should be compressed in the cache, false if not
*/ */
public boolean shouldCacheCompressed() { public boolean shouldCacheDataCompressed() {
return isBlockCacheEnabled() && this.cacheCompressed; return isBlockCacheEnabled() && this.cacheDataCompressed;
}
/**
* @return true if this {@link BlockCategory} should be compressed in blockcache, false otherwise
*/
public boolean shouldCacheCompressed(BlockCategory category) {
if (!isBlockCacheEnabled()) return false;
switch (category) {
case DATA:
return this.cacheDataCompressed;
default:
return false;
}
} }
/** /**
@ -408,7 +424,7 @@ public class CacheConfig {
", cacheIndexesOnWrite=" + shouldCacheIndexesOnWrite() + ", cacheIndexesOnWrite=" + shouldCacheIndexesOnWrite() +
", cacheBloomsOnWrite=" + shouldCacheBloomsOnWrite() + ", cacheBloomsOnWrite=" + shouldCacheBloomsOnWrite() +
", cacheEvictOnClose=" + shouldEvictOnClose() + ", cacheEvictOnClose=" + shouldEvictOnClose() +
", cacheCompressed=" + shouldCacheCompressed() + ", cacheDataCompressed=" + shouldCacheDataCompressed() +
", prefetchOnOpen=" + shouldPrefetchOnOpen(); ", prefetchOnOpen=" + shouldPrefetchOnOpen();
} }
@ -449,7 +465,7 @@ public class CacheConfig {
*/ */
private static LruBlockCache getL1(final Configuration c, final MemoryUsage mu) { private static LruBlockCache getL1(final Configuration c, final MemoryUsage mu) {
long lruCacheSize = getLruCacheSize(c, mu); long lruCacheSize = getLruCacheSize(c, mu);
int blockSize = c.getInt("hbase.offheapcache.minblocksize", HConstants.DEFAULT_BLOCKSIZE); int blockSize = c.getInt(BLOCKCACHE_BLOCKSIZE_KEY, HConstants.DEFAULT_BLOCKSIZE);
LOG.info("Allocating LruBlockCache size=" + LOG.info("Allocating LruBlockCache size=" +
StringUtils.byteDesc(lruCacheSize) + ", blockSize=" + StringUtils.byteDesc(blockSize)); StringUtils.byteDesc(lruCacheSize) + ", blockSize=" + StringUtils.byteDesc(blockSize));
return new LruBlockCache(lruCacheSize, blockSize, true, c); return new LruBlockCache(lruCacheSize, blockSize, true, c);
@ -466,7 +482,7 @@ public class CacheConfig {
String bucketCacheIOEngineName = c.get(BUCKET_CACHE_IOENGINE_KEY, null); String bucketCacheIOEngineName = c.get(BUCKET_CACHE_IOENGINE_KEY, null);
if (bucketCacheIOEngineName == null || bucketCacheIOEngineName.length() <= 0) return null; if (bucketCacheIOEngineName == null || bucketCacheIOEngineName.length() <= 0) return null;
int blockSize = c.getInt("hbase.offheapcache.minblocksize", HConstants.DEFAULT_BLOCKSIZE); int blockSize = c.getInt(BLOCKCACHE_BLOCKSIZE_KEY, HConstants.DEFAULT_BLOCKSIZE);
float bucketCachePercentage = c.getFloat(BUCKET_CACHE_SIZE_KEY, 0F); float bucketCachePercentage = c.getFloat(BUCKET_CACHE_SIZE_KEY, 0F);
long bucketCacheSize = (long) (bucketCachePercentage < 1? mu.getMax() * bucketCachePercentage: long bucketCacheSize = (long) (bucketCachePercentage < 1? mu.getMax() * bucketCachePercentage:
bucketCachePercentage * 1024 * 1024); bucketCachePercentage * 1024 * 1024);

@ -36,9 +36,6 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.fs.HFileSystem; import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper; import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
import org.apache.hadoop.hbase.io.crypto.Encryption;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext; import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext;
import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultDecodingContext; import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultDecodingContext;
@ -64,25 +61,26 @@ import com.google.common.base.Preconditions;
* information from the block index are required to read a block. * information from the block index are required to read a block.
* <li>In version 2 a block is structured as follows: * <li>In version 2 a block is structured as follows:
* <ul> * <ul>
* <li>header (see {@link Writer#finishBlock()})
* <ul>
* <li>Magic record identifying the block type (8 bytes) * <li>Magic record identifying the block type (8 bytes)
* <li>Compressed block size, header not included (4 bytes) * <li>Compressed block size, excluding header, including checksum (4 bytes)
* <li>Uncompressed block size, header not included (4 bytes) * <li>Uncompressed block size, excluding header, excluding checksum (4 bytes)
* <li>The offset of the previous block of the same type (8 bytes). This is * <li>The offset of the previous block of the same type (8 bytes). This is
* used to be able to navigate to the previous block without going to the block * used to be able to navigate to the previous block without going to the block
* <li>For minorVersions >=1, there is an additional 4 byte field * <li>For minorVersions >=1, the ordinal describing checksum type (1 byte)
* bytesPerChecksum that records the number of bytes in a checksum chunk. * <li>For minorVersions >=1, the number of data bytes/checksum chunk (4 bytes)
* <li>For minorVersions >=1, there is a 4 byte value to store the size of * <li>For minorVersions >=1, the size of data on disk, including header,
* data on disk (excluding the checksums) * excluding checksums (4 bytes)
* </ul>
* </li>
* <li>Raw/Compressed/Encrypted/Encoded data. The compression algorithm is the
* same for all the blocks in the {@link HFile}, similarly to what was done in
* version 1.
* <li>For minorVersions >=1, a series of 4 byte checksums, one each for * <li>For minorVersions >=1, a series of 4 byte checksums, one each for
* the number of bytes specified by bytesPerChecksum. * the number of bytes specified by bytesPerChecksum.
* index.
* <li>Compressed data (or uncompressed data if compression is disabled). The
* compression algorithm is the same for all the blocks in the {@link HFile},
* similarly to what was done in version 1.
* </ul> * </ul>
* </ul> * </ul>
* The version 2 block representation in the block cache is the same as above,
* except that the data section is always uncompressed in the cache.
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
public class HFileBlock implements Cacheable { public class HFileBlock implements Cacheable {
@ -111,7 +109,7 @@ public class HFileBlock implements Cacheable {
ByteBuffer.wrap(new byte[0], 0, 0).getClass(), false); ByteBuffer.wrap(new byte[0], 0, 0).getClass(), false);
// meta.usesHBaseChecksum+offset+nextBlockOnDiskSizeWithHeader // meta.usesHBaseChecksum+offset+nextBlockOnDiskSizeWithHeader
public static final int EXTRA_SERIALIZATION_SPACE = Bytes.SIZEOF_BYTE + Bytes.SIZEOF_INT public static final int EXTRA_SERIALIZATION_SPACE = Bytes.SIZEOF_BYTE + Bytes.SIZEOF_INT
+ Bytes.SIZEOF_LONG; + Bytes.SIZEOF_LONG;
/** /**
@ -136,6 +134,9 @@ public class HFileBlock implements Cacheable {
HFileBlock ourBuffer = new HFileBlock(newByteBuffer, usesChecksum); HFileBlock ourBuffer = new HFileBlock(newByteBuffer, usesChecksum);
ourBuffer.offset = buf.getLong(); ourBuffer.offset = buf.getLong();
ourBuffer.nextBlockOnDiskSizeWithHeader = buf.getInt(); ourBuffer.nextBlockOnDiskSizeWithHeader = buf.getInt();
if (ourBuffer.hasNextBlockHeader()) {
ourBuffer.buf.limit(ourBuffer.buf.limit() - ourBuffer.headerSize());
}
return ourBuffer; return ourBuffer;
} }
@ -155,23 +156,28 @@ public class HFileBlock implements Cacheable {
.registerDeserializer(blockDeserializer); .registerDeserializer(blockDeserializer);
} }
/** Type of block. Header field 0. */
private BlockType blockType; private BlockType blockType;
/** Size on disk without the header. It includes checksum data too. */ /** Size on disk excluding header, including checksum. Header field 1. */
private int onDiskSizeWithoutHeader; private int onDiskSizeWithoutHeader;
/** Size of pure data. Does not include header or checksums */ /** Size of pure data. Does not include header or checksums. Header field 2. */
private final int uncompressedSizeWithoutHeader; private final int uncompressedSizeWithoutHeader;
/** The offset of the previous block on disk */ /** The offset of the previous block on disk. Header field 3. */
private final long prevBlockOffset; private final long prevBlockOffset;
/** Size on disk of header and data. Does not include checksum data */ /**
* Size on disk of header + data. Excludes checksum. Header field 6,
* OR calculated from {@link #onDiskSizeWithoutHeader} when using HDFS checksum.
*/
private final int onDiskDataSizeWithHeader; private final int onDiskDataSizeWithHeader;
/** The in-memory representation of the hfile block */ /** The in-memory representation of the hfile block */
private ByteBuffer buf; private ByteBuffer buf;
/** Meta data that holds meta information on the hfileblock**/
/** Meta data that holds meta information on the hfileblock */
private HFileContext fileContext; private HFileContext fileContext;
/** /**
@ -193,27 +199,18 @@ public class HFileBlock implements Cacheable {
* and is sitting in a byte buffer. * and is sitting in a byte buffer.
* *
* @param blockType the type of this block, see {@link BlockType} * @param blockType the type of this block, see {@link BlockType}
* @param onDiskSizeWithoutHeader compressed size of the block if compression * @param onDiskSizeWithoutHeader see {@link #onDiskSizeWithoutHeader}
* is used, otherwise uncompressed size, header size not included * @param uncompressedSizeWithoutHeader see {@link #uncompressedSizeWithoutHeader}
* @param uncompressedSizeWithoutHeader uncompressed size of the block, * @param prevBlockOffset see {@link #prevBlockOffset}
* header size not included. Equals onDiskSizeWithoutHeader if
* compression is disabled.
* @param prevBlockOffset the offset of the previous block in the
* {@link HFile}
* @param buf block header ({@link HConstants#HFILEBLOCK_HEADER_SIZE} bytes) followed by * @param buf block header ({@link HConstants#HFILEBLOCK_HEADER_SIZE} bytes) followed by
* uncompressed data. This * uncompressed data. This
* @param fillHeader true to fill in the first {@link HConstants#HFILEBLOCK_HEADER_SIZE} bytes of * @param fillHeader when true, parse {@code buf} and override the first 4 header fields.
* the buffer based on the header fields provided
* @param offset the file offset the block was read from * @param offset the file offset the block was read from
* @param bytesPerChecksum the number of bytes per checksum chunk * @param onDiskDataSizeWithHeader see {@link #onDiskDataSizeWithHeader}
* @param checksumType the checksum algorithm to use
* @param onDiskDataSizeWithHeader size of header and data on disk not
* including checksum data
* @param fileContext HFile meta data * @param fileContext HFile meta data
*/ */
HFileBlock(BlockType blockType, int onDiskSizeWithoutHeader, HFileBlock(BlockType blockType, int onDiskSizeWithoutHeader, int uncompressedSizeWithoutHeader,
int uncompressedSizeWithoutHeader, long prevBlockOffset, ByteBuffer buf, long prevBlockOffset, ByteBuffer buf, boolean fillHeader, long offset,
boolean fillHeader, long offset,
int onDiskDataSizeWithHeader, HFileContext fileContext) { int onDiskDataSizeWithHeader, HFileContext fileContext) {
this.blockType = blockType; this.blockType = blockType;
this.onDiskSizeWithoutHeader = onDiskSizeWithoutHeader; this.onDiskSizeWithoutHeader = onDiskSizeWithoutHeader;
@ -225,6 +222,22 @@ public class HFileBlock implements Cacheable {
this.offset = offset; this.offset = offset;
this.onDiskDataSizeWithHeader = onDiskDataSizeWithHeader; this.onDiskDataSizeWithHeader = onDiskDataSizeWithHeader;
this.fileContext = fileContext; this.fileContext = fileContext;
this.buf.rewind();
}
/**
* Copy constructor. Creates a shallow copy of {@code that}'s buffer.
*/
HFileBlock(HFileBlock that) {
this.blockType = that.blockType;
this.onDiskSizeWithoutHeader = that.onDiskSizeWithoutHeader;
this.uncompressedSizeWithoutHeader = that.uncompressedSizeWithoutHeader;
this.prevBlockOffset = that.prevBlockOffset;
this.buf = that.buf.duplicate();
this.offset = that.offset;
this.onDiskDataSizeWithHeader = that.onDiskDataSizeWithHeader;
this.fileContext = that.fileContext;
this.nextBlockOnDiskSizeWithHeader = that.nextBlockOnDiskSizeWithHeader;
} }
/** /**
@ -272,28 +285,21 @@ public class HFileBlock implements Cacheable {
} }
/** /**
* @return the on-disk size of the block with header size included. This * @return the on-disk size of header + data part + checksum.
* includes the header, the data and the checksum data.
*/ */
public int getOnDiskSizeWithHeader() { public int getOnDiskSizeWithHeader() {
return onDiskSizeWithoutHeader + headerSize(); return onDiskSizeWithoutHeader + headerSize();
} }
/** /**
* Returns the size of the compressed part of the block in case compression * @return the on-disk size of the data part + checksum (header excluded).
* is used, or the uncompressed size of the data part otherwise. Header size
* and checksum data size is not included.
*
* @return the on-disk size of the data part of the block, header and
* checksum not included.
*/ */
public int getOnDiskSizeWithoutHeader() { public int getOnDiskSizeWithoutHeader() {
return onDiskSizeWithoutHeader; return onDiskSizeWithoutHeader;
} }
/** /**
* @return the uncompressed size of the data part of the block, header not * @return the uncompressed size of data part (header and checksum excluded).
* included
*/ */
public int getUncompressedSizeWithoutHeader() { public int getUncompressedSizeWithoutHeader() {
return uncompressedSizeWithoutHeader; return uncompressedSizeWithoutHeader;
@ -308,8 +314,8 @@ public class HFileBlock implements Cacheable {
} }
/** /**
* Writes header fields into the first {@link HConstants#HFILEBLOCK_HEADER_SIZE} bytes of the * Rewinds {@code buf} and writes first 4 header fields. {@code buf} position
* buffer. Resets the buffer position to the end of header as side effect. * is modified as side-effect.
*/ */
private void overwriteHeader() { private void overwriteHeader() {
buf.rewind(); buf.rewind();
@ -320,11 +326,9 @@ public class HFileBlock implements Cacheable {
} }
/** /**
* Returns a buffer that does not include the header. The array offset points * Returns a buffer that does not include the header or checksum.
* to the start of the block data right after the header. The underlying data
* array is not copied. Checksum data is not included in the returned buffer.
* *
* @return the buffer with header skipped * @return the buffer with header skipped and checksum omitted.
*/ */
public ByteBuffer getBufferWithoutHeader() { public ByteBuffer getBufferWithoutHeader() {
return ByteBuffer.wrap(buf.array(), buf.arrayOffset() + headerSize(), return ByteBuffer.wrap(buf.array(), buf.arrayOffset() + headerSize(),
@ -336,7 +340,7 @@ public class HFileBlock implements Cacheable {
* modify the buffer object. This method has to be public because it is * modify the buffer object. This method has to be public because it is
* used in {@link CompoundBloomFilter} to avoid object creation on every * used in {@link CompoundBloomFilter} to avoid object creation on every
* Bloom filter lookup, but has to be used with caution. Checksum data * Bloom filter lookup, but has to be used with caution. Checksum data
* is not included in the returned buffer. * is not included in the returned buffer but header data is.
* *
* @return the buffer of this block for read-only operations * @return the buffer of this block for read-only operations
*/ */
@ -350,17 +354,17 @@ public class HFileBlock implements Cacheable {
* not modify the buffer object. This method has to be public because it is * not modify the buffer object. This method has to be public because it is
* used in {@link BucketCache} to avoid buffer copy. * used in {@link BucketCache} to avoid buffer copy.
* *
* @return the byte buffer with header included for read-only operations * @return the buffer with header and checksum included for read-only operations
*/ */
public ByteBuffer getBufferReadOnlyWithHeader() { public ByteBuffer getBufferReadOnlyWithHeader() {
return ByteBuffer.wrap(buf.array(), buf.arrayOffset(), buf.limit()).slice(); return ByteBuffer.wrap(buf.array(), buf.arrayOffset(), buf.limit()).slice();
} }
/** /**
* Returns a byte buffer of this block, including header data, positioned at * Returns a byte buffer of this block, including header data and checksum, positioned at
* the beginning of header. The underlying data array is not copied. * the beginning of header. The underlying data array is not copied.
* *
* @return the byte buffer with header included * @return the byte buffer with header and checksum included
*/ */
ByteBuffer getBufferWithHeader() { ByteBuffer getBufferWithHeader() {
ByteBuffer dupBuf = buf.duplicate(); ByteBuffer dupBuf = buf.duplicate();
@ -376,22 +380,25 @@ public class HFileBlock implements Cacheable {
} }
} }
private void sanityCheckAssertion(BlockType valueFromBuf, BlockType valueFromField)
throws IOException {
if (valueFromBuf != valueFromField) {
throw new IOException("Block type stored in the buffer: " +
valueFromBuf + ", block type field: " + valueFromField);
}
}
/** /**
* Checks if the block is internally consistent, i.e. the first * Checks if the block is internally consistent, i.e. the first
* {@link HConstants#HFILEBLOCK_HEADER_SIZE} bytes of the buffer contain a valid header consistent * {@link HConstants#HFILEBLOCK_HEADER_SIZE} bytes of the buffer contain a
* with the fields. This function is primary for testing and debugging, and * valid header consistent with the fields. Assumes a packed block structure.
* is not thread-safe, because it alters the internal buffer pointer. * This function is primary for testing and debugging, and is not
* thread-safe, because it alters the internal buffer pointer.
*/ */
void sanityCheck() throws IOException { void sanityCheck() throws IOException {
buf.rewind(); buf.rewind();
{ sanityCheckAssertion(BlockType.read(buf), blockType);
BlockType blockTypeFromBuf = BlockType.read(buf);
if (blockTypeFromBuf != blockType) {
throw new IOException("Block type stored in the buffer: " +
blockTypeFromBuf + ", block type field: " + blockType);
}
}
sanityCheckAssertion(buf.getInt(), onDiskSizeWithoutHeader, sanityCheckAssertion(buf.getInt(), onDiskSizeWithoutHeader,
"onDiskSizeWithoutHeader"); "onDiskSizeWithoutHeader");
@ -403,45 +410,65 @@ public class HFileBlock implements Cacheable {
if (this.fileContext.isUseHBaseChecksum()) { if (this.fileContext.isUseHBaseChecksum()) {
sanityCheckAssertion(buf.get(), this.fileContext.getChecksumType().getCode(), "checksumType"); sanityCheckAssertion(buf.get(), this.fileContext.getChecksumType().getCode(), "checksumType");
sanityCheckAssertion(buf.getInt(), this.fileContext.getBytesPerChecksum(), "bytesPerChecksum"); sanityCheckAssertion(buf.getInt(), this.fileContext.getBytesPerChecksum(), "bytesPerChecksum");
sanityCheckAssertion(buf.getInt(), onDiskDataSizeWithHeader, sanityCheckAssertion(buf.getInt(), onDiskDataSizeWithHeader, "onDiskDataSizeWithHeader");
"onDiskDataSizeWithHeader");
} }
int cksumBytes = totalChecksumBytes(); int cksumBytes = totalChecksumBytes();
int hdrSize = headerSize(); int expectedBufLimit = onDiskDataSizeWithHeader + cksumBytes;
int expectedBufLimit = uncompressedSizeWithoutHeader + headerSize() +
cksumBytes;
if (buf.limit() != expectedBufLimit) { if (buf.limit() != expectedBufLimit) {
throw new AssertionError("Expected buffer limit " + expectedBufLimit throw new AssertionError("Expected buffer limit " + expectedBufLimit
+ ", got " + buf.limit()); + ", got " + buf.limit());
} }
// We might optionally allocate HFILEBLOCK_HEADER_SIZE more bytes to read the next // We might optionally allocate HFILEBLOCK_HEADER_SIZE more bytes to read the next
// block's, header, so there are two sensible values for buffer capacity. // block's header, so there are two sensible values for buffer capacity.
int size = uncompressedSizeWithoutHeader + hdrSize + cksumBytes; int hdrSize = headerSize();
if (buf.capacity() != size && if (buf.capacity() != expectedBufLimit &&
buf.capacity() != size + hdrSize) { buf.capacity() != expectedBufLimit + hdrSize) {
throw new AssertionError("Invalid buffer capacity: " + buf.capacity() + throw new AssertionError("Invalid buffer capacity: " + buf.capacity() +
", expected " + size + " or " + (size + hdrSize)); ", expected " + expectedBufLimit + " or " + (expectedBufLimit + hdrSize));
} }
} }
@Override @Override
public String toString() { public String toString() {
return "blockType=" StringBuilder sb = new StringBuilder()
+ blockType .append("HFileBlock [")
+ ", onDiskSizeWithoutHeader=" .append(" fileOffset=").append(offset)
+ onDiskSizeWithoutHeader .append(" headerSize()=").append(headerSize())
+ ", uncompressedSizeWithoutHeader=" .append(" blockType=").append(blockType)
+ uncompressedSizeWithoutHeader .append(" onDiskSizeWithoutHeader=").append(onDiskSizeWithoutHeader)
+ ", prevBlockOffset=" .append(" uncompressedSizeWithoutHeader=").append(uncompressedSizeWithoutHeader)
+ prevBlockOffset .append(" prevBlockOffset=").append(prevBlockOffset)
+ ", dataBeginsWith=" .append(" isUseHBaseChecksum()=").append(fileContext.isUseHBaseChecksum());
+ Bytes.toStringBinary(buf.array(), buf.arrayOffset() + headerSize(), if (fileContext.isUseHBaseChecksum()) {
Math.min(32, buf.limit() - buf.arrayOffset() - headerSize())) sb.append(" checksumType=").append(ChecksumType.codeToType(this.buf.get(24)))
+ ", fileOffset=" + offset; .append(" bytesPerChecksum=").append(this.buf.getInt(24 + 1))
.append(" onDiskDataSizeWithHeader=").append(onDiskDataSizeWithHeader);
} else {
sb.append(" onDiskDataSizeWithHeader=").append(onDiskDataSizeWithHeader)
.append("(").append(onDiskSizeWithoutHeader)
.append("+").append(HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM).append(")");
}
sb.append(" getOnDiskSizeWithHeader()=").append(getOnDiskSizeWithHeader())
.append(" totalChecksumBytes()=").append(totalChecksumBytes())
.append(" isUnpacked()=").append(isUnpacked())
.append(" buf=[ ")
.append(buf)
.append(", array().length=").append(buf.array().length)
.append(", arrayOffset()=").append(buf.arrayOffset())
.append(" ]")
.append(" dataBeginsWith=")
.append(Bytes.toStringBinary(buf.array(), buf.arrayOffset() + headerSize(),
Math.min(32, buf.limit() - buf.arrayOffset() - headerSize())))
.append(" fileContext=").append(fileContext)
.append(" ]");
return sb.toString();
} }
/**
* Called after reading a block with provided onDiskSizeWithHeader.
*/
private void validateOnDiskSizeWithoutHeader( private void validateOnDiskSizeWithoutHeader(
int expectedOnDiskSizeWithoutHeader) throws IOException { int expectedOnDiskSizeWithoutHeader) throws IOException {
if (onDiskSizeWithoutHeader != expectedOnDiskSizeWithoutHeader) { if (onDiskSizeWithoutHeader != expectedOnDiskSizeWithoutHeader) {
@ -456,33 +483,81 @@ public class HFileBlock implements Cacheable {
} }
} }
/**
* Retrieves the decompressed/decrypted view of this block. An encoded block remains in its
* encoded structure. Internal structures are shared between instances where applicable.
*/
HFileBlock unpack(HFileContext fileContext, FSReader reader) throws IOException {
if (!fileContext.isCompressedOrEncrypted()) {
// TODO: cannot use our own fileContext here because HFileBlock(ByteBuffer, boolean),
// which is used for block serialization to L2 cache, does not preserve encoding and
// encryption details.
return this;
}
HFileBlock unpacked = new HFileBlock(this);
unpacked.allocateBuffer(); // allocates space for the decompressed block
HFileBlockDecodingContext ctx = blockType == BlockType.ENCODED_DATA ?
reader.getBlockDecodingContext() : reader.getDefaultBlockDecodingContext();
ctx.prepareDecoding(unpacked.getOnDiskSizeWithoutHeader(),
unpacked.getUncompressedSizeWithoutHeader(), unpacked.getBufferWithoutHeader(),
this.getBufferReadOnlyWithHeader().array(), this.headerSize());
// Preserve the next block's header bytes in the new block if we have them.
if (unpacked.hasNextBlockHeader()) {
System.arraycopy(this.buf.array(), this.buf.arrayOffset() + this.onDiskDataSizeWithHeader,
unpacked.buf.array(), unpacked.buf.arrayOffset() + unpacked.headerSize() +
unpacked.uncompressedSizeWithoutHeader + unpacked.totalChecksumBytes(),
unpacked.headerSize());
}
return unpacked;
}
/**
* Return true when this buffer includes next block's header.
*/
private boolean hasNextBlockHeader() {
return nextBlockOnDiskSizeWithHeader > 0;
}
/** /**
* Always allocates a new buffer of the correct size. Copies header bytes * Always allocates a new buffer of the correct size. Copies header bytes
* from the existing buffer. Does not change header fields. * from the existing buffer. Does not change header fields.
* Reserve room to keep checksum bytes too. * Reserve room to keep checksum bytes too.
*
* @param extraBytes whether to reserve room in the buffer to read the next
* block's header
*/ */
private void allocateBuffer(boolean extraBytes) { private void allocateBuffer() {
int cksumBytes = totalChecksumBytes(); int cksumBytes = totalChecksumBytes();
int capacityNeeded = headerSize() + uncompressedSizeWithoutHeader + int headerSize = headerSize();
cksumBytes + int capacityNeeded = headerSize + uncompressedSizeWithoutHeader +
(extraBytes ? headerSize() : 0); cksumBytes + (hasNextBlockHeader() ? headerSize : 0);
ByteBuffer newBuf = ByteBuffer.allocate(capacityNeeded); ByteBuffer newBuf = ByteBuffer.allocate(capacityNeeded);
// Copy header bytes. // Copy header bytes.
System.arraycopy(buf.array(), buf.arrayOffset(), newBuf.array(), System.arraycopy(buf.array(), buf.arrayOffset(), newBuf.array(),
newBuf.arrayOffset(), headerSize()); newBuf.arrayOffset(), headerSize);
buf = newBuf; buf = newBuf;
buf.limit(headerSize() + uncompressedSizeWithoutHeader + cksumBytes); // set limit to exclude next block's header
buf.limit(headerSize + uncompressedSizeWithoutHeader + cksumBytes);
} }
/** An additional sanity-check in case no compression is being used. */ /**
* Return true when this block's buffer has been unpacked, false otherwise. Note this is a
* calculated heuristic, not tracked attribute of the block.
*/
public boolean isUnpacked() {
final int cksumBytes = totalChecksumBytes();
final int headerSize = headerSize();
final int expectedCapacity = headerSize + uncompressedSizeWithoutHeader + cksumBytes;
final int bufCapacity = buf.capacity();
return bufCapacity == expectedCapacity || bufCapacity == expectedCapacity + headerSize;
}
/** An additional sanity-check in case no compression or encryption is being used. */
public void assumeUncompressed() throws IOException { public void assumeUncompressed() throws IOException {
if (onDiskSizeWithoutHeader != uncompressedSizeWithoutHeader + if (onDiskSizeWithoutHeader != uncompressedSizeWithoutHeader +
totalChecksumBytes()) { totalChecksumBytes()) {
throw new IOException("Using no compression but " throw new IOException("Using no compression but "
+ "onDiskSizeWithoutHeader=" + onDiskSizeWithoutHeader + ", " + "onDiskSizeWithoutHeader=" + onDiskSizeWithoutHeader + ", "
@ -512,7 +587,7 @@ public class HFileBlock implements Cacheable {
} }
/** /**
* @return a byte stream reading the data section of this block * @return a byte stream reading the data + checksum of this block
*/ */
public DataInputStream getByteStream() { public DataInputStream getByteStream() {
return new DataInputStream(new ByteArrayInputStream(buf.array(), return new DataInputStream(new ByteArrayInputStream(buf.array(),
@ -588,7 +663,6 @@ public class HFileBlock implements Cacheable {
return nextBlockOnDiskSizeWithHeader; return nextBlockOnDiskSizeWithHeader;
} }
/** /**
* Unified version 2 {@link HFile} block writer. The intended usage pattern * Unified version 2 {@link HFile} block writer. The intended usage pattern
* is as follows: * is as follows:
@ -631,7 +705,7 @@ public class HFileBlock implements Cacheable {
/** /**
* Current block type. Set in {@link #startWriting(BlockType)}. Could be * Current block type. Set in {@link #startWriting(BlockType)}. Could be
* changed in {@link #encodeDataBlockForDisk()} from {@link BlockType#DATA} * changed in {@link #finishBlock()} from {@link BlockType#DATA}
* to {@link BlockType#ENCODED_DATA}. * to {@link BlockType#ENCODED_DATA}.
*/ */
private BlockType blockType; private BlockType blockType;
@ -648,7 +722,7 @@ public class HFileBlock implements Cacheable {
/** /**
* Bytes to be written to the file system, including the header. Compressed * Bytes to be written to the file system, including the header. Compressed
* if compression is turned on. It also includes the checksum data that * if compression is turned on. It also includes the checksum data that
* immediately follows the block data. (header + data + checksums) * immediately follows the block data. (header + data + checksums)
*/ */
private byte[] onDiskBytesWithHeader; private byte[] onDiskBytesWithHeader;
@ -1008,6 +1082,19 @@ public class HFileBlock implements Cacheable {
return ByteBuffer.wrap(uncompressedBytesWithHeader); return ByteBuffer.wrap(uncompressedBytesWithHeader);
} }
/**
* Returns the header followed by the on-disk (compressed/encoded/encrypted) data. This is
* needed for storing packed blocks in the block cache. Expects calling semantics identical to
* {@link #getUncompressedBufferWithHeader()}. Returns only the header and data,
* Does not include checksum data.
*
* @return packed block bytes for caching on write
*/
ByteBuffer getOnDiskBufferWithHeader() {
expectState(State.BLOCK_READY);
return ByteBuffer.wrap(onDiskBytesWithHeader);
}
private void expectState(State expectedState) { private void expectState(State expectedState) {
if (state != expectedState) { if (state != expectedState) {
throw new IllegalStateException("Expected state: " + expectedState + throw new IllegalStateException("Expected state: " + expectedState +
@ -1038,7 +1125,7 @@ public class HFileBlock implements Cacheable {
* version is MINOR_VERSION_WITH_CHECKSUM. This is indicated by setting a * version is MINOR_VERSION_WITH_CHECKSUM. This is indicated by setting a
* 0 value in bytesPerChecksum. * 0 value in bytesPerChecksum.
*/ */
public HFileBlock getBlockForCaching() { public HFileBlock getBlockForCaching(CacheConfig cacheConf) {
HFileContext newContext = new HFileContextBuilder() HFileContext newContext = new HFileContextBuilder()
.withBlockSize(fileContext.getBlocksize()) .withBlockSize(fileContext.getBlocksize())
.withBytesPerCheckSum(0) .withBytesPerCheckSum(0)
@ -1051,7 +1138,10 @@ public class HFileBlock implements Cacheable {
.withIncludesTags(fileContext.isIncludesTags()) .withIncludesTags(fileContext.isIncludesTags())
.build(); .build();
return new HFileBlock(blockType, getOnDiskSizeWithoutHeader(), return new HFileBlock(blockType, getOnDiskSizeWithoutHeader(),
getUncompressedSizeWithoutHeader(), prevOffset, getUncompressedBufferWithHeader(), getUncompressedSizeWithoutHeader(), prevOffset,
cacheConf.shouldCacheCompressed(blockType.getCategory()) ?
getOnDiskBufferWithHeader() :
getUncompressedBufferWithHeader(),
DONT_FILL_HEADER, startOffset, DONT_FILL_HEADER, startOffset,
onDiskBytesWithHeader.length + onDiskChecksum.length, newContext); onDiskBytesWithHeader.length + onDiskChecksum.length, newContext);
} }
@ -1109,7 +1199,7 @@ public class HFileBlock implements Cacheable {
/** /**
* Creates a block iterator over the given portion of the {@link HFile}. * Creates a block iterator over the given portion of the {@link HFile}.
* The iterator returns blocks starting with offset such that offset <= * The iterator returns blocks starting with offset such that offset <=
* startOffset < endOffset. * startOffset < endOffset. Returned blocks are always unpacked.
* *
* @param startOffset the offset of the block to start iteration with * @param startOffset the offset of the block to start iteration with
* @param endOffset the offset to end iteration at (exclusive) * @param endOffset the offset to end iteration at (exclusive)
@ -1119,6 +1209,12 @@ public class HFileBlock implements Cacheable {
/** Closes the backing streams */ /** Closes the backing streams */
void closeStreams() throws IOException; void closeStreams() throws IOException;
/** Get a decoder for {@link BlockType#ENCODED_DATA} blocks from this file. */
HFileBlockDecodingContext getBlockDecodingContext();
/** Get the default decoder for blocks from this file. */
HFileBlockDecodingContext getDefaultBlockDecodingContext();
} }
/** /**
@ -1159,6 +1255,7 @@ public class HFileBlock implements Cacheable {
@Override @Override
public BlockIterator blockRange(final long startOffset, public BlockIterator blockRange(final long startOffset,
final long endOffset) { final long endOffset) {
final FSReader owner = this; // handle for inner class
return new BlockIterator() { return new BlockIterator() {
private long offset = startOffset; private long offset = startOffset;
@ -1168,7 +1265,7 @@ public class HFileBlock implements Cacheable {
return null; return null;
HFileBlock b = readBlockData(offset, -1, -1, false); HFileBlock b = readBlockData(offset, -1, -1, false);
offset += b.getOnDiskSizeWithHeader(); offset += b.getOnDiskSizeWithHeader();
return b; return b.unpack(fileContext, owner);
} }
@Override @Override
@ -1274,7 +1371,8 @@ public class HFileBlock implements Cacheable {
private HFileBlockDecodingContext encodedBlockDecodingCtx; private HFileBlockDecodingContext encodedBlockDecodingCtx;
private HFileBlockDefaultDecodingContext defaultDecodingCtx; /** Default context used when BlockType != {@link BlockType#ENCODED_DATA}. */
private final HFileBlockDefaultDecodingContext defaultDecodingCtx;
private ThreadLocal<PrefetchedHeader> prefetchedHeaderForThread = private ThreadLocal<PrefetchedHeader> prefetchedHeaderForThread =
new ThreadLocal<PrefetchedHeader>() { new ThreadLocal<PrefetchedHeader>() {
@ -1290,10 +1388,8 @@ public class HFileBlock implements Cacheable {
this.streamWrapper = stream; this.streamWrapper = stream;
// Older versions of HBase didn't support checksum. // Older versions of HBase didn't support checksum.
this.streamWrapper.prepareForBlockReader(!fileContext.isUseHBaseChecksum()); this.streamWrapper.prepareForBlockReader(!fileContext.isUseHBaseChecksum());
defaultDecodingCtx = defaultDecodingCtx = new HFileBlockDefaultDecodingContext(fileContext);
new HFileBlockDefaultDecodingContext(fileContext); encodedBlockDecodingCtx = defaultDecodingCtx;
encodedBlockDecodingCtx =
new HFileBlockDefaultDecodingContext(fileContext);
} }
/** /**
@ -1434,9 +1530,8 @@ public class HFileBlock implements Cacheable {
HFileBlock b = null; HFileBlock b = null;
if (onDiskSizeWithHeader > 0) { if (onDiskSizeWithHeader > 0) {
// We know the total on-disk size but not the uncompressed size. Read // We know the total on-disk size. Read the entire block into memory,
// the entire block into memory, then parse the header and decompress // then parse the header. This code path is used when
// from memory if using compression. This code path is used when
// doing a random read operation relying on the block index, as well as // doing a random read operation relying on the block index, as well as
// when the client knows the on-disk size from peeking into the next // when the client knows the on-disk size from peeking into the next
// block's header (e.g. this block's header) when reading the previous // block's header (e.g. this block's header) when reading the previous
@ -1444,7 +1539,8 @@ public class HFileBlock implements Cacheable {
// Size that we have to skip in case we have already read the header. // Size that we have to skip in case we have already read the header.
int preReadHeaderSize = headerBuf == null ? 0 : hdrSize; int preReadHeaderSize = headerBuf == null ? 0 : hdrSize;
onDiskBlock = new byte[onDiskSizeWithHeader + hdrSize]; onDiskBlock = new byte[onDiskSizeWithHeader + hdrSize]; // room for this block plus the
// next block's header
nextBlockOnDiskSize = readAtOffset(is, onDiskBlock, nextBlockOnDiskSize = readAtOffset(is, onDiskBlock,
preReadHeaderSize, onDiskSizeWithHeader - preReadHeaderSize, preReadHeaderSize, onDiskSizeWithHeader - preReadHeaderSize,
true, offset + preReadHeaderSize, pread); true, offset + preReadHeaderSize, pread);
@ -1457,11 +1553,10 @@ public class HFileBlock implements Cacheable {
headerBuf = ByteBuffer.wrap(onDiskBlock, 0, hdrSize); headerBuf = ByteBuffer.wrap(onDiskBlock, 0, hdrSize);
} }
// We know the total on-disk size but not the uncompressed size. Read // We know the total on-disk size but not the uncompressed size. Read
// the entire block into memory, then parse the header and decompress // the entire block into memory, then parse the header. Here we have
// from memory if using compression. Here we have already read the // already read the block's header
// block's header
try { try {
b = new HFileBlock(headerBuf, this.fileContext.isUseHBaseChecksum()); b = new HFileBlock(headerBuf, fileContext.isUseHBaseChecksum());
} catch (IOException ex) { } catch (IOException ex) {
// Seen in load testing. Provide comprehensive debug info. // Seen in load testing. Provide comprehensive debug info.
throw new IOException("Failed to read compressed block at " throw new IOException("Failed to read compressed block at "
@ -1499,66 +1594,34 @@ public class HFileBlock implements Cacheable {
readAtOffset(is, headerBuf.array(), headerBuf.arrayOffset(), readAtOffset(is, headerBuf.array(), headerBuf.arrayOffset(),
hdrSize, false, offset, pread); hdrSize, false, offset, pread);
} }
b = new HFileBlock(headerBuf, this.fileContext.isUseHBaseChecksum()); b = new HFileBlock(headerBuf, fileContext.isUseHBaseChecksum());
onDiskBlock = new byte[b.getOnDiskSizeWithHeader() + hdrSize]; onDiskBlock = new byte[b.getOnDiskSizeWithHeader() + hdrSize];
System.arraycopy(headerBuf.array(), System.arraycopy(headerBuf.array(), headerBuf.arrayOffset(), onDiskBlock, 0, hdrSize);
headerBuf.arrayOffset(), onDiskBlock, 0, hdrSize);
nextBlockOnDiskSize = nextBlockOnDiskSize =
readAtOffset(is, onDiskBlock, hdrSize, b.getOnDiskSizeWithHeader() readAtOffset(is, onDiskBlock, hdrSize, b.getOnDiskSizeWithHeader()
- hdrSize, true, offset + hdrSize, pread); - hdrSize, true, offset + hdrSize, pread);
onDiskSizeWithHeader = b.onDiskSizeWithoutHeader + hdrSize; onDiskSizeWithHeader = b.onDiskSizeWithoutHeader + hdrSize;
} }
Algorithm compressAlgo = fileContext.getCompression(); if (!fileContext.isCompressedOrEncrypted()) {
boolean isCompressed =
compressAlgo != null
&& compressAlgo != Compression.Algorithm.NONE;
Encryption.Context cryptoContext = fileContext.getEncryptionContext();
boolean isEncrypted = cryptoContext != null
&& cryptoContext != Encryption.Context.NONE;
if (!isCompressed && !isEncrypted) {
b.assumeUncompressed(); b.assumeUncompressed();
} }
if (verifyChecksum && if (verifyChecksum && !validateBlockChecksum(b, onDiskBlock, hdrSize)) {
!validateBlockChecksum(b, onDiskBlock, hdrSize)) {
return null; // checksum mismatch return null; // checksum mismatch
} }
if (isCompressed || isEncrypted) { // The onDiskBlock will become the headerAndDataBuffer for this block.
// This will allocate a new buffer but keep header bytes. // If nextBlockOnDiskSizeWithHeader is not zero, the onDiskBlock already
b.allocateBuffer(nextBlockOnDiskSize > 0); // contains the header of next block, so no need to set next
if (b.blockType == BlockType.ENCODED_DATA) { // block's header in it.
encodedBlockDecodingCtx.prepareDecoding(b.getOnDiskSizeWithoutHeader(), b = new HFileBlock(ByteBuffer.wrap(onDiskBlock, 0, onDiskSizeWithHeader),
b.getUncompressedSizeWithoutHeader(), b.getBufferWithoutHeader(), onDiskBlock, this.fileContext.isUseHBaseChecksum());
hdrSize);
} else {
defaultDecodingCtx.prepareDecoding(b.getOnDiskSizeWithoutHeader(),
b.getUncompressedSizeWithoutHeader(), b.getBufferWithoutHeader(), onDiskBlock,
hdrSize);
}
if (nextBlockOnDiskSize > 0) {
// Copy next block's header bytes into the new block if we have them.
System.arraycopy(onDiskBlock, onDiskSizeWithHeader, b.buf.array(),
b.buf.arrayOffset() + hdrSize
+ b.uncompressedSizeWithoutHeader + b.totalChecksumBytes(),
hdrSize);
}
} else {
// The onDiskBlock will become the headerAndDataBuffer for this block.
// If nextBlockOnDiskSizeWithHeader is not zero, the onDiskBlock already
// contains the header of next block, so no need to set next
// block's header in it.
b = new HFileBlock(ByteBuffer.wrap(onDiskBlock, 0,
onDiskSizeWithHeader), this.fileContext.isUseHBaseChecksum());
}
b.nextBlockOnDiskSizeWithHeader = nextBlockOnDiskSize; b.nextBlockOnDiskSizeWithHeader = nextBlockOnDiskSize;
// Set prefetched header // Set prefetched header
if (b.nextBlockOnDiskSizeWithHeader > 0) { if (b.hasNextBlockHeader()) {
prefetchedHeader.offset = offset + b.getOnDiskSizeWithHeader(); prefetchedHeader.offset = offset + b.getOnDiskSizeWithHeader();
System.arraycopy(onDiskBlock, onDiskSizeWithHeader, System.arraycopy(onDiskBlock, onDiskSizeWithHeader,
prefetchedHeader.header, 0, hdrSize); prefetchedHeader.header, 0, hdrSize);
@ -1578,37 +1641,53 @@ public class HFileBlock implements Cacheable {
encodedBlockDecodingCtx = encoder.newDataBlockDecodingContext(this.fileContext); encodedBlockDecodingCtx = encoder.newDataBlockDecodingContext(this.fileContext);
} }
@Override
public HFileBlockDecodingContext getBlockDecodingContext() {
return this.encodedBlockDecodingCtx;
}
@Override
public HFileBlockDecodingContext getDefaultBlockDecodingContext() {
return this.defaultDecodingCtx;
}
/** /**
* Generates the checksum for the header as well as the data and * Generates the checksum for the header as well as the data and
* then validates that it matches the value stored in the header. * then validates that it matches the value stored in the header.
* If there is a checksum mismatch, then return false. Otherwise * If there is a checksum mismatch, then return false. Otherwise
* return true. * return true.
*/ */
protected boolean validateBlockChecksum(HFileBlock block, protected boolean validateBlockChecksum(HFileBlock block, byte[] data, int hdrSize)
byte[] data, int hdrSize) throws IOException { throws IOException {
return ChecksumUtil.validateBlockChecksum(path, block, return ChecksumUtil.validateBlockChecksum(path, block, data, hdrSize);
data, hdrSize);
} }
@Override @Override
public void closeStreams() throws IOException { public void closeStreams() throws IOException {
streamWrapper.close(); streamWrapper.close();
} }
@Override
public String toString() {
return "FSReaderV2 [ hfs=" + hfs + " path=" + path + " fileContext=" + fileContext + " ]";
}
} }
@Override @Override
public int getSerializedLength() { public int getSerializedLength() {
if (buf != null) { if (buf != null) {
return this.buf.limit() + HFileBlock.EXTRA_SERIALIZATION_SPACE; // include extra bytes for the next header when it's available.
int extraSpace = hasNextBlockHeader() ? headerSize() : 0;
return this.buf.limit() + extraSpace + HFileBlock.EXTRA_SERIALIZATION_SPACE;
} }
return 0; return 0;
} }
@Override @Override
public void serialize(ByteBuffer destination) { public void serialize(ByteBuffer destination) {
ByteBuffer dupBuf = this.buf.duplicate(); // assumes HeapByteBuffer
dupBuf.rewind(); destination.put(this.buf.array(), this.buf.arrayOffset(),
destination.put(dupBuf); getSerializedLength() - EXTRA_SERIALIZATION_SPACE);
serializeExtraInfo(destination); serializeExtraInfo(destination);
} }
@ -1656,13 +1735,9 @@ public class HFileBlock implements Cacheable {
if (castedComparison.uncompressedSizeWithoutHeader != this.uncompressedSizeWithoutHeader) { if (castedComparison.uncompressedSizeWithoutHeader != this.uncompressedSizeWithoutHeader) {
return false; return false;
} }
if (this.buf.compareTo(castedComparison.buf) != 0) { if (Bytes.compareTo(this.buf.array(), this.buf.arrayOffset(), this.buf.limit(),
return false; castedComparison.buf.array(), castedComparison.buf.arrayOffset(),
} castedComparison.buf.limit()) != 0) {
if (this.buf.position() != castedComparison.buf.position()){
return false;
}
if (this.buf.limit() != castedComparison.buf.limit()){
return false; return false;
} }
return true; return true;
@ -1683,6 +1758,7 @@ public class HFileBlock implements Cacheable {
return this.fileContext.getBytesPerChecksum(); return this.fileContext.getBytesPerChecksum();
} }
/** @return the size of data on disk + header. Excludes checksum. */
int getOnDiskDataSizeWithHeader() { int getOnDiskDataSizeWithHeader() {
return this.onDiskDataSizeWithHeader; return this.onDiskDataSizeWithHeader;
} }
@ -1736,6 +1812,10 @@ public class HFileBlock implements Cacheable {
return DUMMY_HEADER_NO_CHECKSUM; return DUMMY_HEADER_NO_CHECKSUM;
} }
/**
* @return the HFileContext used to create this HFileBlock. Not necessary the
* fileContext for the file from which this block's data was originally read.
*/
public HFileContext getHFileContext() { public HFileContext getHFileContext() {
return this.fileContext; return this.fileContext;
} }
@ -1748,7 +1828,7 @@ public class HFileBlock implements Cacheable {
static String toStringHeader(ByteBuffer buf) throws IOException { static String toStringHeader(ByteBuffer buf) throws IOException {
int offset = buf.arrayOffset(); int offset = buf.arrayOffset();
byte[] b = buf.array(); byte[] b = buf.array();
long magic = Bytes.toLong(b, offset); long magic = Bytes.toLong(b, offset);
BlockType bt = BlockType.read(buf); BlockType bt = BlockType.read(buf);
offset += Bytes.SIZEOF_LONG; offset += Bytes.SIZEOF_LONG;
int compressedBlockSizeNoHeader = Bytes.toInt(b, offset); int compressedBlockSizeNoHeader = Bytes.toInt(b, offset);
@ -1775,4 +1855,3 @@ public class HFileBlock implements Cacheable {
" onDiskDataSizeWithHeader " + onDiskDataSizeWithHeader; " onDiskDataSizeWithHeader " + onDiskDataSizeWithHeader;
} }
} }
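
Taken together, the reader-side changes above mean a block comes off disk still packed and is only decompressed/decrypted when a caller asks for the usable view. A hedged sketch of that flow, placed in the org.apache.hadoop.hbase.io.hfile package because FSReader and unpack() are package-private; obtaining an open FSReader is assumed and not shown:

  package org.apache.hadoop.hbase.io.hfile;

  import java.io.IOException;

  final class LazyUnpackSketch {
    /** Reads the block at {@code offset} in packed form and unpacks it only if needed. */
    static HFileBlock readAndUnpack(HFileBlock.FSReader reader, HFileContext fileContext,
        long offset) throws IOException {
      // readBlockData no longer decompresses; the block comes back as written on disk.
      HFileBlock packed = reader.readBlockData(offset, -1, -1, false);
      // unpack() returns the same block for uncompressed, unencrypted files; otherwise
      // it allocates a new buffer holding the decompressed/decrypted bytes.
      return packed.isUnpacked() ? packed : packed.unpack(fileContext, reader);
    }
  }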

@ -772,7 +772,7 @@ public class HFileBlockIndex {
* {@link #writeIndexBlocks(FSDataOutputStream)} has been called. The * {@link #writeIndexBlocks(FSDataOutputStream)} has been called. The
* initial value accounts for the root level, and will be increased to two * initial value accounts for the root level, and will be increased to two
* as soon as we find out there is a leaf-level in * as soon as we find out there is a leaf-level in
* {@link #blockWritten(long, int)}. * {@link #blockWritten(long, int, int)}.
*/ */
private int numLevels = 1; private int numLevels = 1;
@ -798,8 +798,8 @@ public class HFileBlockIndex {
/** Whether we require this block index to always be single-level. */ /** Whether we require this block index to always be single-level. */
private boolean singleLevelOnly; private boolean singleLevelOnly;
/** Block cache, or null if cache-on-write is disabled */ /** CacheConfig, or null if cache-on-write is disabled */
private BlockCache blockCache; private CacheConfig cacheConf;
/** Name to use for computing cache keys */ /** Name to use for computing cache keys */
private String nameForCaching; private String nameForCaching;
@ -814,18 +814,17 @@ public class HFileBlockIndex {
* Creates a multi-level block index writer. * Creates a multi-level block index writer.
* *
* @param blockWriter the block writer to use to write index blocks * @param blockWriter the block writer to use to write index blocks
* @param blockCache if this is not null, index blocks will be cached * @param cacheConf used to determine when and how a block should be cached-on-write.
* on write into this block cache.
*/ */
public BlockIndexWriter(HFileBlock.Writer blockWriter, public BlockIndexWriter(HFileBlock.Writer blockWriter,
BlockCache blockCache, String nameForCaching) { CacheConfig cacheConf, String nameForCaching) {
if ((blockCache == null) != (nameForCaching == null)) { if ((cacheConf == null) != (nameForCaching == null)) {
throw new IllegalArgumentException("Block cache and file name for " + throw new IllegalArgumentException("Block cache and file name for " +
"caching must be both specified or both null"); "caching must be both specified or both null");
} }
this.blockWriter = blockWriter; this.blockWriter = blockWriter;
this.blockCache = blockCache; this.cacheConf = cacheConf;
this.nameForCaching = nameForCaching; this.nameForCaching = nameForCaching;
this.maxChunkSize = HFileBlockIndex.DEFAULT_MAX_CHUNK_SIZE; this.maxChunkSize = HFileBlockIndex.DEFAULT_MAX_CHUNK_SIZE;
} }
@ -979,9 +978,9 @@ public class HFileBlockIndex {
byte[] curFirstKey = curChunk.getBlockKey(0); byte[] curFirstKey = curChunk.getBlockKey(0);
blockWriter.writeHeaderAndData(out); blockWriter.writeHeaderAndData(out);
if (blockCache != null) { if (cacheConf != null) {
HFileBlock blockForCaching = blockWriter.getBlockForCaching(); HFileBlock blockForCaching = blockWriter.getBlockForCaching(cacheConf);
blockCache.cacheBlock(new BlockCacheKey(nameForCaching, cacheConf.getBlockCache().cacheBlock(new BlockCacheKey(nameForCaching,
beginOffset), blockForCaching); beginOffset), blockForCaching);
} }
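With this change the index writer consults a CacheConfig instead of holding a BlockCache directly, and index blocks are cached on write keyed by (file name, block offset). The sketch below shows that decision in simplified form; CacheDecider and the Map-backed cache are hypothetical stand-ins, and the method names merely echo the real ones.

import java.util.HashMap;
import java.util.Map;

class IndexCacheOnWriteSketch {
  // Stand-in for the parts of CacheConfig this sketch needs.
  interface CacheDecider {
    boolean shouldCacheIndexesOnWrite();
    Map<String, byte[]> blockCache();
  }

  private final CacheDecider cacheConf; // null when cache-on-write is disabled
  private final String nameForCaching;

  IndexCacheOnWriteSketch(CacheDecider cacheConf, String nameForCaching) {
    this.cacheConf = cacheConf;
    this.nameForCaching = nameForCaching;
  }

  boolean cacheOnWrite() {
    return cacheConf != null && cacheConf.shouldCacheIndexesOnWrite();
  }

  // Called after an index block has been written at the given file offset.
  void blockWritten(long offset, byte[] indexBlockBytes) {
    if (cacheOnWrite()) {
      cacheConf.blockCache().put(nameForCaching + "@" + offset, indexBlockBytes);
    }
  }

  public static void main(String[] args) {
    final Map<String, byte[]> cache = new HashMap<String, byte[]>();
    CacheDecider conf = new CacheDecider() {
      public boolean shouldCacheIndexesOnWrite() { return true; }
      public Map<String, byte[]> blockCache() { return cache; }
    };
    IndexCacheOnWriteSketch writer = new IndexCacheOnWriteSketch(conf, "hfile-1");
    writer.blockWritten(128L, new byte[] {1, 2, 3});
    System.out.println(cache.keySet()); // [hfile-1@128]
  }
}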
@ -1090,8 +1089,7 @@ public class HFileBlockIndex {
* entry referring to that block to the parent-level index. * entry referring to that block to the parent-level index.
*/ */
@Override @Override
public void blockWritten(long offset, int onDiskSize, int uncompressedSize) public void blockWritten(long offset, int onDiskSize, int uncompressedSize) {
{
// Add leaf index block size // Add leaf index block size
totalBlockOnDiskSize += onDiskSize; totalBlockOnDiskSize += onDiskSize;
totalBlockUncompressedSize += uncompressedSize; totalBlockUncompressedSize += uncompressedSize;
@ -1156,7 +1154,7 @@ public class HFileBlockIndex {
*/ */
@Override @Override
public boolean getCacheOnWrite() { public boolean getCacheOnWrite() {
return blockCache != null; return cacheConf != null && cacheConf.shouldCacheIndexesOnWrite();
} }
/** /**

View File

@ -249,6 +249,10 @@ public class HFileReaderV2 extends AbstractHFileReader {
return new ScannerV2(this, cacheBlocks, pread, isCompaction); return new ScannerV2(this, cacheBlocks, pread, isCompaction);
} }
/**
* Retrieve block from cache. Validates the retrieved block's type vs {@code expectedBlockType}
* and its encoding vs. {@code expectedDataBlockEncoding}. Unpacks the block as necessary.
*/
private HFileBlock getCachedBlock(BlockCacheKey cacheKey, boolean cacheBlock, boolean useLock, private HFileBlock getCachedBlock(BlockCacheKey cacheKey, boolean cacheBlock, boolean useLock,
boolean isCompaction, boolean updateCacheMetrics, BlockType expectedBlockType, boolean isCompaction, boolean updateCacheMetrics, BlockType expectedBlockType,
DataBlockEncoding expectedDataBlockEncoding) throws IOException { DataBlockEncoding expectedDataBlockEncoding) throws IOException {
@ -258,6 +262,9 @@ public class HFileReaderV2 extends AbstractHFileReader {
HFileBlock cachedBlock = (HFileBlock) cache.getBlock(cacheKey, cacheBlock, useLock, HFileBlock cachedBlock = (HFileBlock) cache.getBlock(cacheKey, cacheBlock, useLock,
updateCacheMetrics); updateCacheMetrics);
if (cachedBlock != null) { if (cachedBlock != null) {
if (cacheConf.shouldCacheCompressed(cachedBlock.getBlockType().getCategory())) {
cachedBlock = cachedBlock.unpack(hfileContext, fsBlockReader);
}
validateBlockType(cachedBlock, expectedBlockType); validateBlockType(cachedBlock, expectedBlockType);
if (expectedDataBlockEncoding == null) { if (expectedDataBlockEncoding == null) {
@ -337,6 +344,7 @@ public class HFileReaderV2 extends AbstractHFileReader {
HFileBlock cachedBlock = getCachedBlock(cacheKey, cacheBlock, false, true, true, HFileBlock cachedBlock = getCachedBlock(cacheKey, cacheBlock, false, true, true,
BlockType.META, null); BlockType.META, null);
if (cachedBlock != null) { if (cachedBlock != null) {
assert cachedBlock.isUnpacked() : "Packed block leak.";
// Return a distinct 'shallow copy' of the block, // Return a distinct 'shallow copy' of the block,
// so pos does not get messed by the scanner // so pos does not get messed by the scanner
return cachedBlock.getBufferWithoutHeader(); return cachedBlock.getBufferWithoutHeader();
@ -345,7 +353,7 @@ public class HFileReaderV2 extends AbstractHFileReader {
} }
HFileBlock metaBlock = fsBlockReader.readBlockData(metaBlockOffset, HFileBlock metaBlock = fsBlockReader.readBlockData(metaBlockOffset,
blockSize, -1, true); blockSize, -1, true).unpack(hfileContext, fsBlockReader);
// Cache the block // Cache the block
if (cacheBlock) { if (cacheBlock) {
@ -359,7 +367,7 @@ public class HFileReaderV2 extends AbstractHFileReader {
/** /**
* Read in a file block of the given {@link BlockType} and * Read in a file block of the given {@link BlockType} and
* {@link DataBlockEncoding}. * {@link DataBlockEncoding}. Unpacks the block as necessary.
* @param dataBlockOffset offset to read. * @param dataBlockOffset offset to read.
* @param onDiskBlockSize size of the block * @param onDiskBlockSize size of the block
* @param cacheBlock * @param cacheBlock
@ -400,8 +408,7 @@ public class HFileReaderV2 extends AbstractHFileReader {
// the other choice is to duplicate work (which the cache would prevent you // the other choice is to duplicate work (which the cache would prevent you
// from doing). // from doing).
BlockCacheKey cacheKey = BlockCacheKey cacheKey = new BlockCacheKey(name, dataBlockOffset);
new BlockCacheKey(name, dataBlockOffset);
boolean useLock = false; boolean useLock = false;
IdLock.Entry lockEntry = null; IdLock.Entry lockEntry = null;
@ -419,7 +426,7 @@ public class HFileReaderV2 extends AbstractHFileReader {
HFileBlock cachedBlock = getCachedBlock(cacheKey, cacheBlock, useLock, isCompaction, HFileBlock cachedBlock = getCachedBlock(cacheKey, cacheBlock, useLock, isCompaction,
updateCacheMetrics, expectedBlockType, expectedDataBlockEncoding); updateCacheMetrics, expectedBlockType, expectedDataBlockEncoding);
if (cachedBlock != null) { if (cachedBlock != null) {
validateBlockType(cachedBlock, expectedBlockType); assert cachedBlock.isUnpacked() : "Packed block leak.";
if (cachedBlock.getBlockType().isData()) { if (cachedBlock.getBlockType().isData()) {
if (updateCacheMetrics) { if (updateCacheMetrics) {
HFile.dataBlockReadCnt.incrementAndGet(); HFile.dataBlockReadCnt.incrementAndGet();
@ -448,18 +455,21 @@ public class HFileReaderV2 extends AbstractHFileReader {
HFileBlock hfileBlock = fsBlockReader.readBlockData(dataBlockOffset, onDiskBlockSize, -1, HFileBlock hfileBlock = fsBlockReader.readBlockData(dataBlockOffset, onDiskBlockSize, -1,
pread); pread);
validateBlockType(hfileBlock, expectedBlockType); validateBlockType(hfileBlock, expectedBlockType);
HFileBlock unpacked = hfileBlock.unpack(hfileContext, fsBlockReader);
BlockType.BlockCategory category = hfileBlock.getBlockType().getCategory();
// Cache the block if necessary // Cache the block if necessary
if (cacheBlock && cacheConf.shouldCacheBlockOnRead(hfileBlock.getBlockType().getCategory())) { if (cacheBlock && cacheConf.shouldCacheBlockOnRead(category)) {
cacheConf.getBlockCache().cacheBlock(cacheKey, hfileBlock, cacheConf.isInMemory(), cacheConf.getBlockCache().cacheBlock(cacheKey,
this.cacheConf.isCacheDataInL1()); cacheConf.shouldCacheCompressed(category) ? hfileBlock : unpacked,
cacheConf.isInMemory(), this.cacheConf.isCacheDataInL1());
} }
if (updateCacheMetrics && hfileBlock.getBlockType().isData()) { if (updateCacheMetrics && hfileBlock.getBlockType().isData()) {
HFile.dataBlockReadCnt.incrementAndGet(); HFile.dataBlockReadCnt.incrementAndGet();
} }
return hfileBlock; return unpacked;
} }
} finally { } finally {
traceScope.close(); traceScope.close();
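Taken together, the getCachedBlock() and readBlock() changes in this file form the lazy-decompression read path: callers always receive an unpacked block, while the copy placed in the cache is either the packed on-disk bytes or the unpacked bytes depending on configuration. A condensed sketch of that flow, with hypothetical types (Block, SimpleBlockCache, DiskReader) rather than the HFileReaderV2 API:

interface Block {
  Block unpack();   // decompress/decrypt; no-op if already unpacked
  boolean isData(); // true for DATA / ENCODED_DATA blocks
}

interface SimpleBlockCache {
  Block get(long offset);
  void put(long offset, Block block);
}

interface DiskReader {
  Block readFromDisk(long offset); // stand-in for the on-disk block reader
}

class LazyReadSketch {
  private final SimpleBlockCache cache;
  private final boolean cacheDataCompressed; // mirrors hbase.block.data.cachecompressed

  LazyReadSketch(SimpleBlockCache cache, boolean cacheDataCompressed) {
    this.cache = cache;
    this.cacheDataCompressed = cacheDataCompressed;
  }

  Block readBlock(long offset, DiskReader disk) {
    Block cached = cache.get(offset);
    if (cached != null) {
      // A cached entry may still be in packed (on-disk) form; unpack before returning.
      return cached.unpack();
    }
    Block fromDisk = disk.readFromDisk(offset);
    Block unpacked = fromDisk.unpack();
    // Cache the packed bytes only for data blocks and only when configured to do so;
    // everything else goes into the cache already unpacked.
    cache.put(offset, (cacheDataCompressed && fromDisk.isData()) ? fromDisk : unpacked);
    return unpacked;
  }
}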

View File

@ -118,7 +118,7 @@ public class HFileWriterV2 extends AbstractHFileWriter {
// Data block index writer // Data block index writer
boolean cacheIndexesOnWrite = cacheConf.shouldCacheIndexesOnWrite(); boolean cacheIndexesOnWrite = cacheConf.shouldCacheIndexesOnWrite();
dataBlockIndexWriter = new HFileBlockIndex.BlockIndexWriter(fsBlockWriter, dataBlockIndexWriter = new HFileBlockIndex.BlockIndexWriter(fsBlockWriter,
cacheIndexesOnWrite ? cacheConf.getBlockCache(): null, cacheIndexesOnWrite ? cacheConf : null,
cacheIndexesOnWrite ? name : null); cacheIndexesOnWrite ? name : null);
dataBlockIndexWriter.setMaxChunkSize( dataBlockIndexWriter.setMaxChunkSize(
HFileBlockIndex.getMaxChunkSize(conf)); HFileBlockIndex.getMaxChunkSize(conf));
@ -143,7 +143,7 @@ public class HFileWriterV2 extends AbstractHFileWriter {
newBlock(); newBlock();
} }
/** Clean up the current block */ /** Clean up the current data block */
private void finishBlock() throws IOException { private void finishBlock() throws IOException {
if (!fsBlockWriter.isWriting() || fsBlockWriter.blockSizeWritten() == 0) if (!fsBlockWriter.isWriting() || fsBlockWriter.blockSizeWritten() == 0)
return; return;
@ -191,7 +191,7 @@ public class HFileWriterV2 extends AbstractHFileWriter {
* the cache key. * the cache key.
*/ */
private void doCacheOnWrite(long offset) { private void doCacheOnWrite(long offset) {
HFileBlock cacheFormatBlock = fsBlockWriter.getBlockForCaching(); HFileBlock cacheFormatBlock = fsBlockWriter.getBlockForCaching(cacheConf);
cacheConf.getBlockCache().cacheBlock( cacheConf.getBlockCache().cacheBlock(
new BlockCacheKey(name, offset), cacheFormatBlock); new BlockCacheKey(name, offset), cacheFormatBlock);
} }

View File

@ -34,6 +34,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
import com.google.common.base.Objects;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
@ -340,12 +341,35 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize {
cb = new LruCachedBlock(cacheKey, buf, count.incrementAndGet(), inMemory); cb = new LruCachedBlock(cacheKey, buf, count.incrementAndGet(), inMemory);
long newSize = updateSizeMetrics(cb, false); long newSize = updateSizeMetrics(cb, false);
map.put(cacheKey, cb); map.put(cacheKey, cb);
elements.incrementAndGet(); long val = elements.incrementAndGet();
if (LOG.isTraceEnabled()) {
long size = map.size();
assertCounterSanity(size, val);
}
if (newSize > acceptableSize() && !evictionInProgress) { if (newSize > acceptableSize() && !evictionInProgress) {
runEviction(); runEviction();
} }
} }
/**
* Sanity-checking for parity between actual block cache content and metrics.
* Intended only for use with TRACE level logging and -ea JVM.
*/
private static void assertCounterSanity(long mapSize, long counterVal) {
if (counterVal < 0) {
LOG.trace("counterVal overflow. Assertions unreliable. counterVal=" + counterVal +
", mapSize=" + mapSize);
return;
}
if (mapSize < Integer.MAX_VALUE) {
double pct_diff = Math.abs((((double) counterVal) / ((double) mapSize)) - 1.);
if (pct_diff > 0.05) {
LOG.trace("delta between reported and actual size > 5%. counterVal=" + counterVal +
", mapSize=" + mapSize);
}
}
}
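The assertCounterSanity() helper added above flags drift of more than 5% between the element counter and the actual map size. A quick standalone illustration of that tolerance check, with made-up numbers:

public class CounterSanitySketch {
  static boolean withinTolerance(long mapSize, long counterVal) {
    if (counterVal < 0 || mapSize == 0) {
      return false; // counter overflow or empty map: the comparison is unreliable
    }
    double pctDiff = Math.abs(((double) counterVal / (double) mapSize) - 1.0);
    return pctDiff <= 0.05;
  }

  public static void main(String[] args) {
    System.out.println(withinTolerance(1000, 1010)); // true  (1% drift)
    System.out.println(withinTolerance(1000, 1100)); // false (10% drift)
  }
}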
private int compare(Cacheable left, Cacheable right) { private int compare(Cacheable left, Cacheable right) {
ByteBuffer l = ByteBuffer.allocate(left.getSerializedLength()); ByteBuffer l = ByteBuffer.allocate(left.getSerializedLength());
left.serialize(l); left.serialize(l);
@ -459,7 +483,11 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize {
protected long evictBlock(LruCachedBlock block, boolean evictedByEvictionProcess) { protected long evictBlock(LruCachedBlock block, boolean evictedByEvictionProcess) {
map.remove(block.getCacheKey()); map.remove(block.getCacheKey());
updateSizeMetrics(block, true); updateSizeMetrics(block, true);
elements.decrementAndGet(); long val = elements.decrementAndGet();
if (LOG.isTraceEnabled()) {
long size = map.size();
assertCounterSanity(size, val);
}
stats.evicted(block.getCachedTime()); stats.evicted(block.getCachedTime());
if (evictedByEvictionProcess && victimHandler != null) { if (evictedByEvictionProcess && victimHandler != null) {
boolean wait = getCurrentSize() < acceptableSize(); boolean wait = getCurrentSize() < acceptableSize();
@ -503,9 +531,12 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize {
if(bytesToFree <= 0) return; if(bytesToFree <= 0) return;
// Instantiate priority buckets // Instantiate priority buckets
BlockBucket bucketSingle = new BlockBucket(bytesToFree, blockSize, singleSize()); BlockBucket bucketSingle = new BlockBucket("single", bytesToFree, blockSize,
BlockBucket bucketMulti = new BlockBucket(bytesToFree, blockSize, multiSize()); singleSize());
BlockBucket bucketMemory = new BlockBucket(bytesToFree, blockSize, memorySize()); BlockBucket bucketMulti = new BlockBucket("multi", bytesToFree, blockSize,
multiSize());
BlockBucket bucketMemory = new BlockBucket("memory", bytesToFree, blockSize,
memorySize());
// Scan entire map putting into appropriate buckets // Scan entire map putting into appropriate buckets
for(LruCachedBlock cachedBlock : map.values()) { for(LruCachedBlock cachedBlock : map.values()) {
@ -534,7 +565,15 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize {
// so the single and multi buckets will be emptied // so the single and multi buckets will be emptied
bytesFreed = bucketSingle.free(s); bytesFreed = bucketSingle.free(s);
bytesFreed += bucketMulti.free(m); bytesFreed += bucketMulti.free(m);
if (LOG.isTraceEnabled()) {
LOG.trace("freed " + StringUtils.byteDesc(bytesFreed) +
" from single and multi buckets");
}
bytesFreed += bucketMemory.free(bytesToFree - bytesFreed); bytesFreed += bucketMemory.free(bytesToFree - bytesFreed);
if (LOG.isTraceEnabled()) {
LOG.trace("freed " + StringUtils.byteDesc(bytesFreed) +
" total from all three buckets ");
}
} else { } else {
// this means no need to evict block in memory bucket, // this means no need to evict block in memory bucket,
// and we try best to make the ratio between single-bucket and // and we try best to make the ratio between single-bucket and
@ -596,6 +635,23 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize {
} }
} }
@Override
public String toString() {
return Objects.toStringHelper(this)
.add("blockCount", getBlockCount())
.add("currentSize", getCurrentSize())
.add("freeSize", getFreeSize())
.add("maxSize", getMaxSize())
.add("heapSize", heapSize())
.add("minSize", minSize())
.add("minFactor", minFactor)
.add("multiSize", multiSize())
.add("multiFactor", multiFactor)
.add("singleSize", singleSize())
.add("singleFactor", singleFactor)
.toString();
}
/** /**
* Used to group blocks into priority buckets. There will be a BlockBucket * Used to group blocks into priority buckets. There will be a BlockBucket
* for each priority (single, multi, memory). Once bucketed, the eviction * for each priority (single, multi, memory). Once bucketed, the eviction
@ -603,11 +659,14 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize {
* to configuration parameters and their relatives sizes. * to configuration parameters and their relatives sizes.
*/ */
private class BlockBucket implements Comparable<BlockBucket> { private class BlockBucket implements Comparable<BlockBucket> {
private final String name;
private LruCachedBlockQueue queue; private LruCachedBlockQueue queue;
private long totalSize = 0; private long totalSize = 0;
private long bucketSize; private long bucketSize;
public BlockBucket(long bytesToFree, long blockSize, long bucketSize) { public BlockBucket(String name, long bytesToFree, long blockSize, long bucketSize) {
this.name = name;
this.bucketSize = bucketSize; this.bucketSize = bucketSize;
queue = new LruCachedBlockQueue(bytesToFree, blockSize); queue = new LruCachedBlockQueue(bytesToFree, blockSize);
totalSize = 0; totalSize = 0;
@ -619,6 +678,9 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize {
} }
public long free(long toFree) { public long free(long toFree) {
if (LOG.isTraceEnabled()) {
LOG.trace("freeing " + StringUtils.byteDesc(toFree) + " from " + this);
}
LruCachedBlock cb; LruCachedBlock cb;
long freedBytes = 0; long freedBytes = 0;
while ((cb = queue.pollLast()) != null) { while ((cb = queue.pollLast()) != null) {
@ -627,6 +689,9 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize {
return freedBytes; return freedBytes;
} }
} }
if (LOG.isTraceEnabled()) {
LOG.trace("freed " + StringUtils.byteDesc(freedBytes) + " from " + this);
}
return freedBytes; return freedBytes;
} }
@ -653,8 +718,16 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize {
@Override @Override
public int hashCode() { public int hashCode() {
// Nothing distingushing about each instance unless I pass in a 'name' or something return Objects.hashCode(name, bucketSize, queue, totalSize);
return super.hashCode(); }
@Override
public String toString() {
return Objects.toStringHelper(this)
.add("name", name)
.add("totalSize", StringUtils.byteDesc(totalSize))
.add("bucketSize", StringUtils.byteDesc(bucketSize))
.toString();
} }
} }
@ -769,6 +842,7 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize {
LruBlockCache.LOG.info("totalSize=" + StringUtils.byteDesc(totalSize) + ", " + LruBlockCache.LOG.info("totalSize=" + StringUtils.byteDesc(totalSize) + ", " +
"freeSize=" + StringUtils.byteDesc(freeSize) + ", " + "freeSize=" + StringUtils.byteDesc(freeSize) + ", " +
"max=" + StringUtils.byteDesc(this.maxSize) + ", " + "max=" + StringUtils.byteDesc(this.maxSize) + ", " +
"blockCount=" + getBlockCount() + ", " +
"accesses=" + stats.getRequestCount() + ", " + "accesses=" + stats.getRequestCount() + ", " +
"hits=" + stats.getHitCount() + ", " + "hits=" + stats.getHitCount() + ", " +
"hitRatio=" + (stats.getHitCount() == 0 ? "hitRatio=" + (stats.getHitCount() == 0 ?
@ -940,6 +1014,7 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize {
} }
/** Clears the cache. Used in tests. */ /** Clears the cache. Used in tests. */
@VisibleForTesting
public void clearCache() { public void clearCache() {
this.map.clear(); this.map.clear();
this.elements.set(0); this.elements.set(0);
@ -949,6 +1024,7 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize {
* Used in testing. May be very inefficient. * Used in testing. May be very inefficient.
* @return the set of cached file names * @return the set of cached file names
*/ */
@VisibleForTesting
SortedSet<String> getCachedFileNamesForTest() { SortedSet<String> getCachedFileNamesForTest() {
SortedSet<String> fileNames = new TreeSet<String>(); SortedSet<String> fileNames = new TreeSet<String>();
for (BlockCacheKey cacheKey : map.keySet()) { for (BlockCacheKey cacheKey : map.keySet()) {
@ -969,6 +1045,7 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize {
return counts; return counts;
} }
@VisibleForTesting
public Map<DataBlockEncoding, Integer> getEncodingCountsForTest() { public Map<DataBlockEncoding, Integer> getEncodingCountsForTest() {
Map<DataBlockEncoding, Integer> counts = Map<DataBlockEncoding, Integer> counts =
new EnumMap<DataBlockEncoding, Integer>(DataBlockEncoding.class); new EnumMap<DataBlockEncoding, Integer>(DataBlockEncoding.class);
@ -986,6 +1063,11 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize {
victimHandler = handler; victimHandler = handler;
} }
@VisibleForTesting
Map<BlockCacheKey, LruCachedBlock> getMapForTests() {
return map;
}
BucketCache getVictimHandler() { BucketCache getVictimHandler() {
return this.victimHandler; return this.victimHandler;
} }
@ -994,4 +1076,4 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize {
public BlockCache[] getBlockCaches() { public BlockCache[] getBlockCaches() {
return null; return null;
} }
} }

View File

@ -182,10 +182,9 @@ public class TestCacheConfig {
if (sizing) { if (sizing) {
long originalSize = bc.getCurrentSize(); long originalSize = bc.getCurrentSize();
bc.cacheBlock(bck, c, cc.isInMemory(), cc.isCacheDataInL1()); bc.cacheBlock(bck, c, cc.isInMemory(), cc.isCacheDataInL1());
long size = bc.getCurrentSize();
assertTrue(bc.getCurrentSize() > originalSize); assertTrue(bc.getCurrentSize() > originalSize);
bc.evictBlock(bck); bc.evictBlock(bck);
size = bc.getCurrentSize(); long size = bc.getCurrentSize();
assertEquals(originalSize, size); assertEquals(originalSize, size);
} }
} }

View File

@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.io.hfile;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull; import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import java.io.IOException; import java.io.IOException;
@ -84,6 +85,7 @@ public class TestCacheOnWrite {
private final Compression.Algorithm compress; private final Compression.Algorithm compress;
private final BlockEncoderTestType encoderType; private final BlockEncoderTestType encoderType;
private final HFileDataBlockEncoder encoder; private final HFileDataBlockEncoder encoder;
private final boolean cacheCompressedData;
private static final int DATA_BLOCK_SIZE = 2048; private static final int DATA_BLOCK_SIZE = 2048;
private static final int NUM_KV = 25000; private static final int NUM_KV = 25000;
@ -154,14 +156,15 @@ public class TestCacheOnWrite {
} }
} }
public TestCacheOnWrite(CacheOnWriteType cowType, public TestCacheOnWrite(CacheOnWriteType cowType, Compression.Algorithm compress,
Compression.Algorithm compress, BlockEncoderTestType encoderType) { BlockEncoderTestType encoderType, boolean cacheCompressedData) {
this.cowType = cowType; this.cowType = cowType;
this.compress = compress; this.compress = compress;
this.encoderType = encoderType; this.encoderType = encoderType;
this.encoder = encoderType.getEncoder(); this.encoder = encoderType.getEncoder();
testDescription = "[cacheOnWrite=" + cowType + ", compress=" + compress + this.cacheCompressedData = cacheCompressedData;
", encoderType=" + encoderType + "]"; testDescription = "[cacheOnWrite=" + cowType + ", compress=" + compress +
", encoderType=" + encoderType + ", cacheCompressedData=" + cacheCompressedData + "]";
System.out.println(testDescription); System.out.println(testDescription);
} }
@ -173,7 +176,9 @@ public class TestCacheOnWrite {
HBaseTestingUtility.COMPRESSION_ALGORITHMS) { HBaseTestingUtility.COMPRESSION_ALGORITHMS) {
for (BlockEncoderTestType encoderType : for (BlockEncoderTestType encoderType :
BlockEncoderTestType.values()) { BlockEncoderTestType.values()) {
cowTypes.add(new Object[] { cowType, compress, encoderType }); for (boolean cacheCompressedData : new boolean[] { false, true }) {
cowTypes.add(new Object[] { cowType, compress, encoderType, cacheCompressedData });
}
} }
} }
} }
@ -189,11 +194,12 @@ public class TestCacheOnWrite {
conf.setInt(BloomFilterFactory.IO_STOREFILE_BLOOM_BLOCK_SIZE, conf.setInt(BloomFilterFactory.IO_STOREFILE_BLOOM_BLOCK_SIZE,
BLOOM_BLOCK_SIZE); BLOOM_BLOCK_SIZE);
conf.setBoolean(CacheConfig.CACHE_BLOCKS_ON_WRITE_KEY, conf.setBoolean(CacheConfig.CACHE_BLOCKS_ON_WRITE_KEY,
cowType.shouldBeCached(BlockType.DATA)); cowType.shouldBeCached(BlockType.DATA));
conf.setBoolean(CacheConfig.CACHE_INDEX_BLOCKS_ON_WRITE_KEY, conf.setBoolean(CacheConfig.CACHE_INDEX_BLOCKS_ON_WRITE_KEY,
cowType.shouldBeCached(BlockType.LEAF_INDEX)); cowType.shouldBeCached(BlockType.LEAF_INDEX));
conf.setBoolean(CacheConfig.CACHE_BLOOM_BLOCKS_ON_WRITE_KEY, conf.setBoolean(CacheConfig.CACHE_BLOOM_BLOCKS_ON_WRITE_KEY,
cowType.shouldBeCached(BlockType.BLOOM_CHUNK)); cowType.shouldBeCached(BlockType.BLOOM_CHUNK));
conf.setBoolean(CacheConfig.CACHE_DATA_BLOCKS_COMPRESSED_KEY, cacheCompressedData);
cowType.modifyConf(conf); cowType.modifyConf(conf);
fs = HFileSystem.get(conf); fs = HFileSystem.get(conf);
cacheConf = new CacheConfig(conf); cacheConf = new CacheConfig(conf);
@ -225,6 +231,10 @@ public class TestCacheOnWrite {
reader = (HFileReaderV2) HFile.createReader(fs, storeFilePath, cacheConf, conf); reader = (HFileReaderV2) HFile.createReader(fs, storeFilePath, cacheConf, conf);
} }
LOG.info("HFile information: " + reader); LOG.info("HFile information: " + reader);
HFileContext meta = new HFileContextBuilder().withCompression(compress)
.withBytesPerCheckSum(CKBYTES).withChecksumType(ChecksumType.NULL)
.withBlockSize(DATA_BLOCK_SIZE).withDataBlockEncoding(encoder.getDataBlockEncoding())
.withIncludesTags(useTags).build();
final boolean cacheBlocks = false; final boolean cacheBlocks = false;
final boolean pread = false; final boolean pread = false;
HFileScanner scanner = reader.getScanner(cacheBlocks, pread); HFileScanner scanner = reader.getScanner(cacheBlocks, pread);
@ -248,16 +258,36 @@ public class TestCacheOnWrite {
false, true, null, encodingInCache); false, true, null, encodingInCache);
BlockCacheKey blockCacheKey = new BlockCacheKey(reader.getName(), BlockCacheKey blockCacheKey = new BlockCacheKey(reader.getName(),
offset); offset);
boolean isCached = blockCache.getBlock(blockCacheKey, true, false, true) != null; HFileBlock fromCache = (HFileBlock) blockCache.getBlock(blockCacheKey, true, false, true);
boolean isCached = fromCache != null;
boolean shouldBeCached = cowType.shouldBeCached(block.getBlockType()); boolean shouldBeCached = cowType.shouldBeCached(block.getBlockType());
if (shouldBeCached != isCached) { assertTrue("shouldBeCached: " + shouldBeCached+ "\n" +
throw new AssertionError( "isCached: " + isCached + "\n" +
"shouldBeCached: " + shouldBeCached+ "\n" + "Test description: " + testDescription + "\n" +
"isCached: " + isCached + "\n" + "block: " + block + "\n" +
"Test description: " + testDescription + "\n" + "encodingInCache: " + encodingInCache + "\n" +
"block: " + block + "\n" + "blockCacheKey: " + blockCacheKey,
"encodingInCache: " + encodingInCache + "\n" + shouldBeCached == isCached);
"blockCacheKey: " + blockCacheKey); if (isCached) {
if (cacheConf.shouldCacheCompressed(fromCache.getBlockType().getCategory())) {
if (compress != Compression.Algorithm.NONE) {
assertFalse(fromCache.isUnpacked());
}
fromCache = fromCache.unpack(meta, reader.getUncachedBlockReader());
} else {
assertTrue(fromCache.isUnpacked());
}
// block we cached at write-time and block read from file should be identical
assertEquals(block.getChecksumType(), fromCache.getChecksumType());
assertEquals(block.getBlockType(), fromCache.getBlockType());
if (block.getBlockType() == BlockType.ENCODED_DATA) {
assertEquals(block.getDataBlockEncodingId(), fromCache.getDataBlockEncodingId());
assertEquals(block.getDataBlockEncoding(), fromCache.getDataBlockEncoding());
}
assertEquals(block.getOnDiskSizeWithHeader(), fromCache.getOnDiskSizeWithHeader());
assertEquals(block.getOnDiskSizeWithoutHeader(), fromCache.getOnDiskSizeWithoutHeader());
assertEquals(
block.getUncompressedSizeWithoutHeader(), fromCache.getUncompressedSizeWithoutHeader());
} }
prevBlock = block; prevBlock = block;
offset += block.getOnDiskSizeWithHeader(); offset += block.getOnDiskSizeWithHeader();

View File

@ -124,7 +124,7 @@ public class TestChecksum {
assertEquals(algo == GZ ? 2173 : 4936, assertEquals(algo == GZ ? 2173 : 4936,
b.getOnDiskSizeWithoutHeader() - b.totalChecksumBytes()); b.getOnDiskSizeWithoutHeader() - b.totalChecksumBytes());
// read data back from the hfile, exclude header and checksum // read data back from the hfile, exclude header and checksum
ByteBuffer bb = b.getBufferWithoutHeader(); // read back data ByteBuffer bb = b.unpack(meta, hbr).getBufferWithoutHeader(); // read back data
DataInputStream in = new DataInputStream( DataInputStream in = new DataInputStream(
new ByteArrayInputStream( new ByteArrayInputStream(
bb.array(), bb.arrayOffset(), bb.limit())); bb.array(), bb.arrayOffset(), bb.limit()));
@ -163,6 +163,7 @@ public class TestChecksum {
b = hbr.readBlockData(0, -1, -1, pread); b = hbr.readBlockData(0, -1, -1, pread);
is.close(); is.close();
b.sanityCheck(); b.sanityCheck();
b = b.unpack(meta, hbr);
assertEquals(4936, b.getUncompressedSizeWithoutHeader()); assertEquals(4936, b.getUncompressedSizeWithoutHeader());
assertEquals(algo == GZ ? 2173 : 4936, assertEquals(algo == GZ ? 2173 : 4936,
b.getOnDiskSizeWithoutHeader() - b.totalChecksumBytes()); b.getOnDiskSizeWithoutHeader() - b.totalChecksumBytes());
@ -272,12 +273,7 @@ public class TestChecksum {
// validate data // validate data
for (int i = 0; i < 1234; i++) { for (int i = 0; i < 1234; i++) {
int val = in.readInt(); int val = in.readInt();
if (val != i) { assertEquals("testChecksumCorruption: data mismatch at index " + i, i, val);
String msg = "testChecksumCorruption: data mismatch at index " +
i + " expected " + i + " found " + val;
LOG.warn(msg);
assertEquals(i, val);
}
} }
} }

View File

@ -80,14 +80,15 @@ public class TestForceCacheImportantBlocks {
@Parameters @Parameters
public static Collection<Object[]> parameters() { public static Collection<Object[]> parameters() {
// HFile versions // HFile versions
return Arrays.asList(new Object[][] { return Arrays.asList(
new Object[] { new Integer(2), false }, new Object[] { 2, true },
new Object[] { new Integer(3), true } new Object[] { 2, false },
}); new Object[] { 3, true },
new Object[] { 3, false }
);
} }
public TestForceCacheImportantBlocks(int hfileVersion, public TestForceCacheImportantBlocks(int hfileVersion, boolean cfCacheEnabled) {
boolean cfCacheEnabled) {
this.hfileVersion = hfileVersion; this.hfileVersion = hfileVersion;
this.cfCacheEnabled = cfCacheEnabled; this.cfCacheEnabled = cfCacheEnabled;
TEST_UTIL.getConfiguration().setInt(HFile.FORMAT_VERSION_KEY, hfileVersion); TEST_UTIL.getConfiguration().setInt(HFile.FORMAT_VERSION_KEY, hfileVersion);
@ -110,9 +111,9 @@ public class TestForceCacheImportantBlocks {
hcd.setBlocksize(BLOCK_SIZE); hcd.setBlocksize(BLOCK_SIZE);
hcd.setBlockCacheEnabled(cfCacheEnabled); hcd.setBlockCacheEnabled(cfCacheEnabled);
HRegion region = TEST_UTIL.createTestRegion(TABLE, hcd); HRegion region = TEST_UTIL.createTestRegion(TABLE, hcd);
BlockCache cache = region.getStore(hcd.getName()).getCacheConfig().getBlockCache();
CacheStats stats = cache.getStats();
writeTestData(region); writeTestData(region);
CacheStats stats =
region.getStores().get(hcd.getName()).getCacheConfig().getBlockCache().getStats();
assertEquals(0, stats.getHitCount()); assertEquals(0, stats.getHitCount());
assertEquals(0, HFile.dataBlockReadCnt.get()); assertEquals(0, HFile.dataBlockReadCnt.get());
// Do a single get, take count of caches. If we are NOT caching DATA blocks, the miss // Do a single get, take count of caches. If we are NOT caching DATA blocks, the miss
@ -141,4 +142,4 @@ public class TestForceCacheImportantBlocks {
} }
} }
} }
} }

View File

@ -316,7 +316,8 @@ public class TestHFile extends HBaseTestCase {
ByteBuffer actual = reader.getMetaBlock("HFileMeta" + i, false); ByteBuffer actual = reader.getMetaBlock("HFileMeta" + i, false);
ByteBuffer expected = ByteBuffer expected =
ByteBuffer.wrap(("something to test" + i).getBytes()); ByteBuffer.wrap(("something to test" + i).getBytes());
assertTrue("failed to match metadata", actual.compareTo(expected) == 0); assertEquals("failed to match metadata",
Bytes.toStringBinary(expected), Bytes.toStringBinary(actual));
} }
} }

View File

@ -20,9 +20,7 @@ package org.apache.hadoop.hbase.io.hfile;
import static org.apache.hadoop.hbase.io.compress.Compression.Algorithm.GZ; import static org.apache.hadoop.hbase.io.compress.Compression.Algorithm.GZ;
import static org.apache.hadoop.hbase.io.compress.Compression.Algorithm.NONE; import static org.apache.hadoop.hbase.io.compress.Compression.Algorithm.NONE;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.*;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.ByteArrayOutputStream; import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream; import java.io.DataOutputStream;
@ -69,6 +67,7 @@ import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
import org.junit.runners.Parameterized; import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters; import org.junit.runners.Parameterized.Parameters;
import org.mockito.Mockito;
@Category(MediumTests.class) @Category(MediumTests.class)
@RunWith(Parameterized.class) @RunWith(Parameterized.class)
@ -234,8 +233,14 @@ public class TestHFileBlock {
@Test @Test
public void testNoCompression() throws IOException { public void testNoCompression() throws IOException {
assertEquals(4000, createTestV2Block(NONE, includesMemstoreTS, false). CacheConfig cacheConf = Mockito.mock(CacheConfig.class);
getBlockForCaching().getUncompressedSizeWithoutHeader()); Mockito.when(cacheConf.isBlockCacheEnabled()).thenReturn(false);
HFileBlock block =
createTestV2Block(NONE, includesMemstoreTS, false).getBlockForCaching(cacheConf);
assertEquals(4000, block.getUncompressedSizeWithoutHeader());
assertEquals(4004, block.getOnDiskSizeWithoutHeader());
assertTrue(block.isUnpacked());
} }
@Test @Test
@ -316,14 +321,14 @@ public class TestHFileBlock {
assertEquals(4936, b.getUncompressedSizeWithoutHeader()); assertEquals(4936, b.getUncompressedSizeWithoutHeader());
assertEquals(algo == GZ ? 2173 : 4936, assertEquals(algo == GZ ? 2173 : 4936,
b.getOnDiskSizeWithoutHeader() - b.totalChecksumBytes()); b.getOnDiskSizeWithoutHeader() - b.totalChecksumBytes());
String blockStr = b.toString(); HFileBlock expected = b;
if (algo == GZ) { if (algo == GZ) {
is = fs.open(path); is = fs.open(path);
hbr = new HFileBlock.FSReaderV2(is, totalSize, meta); hbr = new HFileBlock.FSReaderV2(is, totalSize, meta);
b = hbr.readBlockData(0, 2173 + HConstants.HFILEBLOCK_HEADER_SIZE + b = hbr.readBlockData(0, 2173 + HConstants.HFILEBLOCK_HEADER_SIZE +
b.totalChecksumBytes(), -1, pread); b.totalChecksumBytes(), -1, pread);
assertEquals(blockStr, b.toString()); assertEquals(expected, b);
int wrongCompressedSize = 2172; int wrongCompressedSize = 2172;
try { try {
b = hbr.readBlockData(0, wrongCompressedSize b = hbr.readBlockData(0, wrongCompressedSize
@ -409,20 +414,35 @@ public class TestHFileBlock {
HFileBlock.FSReaderV2 hbr = new HFileBlock.FSReaderV2(is, totalSize, meta); HFileBlock.FSReaderV2 hbr = new HFileBlock.FSReaderV2(is, totalSize, meta);
hbr.setDataBlockEncoder(dataBlockEncoder); hbr.setDataBlockEncoder(dataBlockEncoder);
hbr.setIncludesMemstoreTS(includesMemstoreTS); hbr.setIncludesMemstoreTS(includesMemstoreTS);
HFileBlock b; HFileBlock blockFromHFile, blockUnpacked;
int pos = 0; int pos = 0;
for (int blockId = 0; blockId < numBlocks; ++blockId) { for (int blockId = 0; blockId < numBlocks; ++blockId) {
b = hbr.readBlockData(pos, -1, -1, pread); blockFromHFile = hbr.readBlockData(pos, -1, -1, pread);
assertEquals(0, HFile.getChecksumFailuresCount()); assertEquals(0, HFile.getChecksumFailuresCount());
b.sanityCheck(); blockFromHFile.sanityCheck();
pos += b.getOnDiskSizeWithHeader(); pos += blockFromHFile.getOnDiskSizeWithHeader();
assertEquals((int) encodedSizes.get(blockId), assertEquals((int) encodedSizes.get(blockId),
b.getUncompressedSizeWithoutHeader()); blockFromHFile.getUncompressedSizeWithoutHeader());
ByteBuffer actualBuffer = b.getBufferWithoutHeader(); assertEquals(meta.isCompressedOrEncrypted(), !blockFromHFile.isUnpacked());
long packedHeapsize = blockFromHFile.heapSize();
blockUnpacked = blockFromHFile.unpack(meta, hbr);
assertTrue(blockUnpacked.isUnpacked());
if (meta.isCompressedOrEncrypted()) {
LOG.info("packedHeapsize=" + packedHeapsize + ", unpackedHeadsize=" + blockUnpacked
.heapSize());
assertFalse(packedHeapsize == blockUnpacked.heapSize());
assertTrue("Packed heapSize should be < unpacked heapSize",
packedHeapsize < blockUnpacked.heapSize());
}
ByteBuffer actualBuffer = blockUnpacked.getBufferWithoutHeader();
if (encoding != DataBlockEncoding.NONE) { if (encoding != DataBlockEncoding.NONE) {
// We expect a two-byte big-endian encoding id. // We expect a two-byte big-endian encoding id.
assertEquals(0, actualBuffer.get(0)); assertEquals(
assertEquals(encoding.getId(), actualBuffer.get(1)); "Unexpected first byte with " + buildMessageDetails(algo, encoding, pread),
Long.toHexString(0), Long.toHexString(actualBuffer.get(0)));
assertEquals(
"Unexpected second byte with " + buildMessageDetails(algo, encoding, pread),
Long.toHexString(encoding.getId()), Long.toHexString(actualBuffer.get(1)));
actualBuffer.position(2); actualBuffer.position(2);
actualBuffer = actualBuffer.slice(); actualBuffer = actualBuffer.slice();
} }
@ -432,6 +452,22 @@ public class TestHFileBlock {
// test if content matches, produce nice message // test if content matches, produce nice message
assertBuffersEqual(expectedBuffer, actualBuffer, algo, encoding, pread); assertBuffersEqual(expectedBuffer, actualBuffer, algo, encoding, pread);
// test serialized blocks
for (boolean reuseBuffer : new boolean[] { false, true }) {
ByteBuffer serialized = ByteBuffer.allocate(blockFromHFile.getSerializedLength());
blockFromHFile.serialize(serialized);
HFileBlock deserialized =
(HFileBlock) blockFromHFile.getDeserializer().deserialize(serialized, reuseBuffer);
assertEquals(
"Serialization did not preserve block state. reuseBuffer=" + reuseBuffer,
blockFromHFile, deserialized);
// intentional reference comparison
if (blockFromHFile != blockUnpacked) {
assertEquals("Deserializaed block cannot be unpacked correctly.",
blockUnpacked, deserialized.unpack(meta, hbr));
}
}
} }
is.close(); is.close();
} }
@ -439,6 +475,11 @@ public class TestHFileBlock {
} }
} }
static String buildMessageDetails(Algorithm compression, DataBlockEncoding encoding,
boolean pread) {
return String.format("compression %s, encoding %s, pread %s", compression, encoding, pread);
}
static void assertBuffersEqual(ByteBuffer expectedBuffer, static void assertBuffersEqual(ByteBuffer expectedBuffer,
ByteBuffer actualBuffer, Compression.Algorithm compression, ByteBuffer actualBuffer, Compression.Algorithm compression,
DataBlockEncoding encoding, boolean pread) { DataBlockEncoding encoding, boolean pread) {
@ -451,9 +492,8 @@ public class TestHFileBlock {
} }
fail(String.format( fail(String.format(
"Content mismath for compression %s, encoding %s, " + "Content mismatch for %s, commonPrefix %d, expected %s, got %s",
"pread %s, commonPrefix %d, expected %s, got %s", buildMessageDetails(compression, encoding, pread), prefix,
compression, encoding, pread, prefix,
nextBytesToStr(expectedBuffer, prefix), nextBytesToStr(expectedBuffer, prefix),
nextBytesToStr(actualBuffer, prefix))); nextBytesToStr(actualBuffer, prefix)));
} }
@ -476,6 +516,7 @@ public class TestHFileBlock {
} }
protected void testPreviousOffsetInternals() throws IOException { protected void testPreviousOffsetInternals() throws IOException {
// TODO: parameterize these nested loops.
for (Compression.Algorithm algo : COMPRESSION_ALGORITHMS) { for (Compression.Algorithm algo : COMPRESSION_ALGORITHMS) {
for (boolean pread : BOOLEAN_VALUES) { for (boolean pread : BOOLEAN_VALUES) {
for (boolean cacheOnWrite : BOOLEAN_VALUES) { for (boolean cacheOnWrite : BOOLEAN_VALUES) {
@ -545,8 +586,10 @@ public class TestHFileBlock {
curOffset += b.getOnDiskSizeWithHeader(); curOffset += b.getOnDiskSizeWithHeader();
if (cacheOnWrite) { if (cacheOnWrite) {
// In the cache-on-write mode we store uncompressed bytes so we // NOTE: cache-on-write testing doesn't actually involve a BlockCache. It simply
// can compare them to what was read by the block reader. // verifies that the unpacked value read back off disk matches the unpacked value
// generated before writing to disk.
b = b.unpack(meta, hbr);
// b's buffer has header + data + checksum while // b's buffer has header + data + checksum while
// expectedContents have header + data only // expectedContents have header + data only
ByteBuffer bufRead = b.getBufferWithHeader(); ByteBuffer bufRead = b.getBufferWithHeader();
@ -565,11 +608,10 @@ public class TestHFileBlock {
+ algo + ", pread=" + pread + algo + ", pread=" + pread
+ ", cacheOnWrite=" + cacheOnWrite + "):\n"; + ", cacheOnWrite=" + cacheOnWrite + "):\n";
wrongBytesMsg += Bytes.toStringBinary(bufExpected.array(), wrongBytesMsg += Bytes.toStringBinary(bufExpected.array(),
bufExpected.arrayOffset(), Math.min(32, bufExpected.arrayOffset(), Math.min(32 + 10, bufExpected.limit()))
bufExpected.limit()))
+ ", actual:\n" + ", actual:\n"
+ Bytes.toStringBinary(bufRead.array(), + Bytes.toStringBinary(bufRead.array(),
bufRead.arrayOffset(), Math.min(32, bufRead.limit())); bufRead.arrayOffset(), Math.min(32 + 10, bufRead.limit()));
if (detailedLogging) { if (detailedLogging) {
LOG.warn("expected header" + LOG.warn("expected header" +
HFileBlock.toStringHeader(bufExpected) + HFileBlock.toStringHeader(bufExpected) +
@ -758,6 +800,7 @@ public class TestHFileBlock {
if (detailedLogging) { if (detailedLogging) {
LOG.info("Written block #" + i + " of type " + bt LOG.info("Written block #" + i + " of type " + bt
+ ", uncompressed size " + hbw.getUncompressedSizeWithoutHeader() + ", uncompressed size " + hbw.getUncompressedSizeWithoutHeader()
+ ", packed size " + hbw.getOnDiskSizeWithoutHeader()
+ " at offset " + pos); + " at offset " + pos);
} }
} }
@ -806,7 +849,4 @@ public class TestHFileBlock {
block.heapSize()); block.heapSize());
} }
} }
} }

View File

@ -20,9 +20,7 @@ package org.apache.hadoop.hbase.io.hfile;
import static org.apache.hadoop.hbase.io.compress.Compression.Algorithm.GZ; import static org.apache.hadoop.hbase.io.compress.Compression.Algorithm.GZ;
import static org.apache.hadoop.hbase.io.compress.Compression.Algorithm.NONE; import static org.apache.hadoop.hbase.io.compress.Compression.Algorithm.NONE;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.*;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.ByteArrayOutputStream; import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream; import java.io.DataOutputStream;
@ -205,7 +203,7 @@ public class TestHFileBlockCompatibility {
assertEquals(4936, b.getUncompressedSizeWithoutHeader()); assertEquals(4936, b.getUncompressedSizeWithoutHeader());
assertEquals(algo == GZ ? 2173 : 4936, assertEquals(algo == GZ ? 2173 : 4936,
b.getOnDiskSizeWithoutHeader() - b.totalChecksumBytes()); b.getOnDiskSizeWithoutHeader() - b.totalChecksumBytes());
String blockStr = b.toString(); HFileBlock expected = b;
if (algo == GZ) { if (algo == GZ) {
is = fs.open(path); is = fs.open(path);
@ -213,7 +211,7 @@ public class TestHFileBlockCompatibility {
meta); meta);
b = hbr.readBlockData(0, 2173 + HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM + b = hbr.readBlockData(0, 2173 + HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM +
b.totalChecksumBytes(), -1, pread); b.totalChecksumBytes(), -1, pread);
assertEquals(blockStr, b.toString()); assertEquals(expected, b);
int wrongCompressedSize = 2172; int wrongCompressedSize = 2172;
try { try {
b = hbr.readBlockData(0, wrongCompressedSize b = hbr.readBlockData(0, wrongCompressedSize
@ -301,6 +299,10 @@ public class TestHFileBlockCompatibility {
for (int blockId = 0; blockId < numBlocks; ++blockId) { for (int blockId = 0; blockId < numBlocks; ++blockId) {
b = hbr.readBlockData(pos, -1, -1, pread); b = hbr.readBlockData(pos, -1, -1, pread);
b.sanityCheck(); b.sanityCheck();
if (meta.isCompressedOrEncrypted()) {
assertFalse(b.isUnpacked());
b = b.unpack(meta, hbr);
}
pos += b.getOnDiskSizeWithHeader(); pos += b.getOnDiskSizeWithHeader();
assertEquals((int) encodedSizes.get(blockId), assertEquals((int) encodedSizes.get(blockId),
@ -335,7 +337,7 @@ public class TestHFileBlockCompatibility {
* in this class but the code in HFileBlock.Writer will continually * in this class but the code in HFileBlock.Writer will continually
* evolve. * evolve.
*/ */
public static final class Writer extends HFileBlock.Writer{ public static final class Writer extends HFileBlock.Writer {
// These constants are as they were in minorVersion 0. // These constants are as they were in minorVersion 0.
private static final int HEADER_SIZE = HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM; private static final int HEADER_SIZE = HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM;
@ -416,10 +418,6 @@ public class TestHFileBlockCompatibility {
private int unencodedDataSizeWritten; private int unencodedDataSizeWritten;
/**
* @param compressionAlgorithm compression algorithm to use
* @param dataBlockEncoderAlgo data block encoding algorithm to use
*/
public Writer(Compression.Algorithm compressionAlgorithm, public Writer(Compression.Algorithm compressionAlgorithm,
HFileDataBlockEncoder dataBlockEncoder, boolean includesMemstoreTS, boolean includesTag) { HFileDataBlockEncoder dataBlockEncoder, boolean includesMemstoreTS, boolean includesTag) {
this(dataBlockEncoder, new HFileContextBuilder().withHBaseCheckSum(false) this(dataBlockEncoder, new HFileContextBuilder().withHBaseCheckSum(false)

View File

@ -17,11 +17,6 @@
*/ */
package org.apache.hadoop.hbase.io.hfile; package org.apache.hadoop.hbase.io.hfile;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.DataInputStream; import java.io.DataInputStream;
import java.io.DataOutputStream; import java.io.DataOutputStream;
import java.io.IOException; import java.io.IOException;
@ -53,6 +48,8 @@ import org.junit.BeforeClass;
import org.junit.Test; import org.junit.Test;
import org.junit.experimental.categories.Category; import org.junit.experimental.categories.Category;
import static org.junit.Assert.*;
@Category(SmallTests.class) @Category(SmallTests.class)
public class TestHFileEncryption { public class TestHFileEncryption {
private static final Log LOG = LogFactory.getLog(TestHFileEncryption.class); private static final Log LOG = LogFactory.getLog(TestHFileEncryption.class);
@ -95,11 +92,13 @@ public class TestHFileEncryption {
return hbw.getOnDiskSizeWithHeader(); return hbw.getOnDiskSizeWithHeader();
} }
private long readAndVerifyBlock(long pos, HFileBlock.FSReaderV2 hbr, int size) private long readAndVerifyBlock(long pos, HFileContext ctx, HFileBlock.FSReaderV2 hbr, int size)
throws IOException { throws IOException {
HFileBlock b = hbr.readBlockData(pos, -1, -1, false); HFileBlock b = hbr.readBlockData(pos, -1, -1, false);
assertEquals(0, HFile.getChecksumFailuresCount()); assertEquals(0, HFile.getChecksumFailuresCount());
b.sanityCheck(); b.sanityCheck();
assertFalse(b.isUnpacked());
b = b.unpack(ctx, hbr);
LOG.info("Read a block at " + pos + " with" + LOG.info("Read a block at " + pos + " with" +
" onDiskSizeWithHeader=" + b.getOnDiskSizeWithHeader() + " onDiskSizeWithHeader=" + b.getOnDiskSizeWithHeader() +
" uncompressedSizeWithoutHeader=" + b.getOnDiskSizeWithoutHeader() + " uncompressedSizeWithoutHeader=" + b.getOnDiskSizeWithoutHeader() +
@ -142,7 +141,7 @@ public class TestHFileEncryption {
HFileBlock.FSReaderV2 hbr = new HFileBlock.FSReaderV2(is, totalSize, fileContext); HFileBlock.FSReaderV2 hbr = new HFileBlock.FSReaderV2(is, totalSize, fileContext);
long pos = 0; long pos = 0;
for (int i = 0; i < blocks; i++) { for (int i = 0; i < blocks; i++) {
pos += readAndVerifyBlock(pos, hbr, blockSizes[i]); pos += readAndVerifyBlock(pos, fileContext, hbr, blockSizes[i]);
} }
} finally { } finally {
is.close(); is.close();

View File

@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.io.hfile;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import java.io.ByteArrayInputStream; import java.io.ByteArrayInputStream;
@ -170,8 +171,8 @@ public class TestHFileWriterV2 {
// Meta index. // Meta index.
metaBlockIndexReader.readRootIndex( metaBlockIndexReader.readRootIndex(
blockIter.nextBlockWithBlockType(BlockType.ROOT_INDEX).getByteStream(), blockIter.nextBlockWithBlockType(BlockType.ROOT_INDEX)
trailer.getMetaIndexCount()); .getByteStream(), trailer.getMetaIndexCount());
// File info // File info
FileInfo fileInfo = new FileInfo(); FileInfo fileInfo = new FileInfo();
fileInfo.read(blockIter.nextBlockWithBlockType(BlockType.FILE_INFO).getByteStream()); fileInfo.read(blockIter.nextBlockWithBlockType(BlockType.FILE_INFO).getByteStream());
@ -191,6 +192,10 @@ public class TestHFileWriterV2 {
while (curBlockPos <= trailer.getLastDataBlockOffset()) { while (curBlockPos <= trailer.getLastDataBlockOffset()) {
HFileBlock block = blockReader.readBlockData(curBlockPos, -1, -1, false); HFileBlock block = blockReader.readBlockData(curBlockPos, -1, -1, false);
assertEquals(BlockType.DATA, block.getBlockType()); assertEquals(BlockType.DATA, block.getBlockType());
if (meta.isCompressedOrEncrypted()) {
assertFalse(block.isUnpacked());
block = block.unpack(meta, blockReader);
}
ByteBuffer buf = block.getBufferWithoutHeader(); ByteBuffer buf = block.getBufferWithoutHeader();
while (buf.hasRemaining()) { while (buf.hasRemaining()) {
int keyLen = buf.getInt(); int keyLen = buf.getInt();
@ -232,7 +237,8 @@ public class TestHFileWriterV2 {
while (fsdis.getPos() < trailer.getLoadOnOpenDataOffset()) { while (fsdis.getPos() < trailer.getLoadOnOpenDataOffset()) {
LOG.info("Current offset: " + fsdis.getPos() + ", scanning until " + LOG.info("Current offset: " + fsdis.getPos() + ", scanning until " +
trailer.getLoadOnOpenDataOffset()); trailer.getLoadOnOpenDataOffset());
HFileBlock block = blockReader.readBlockData(curBlockPos, -1, -1, false); HFileBlock block = blockReader.readBlockData(curBlockPos, -1, -1, false)
.unpack(meta, blockReader);
assertEquals(BlockType.META, block.getBlockType()); assertEquals(BlockType.META, block.getBlockType());
Text t = new Text(); Text t = new Text();
ByteBuffer buf = block.getBufferWithoutHeader(); ByteBuffer buf = block.getBufferWithoutHeader();

View File

@ -191,8 +191,7 @@ public class TestHFileWriterV3 {
// Data index. We also read statistics about the block index written after // Data index. We also read statistics about the block index written after
// the root level. // the root level.
dataBlockIndexReader.readMultiLevelIndexRoot( dataBlockIndexReader.readMultiLevelIndexRoot(
blockIter.nextBlockWithBlockType(BlockType.ROOT_INDEX), blockIter.nextBlockWithBlockType(BlockType.ROOT_INDEX), trailer.getDataIndexCount());
trailer.getDataIndexCount());
if (findMidKey) { if (findMidKey) {
byte[] midkey = dataBlockIndexReader.midkey(); byte[] midkey = dataBlockIndexReader.midkey();
@ -201,8 +200,8 @@ public class TestHFileWriterV3 {
// Meta index. // Meta index.
metaBlockIndexReader.readRootIndex( metaBlockIndexReader.readRootIndex(
blockIter.nextBlockWithBlockType(BlockType.ROOT_INDEX).getByteStream(), blockIter.nextBlockWithBlockType(BlockType.ROOT_INDEX)
trailer.getMetaIndexCount()); .getByteStream(), trailer.getMetaIndexCount());
// File info // File info
FileInfo fileInfo = new FileInfo(); FileInfo fileInfo = new FileInfo();
fileInfo.read(blockIter.nextBlockWithBlockType(BlockType.FILE_INFO).getByteStream()); fileInfo.read(blockIter.nextBlockWithBlockType(BlockType.FILE_INFO).getByteStream());
@ -220,7 +219,8 @@ public class TestHFileWriterV3 {
fsdis.seek(0); fsdis.seek(0);
long curBlockPos = 0; long curBlockPos = 0;
while (curBlockPos <= trailer.getLastDataBlockOffset()) { while (curBlockPos <= trailer.getLastDataBlockOffset()) {
HFileBlock block = blockReader.readBlockData(curBlockPos, -1, -1, false); HFileBlock block = blockReader.readBlockData(curBlockPos, -1, -1, false)
.unpack(context, blockReader);
assertEquals(BlockType.DATA, block.getBlockType()); assertEquals(BlockType.DATA, block.getBlockType());
ByteBuffer buf = block.getBufferWithoutHeader(); ByteBuffer buf = block.getBufferWithoutHeader();
int keyLen = -1; int keyLen = -1;
@ -278,7 +278,8 @@ public class TestHFileWriterV3 {
while (fsdis.getPos() < trailer.getLoadOnOpenDataOffset()) { while (fsdis.getPos() < trailer.getLoadOnOpenDataOffset()) {
LOG.info("Current offset: " + fsdis.getPos() + ", scanning until " + LOG.info("Current offset: " + fsdis.getPos() + ", scanning until " +
trailer.getLoadOnOpenDataOffset()); trailer.getLoadOnOpenDataOffset());
HFileBlock block = blockReader.readBlockData(curBlockPos, -1, -1, false); HFileBlock block = blockReader.readBlockData(curBlockPos, -1, -1, false)
.unpack(context, blockReader);
assertEquals(BlockType.META, block.getBlockType()); assertEquals(BlockType.META, block.getBlockType());
Text t = new Text(); Text t = new Text();
ByteBuffer buf = block.getBufferWithoutHeader(); ByteBuffer buf = block.getBufferWithoutHeader();

View File

@ -0,0 +1,231 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.io.hfile;
import com.google.common.collect.Iterables;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.SmallTests;
import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Random;
import static org.junit.Assert.*;
/**
* A kind of integration test at the intersection of {@link HFileBlock}, {@link CacheConfig},
* and {@link LruBlockCache}.
*/
@Category(SmallTests.class)
@RunWith(Parameterized.class)
public class TestLazyDataBlockDecompression {
private static final Log LOG = LogFactory.getLog(TestLazyDataBlockDecompression.class);
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private FileSystem fs;
@Parameterized.Parameter(0)
public boolean cacheOnWrite;
@Parameterized.Parameters
public static Iterable<Object[]> data() {
return Arrays.asList(new Object[][] {
{ false },
{ true }
});
}
@Before
public void setUp() throws IOException {
CacheConfig.GLOBAL_BLOCK_CACHE_INSTANCE = null;
fs = FileSystem.get(TEST_UTIL.getConfiguration());
}
@After
public void tearDown() {
CacheConfig.GLOBAL_BLOCK_CACHE_INSTANCE = null;
fs = null;
}
/**
 * Write {@code entryCount} random keyvalues to a new HFile at {@code path}.
*/
private static void writeHFile(Configuration conf, CacheConfig cc, FileSystem fs, Path path,
HFileContext cxt, int entryCount) throws IOException {
HFileWriterV2 writer = (HFileWriterV2)
new HFileWriterV2.WriterFactoryV2(conf, cc)
.withPath(fs, path)
.withFileContext(cxt)
.create();
// write a bunch of random kv's
Random rand = new Random(9713312); // some seed.
final byte[] family = Bytes.toBytes("f");
final byte[] qualifier = Bytes.toBytes("q");
for (int i = 0; i < entryCount; i++) {
byte[] keyBytes = TestHFileWriterV2.randomOrderedKey(rand, i);
byte[] valueBytes = TestHFileWriterV2.randomValue(rand);
// make a real keyvalue so that hfile tool can examine it
writer.append(new KeyValue(keyBytes, family, qualifier, valueBytes));
}
writer.close();
}
/**
 * Read all data blocks from {@code path}, inserting each into the block cache configured by
 * {@code cacheConfig}.
*/
private static void cacheBlocks(Configuration conf, CacheConfig cacheConfig, FileSystem fs,
Path path, HFileContext cxt) throws IOException {
FSDataInputStreamWrapper fsdis = new FSDataInputStreamWrapper(fs, path);
long fileSize = fs.getFileStatus(path).getLen();
FixedFileTrailer trailer =
FixedFileTrailer.readFromStream(fsdis.getStream(false), fileSize);
HFileReaderV2 reader = new HFileReaderV2(path, trailer, fsdis, fileSize, cacheConfig,
fsdis.getHfs(), conf);
reader.loadFileInfo();
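    // Walk every data block between the first and last data block offsets recorded in the
    // trailer. Going through readBlock() with cacheBlock=true inserts each block into the
    // block cache in whatever form the active CacheConfig dictates.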
long offset = trailer.getFirstDataBlockOffset(),
max = trailer.getLastDataBlockOffset();
List<HFileBlock> blocks = new ArrayList<HFileBlock>(4);
HFileBlock block;
while (offset <= max) {
block = reader.readBlock(offset, -1, /* cacheBlock */ true, /* pread */ false,
/* isCompaction */ false, /* updateCacheMetrics */ true, null, null);
offset += block.getOnDiskSizeWithHeader();
blocks.add(block);
}
LOG.info("read " + Iterables.toString(blocks));
}
@Test
public void testCompressionIncreasesEffectiveBlockCacheSize() throws Exception {
    // enough room for 2 uncompressed blocks
int maxSize = (int) (HConstants.DEFAULT_BLOCKSIZE * 2.1);
Path hfilePath = new Path(TEST_UTIL.getDataTestDir(),
"testCompressionIncreasesEffectiveBlockcacheSize");
HFileContext context = new HFileContextBuilder()
.withCompression(Compression.Algorithm.GZ)
.build();
LOG.info("context=" + context);
// setup cache with lazy-decompression disabled.
Configuration lazyCompressDisabled = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
lazyCompressDisabled.setBoolean(CacheConfig.CACHE_BLOCKS_ON_WRITE_KEY, cacheOnWrite);
lazyCompressDisabled.setBoolean(CacheConfig.CACHE_BLOOM_BLOCKS_ON_WRITE_KEY, cacheOnWrite);
lazyCompressDisabled.setBoolean(CacheConfig.CACHE_INDEX_BLOCKS_ON_WRITE_KEY, cacheOnWrite);
lazyCompressDisabled.setBoolean(CacheConfig.CACHE_DATA_BLOCKS_COMPRESSED_KEY, false);
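    // Install a fresh global cache instance so the CacheConfig built below picks up the small
    // LruBlockCache sized here rather than a cache left over from a previous run.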
CacheConfig.GLOBAL_BLOCK_CACHE_INSTANCE =
new LruBlockCache(maxSize, HConstants.DEFAULT_BLOCKSIZE, false, lazyCompressDisabled);
CacheConfig cc = new CacheConfig(lazyCompressDisabled);
assertFalse(cc.shouldCacheDataCompressed());
assertTrue(cc.getBlockCache() instanceof LruBlockCache);
LruBlockCache disabledBlockCache = (LruBlockCache) cc.getBlockCache();
LOG.info("disabledBlockCache=" + disabledBlockCache);
assertEquals("test inconsistency detected.", maxSize, disabledBlockCache.getMaxSize());
assertTrue("eviction thread spawned unintentionally.",
disabledBlockCache.getEvictionThread() == null);
assertEquals("freshly created blockcache contains blocks.",
0, disabledBlockCache.getBlockCount());
    // 2,000 KVs fill roughly 3.6 unencoded data blocks.
    // writeHFile needs a conf and a CacheConfig, but the file it produces should not depend on
    // this instance's cache settings.
writeHFile(lazyCompressDisabled, cc, fs, hfilePath, context, 2000);
// populate the cache
cacheBlocks(lazyCompressDisabled, cc, fs, hfilePath, context);
long disabledBlockCount = disabledBlockCache.getBlockCount();
assertTrue("blockcache should contain blocks. disabledBlockCount=" + disabledBlockCount,
disabledBlockCount > 0);
long disabledEvictedCount = disabledBlockCache.getStats().getEvictedCount();
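    // With hbase.block.data.cachecompressed=false, every block should have been unpacked
    // (decompressed and decrypted) before it was inserted into the cache.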
for (Map.Entry<BlockCacheKey, LruCachedBlock> e :
disabledBlockCache.getMapForTests().entrySet()) {
HFileBlock block = (HFileBlock) e.getValue().getBuffer();
assertTrue("found a packed block, block=" + block, block.isUnpacked());
}
// count blocks with lazy decompression
Configuration lazyCompressEnabled = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
lazyCompressEnabled.setBoolean(CacheConfig.CACHE_BLOCKS_ON_WRITE_KEY, cacheOnWrite);
lazyCompressEnabled.setBoolean(CacheConfig.CACHE_BLOOM_BLOCKS_ON_WRITE_KEY, cacheOnWrite);
lazyCompressEnabled.setBoolean(CacheConfig.CACHE_INDEX_BLOCKS_ON_WRITE_KEY, cacheOnWrite);
lazyCompressEnabled.setBoolean(CacheConfig.CACHE_DATA_BLOCKS_COMPRESSED_KEY, true);
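    // Same-sized fresh cache for the lazy-decompression run, so block and eviction counts are
    // directly comparable with the run above.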
CacheConfig.GLOBAL_BLOCK_CACHE_INSTANCE =
new LruBlockCache(maxSize, HConstants.DEFAULT_BLOCKSIZE, false, lazyCompressEnabled);
cc = new CacheConfig(lazyCompressEnabled);
assertTrue("test improperly configured.", cc.shouldCacheDataCompressed());
assertTrue(cc.getBlockCache() instanceof LruBlockCache);
LruBlockCache enabledBlockCache = (LruBlockCache) cc.getBlockCache();
LOG.info("enabledBlockCache=" + enabledBlockCache);
assertEquals("test inconsistency detected", maxSize, enabledBlockCache.getMaxSize());
assertTrue("eviction thread spawned unintentionally.",
enabledBlockCache.getEvictionThread() == null);
assertEquals("freshly created blockcache contains blocks.",
0, enabledBlockCache.getBlockCount());
cacheBlocks(lazyCompressEnabled, cc, fs, hfilePath, context);
long enabledBlockCount = enabledBlockCache.getBlockCount();
assertTrue("blockcache should contain blocks. enabledBlockCount=" + enabledBlockCount,
enabledBlockCount > 0);
long enabledEvictedCount = enabledBlockCache.getStats().getEvictedCount();
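    // With hbase.block.data.cachecompressed=true, blocks in categories eligible for compressed
    // caching (DATA) should still be sitting in the cache in their packed, on-disk form.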
int candidatesFound = 0;
for (Map.Entry<BlockCacheKey, LruCachedBlock> e :
enabledBlockCache.getMapForTests().entrySet()) {
candidatesFound++;
HFileBlock block = (HFileBlock) e.getValue().getBuffer();
if (cc.shouldCacheCompressed(block.getBlockType().getCategory())) {
assertFalse("found an unpacked block, block=" + block + ", block buffer capacity=" +
block.getBufferWithoutHeader().capacity(), block.isUnpacked());
}
}
assertTrue("did not find any candidates for compressed caching. Invalid test.",
candidatesFound > 0);
LOG.info("disabledBlockCount=" + disabledBlockCount + ", enabledBlockCount=" +
enabledBlockCount);
assertTrue("enabling compressed data blocks should increase the effective cache size. " +
"disabledBlockCount=" + disabledBlockCount + ", enabledBlockCount=" +
enabledBlockCount, disabledBlockCount < enabledBlockCount);
LOG.info("disabledEvictedCount=" + disabledEvictedCount + ", enabledEvictedCount=" +
enabledEvictedCount);
assertTrue("enabling compressed data blocks should reduce the number of evictions. " +
"disabledEvictedCount=" + disabledEvictedCount + ", enabledEvictedCount=" +
enabledEvictedCount, enabledEvictedCount < disabledEvictedCount);
}
}