From 58818496daad0572843eacbeabfb95bc6af816ee Mon Sep 17 00:00:00 2001 From: Andrew Purtell Date: Thu, 22 May 2014 10:17:39 -0700 Subject: [PATCH] HBASE-9857 Blockcache prefetch option --- .../hadoop/hbase/HColumnDescriptor.java | 32 ++++ .../hadoop/hbase/io/hfile/BlockCache.java | 5 +- .../hadoop/hbase/io/hfile/CacheConfig.java | 41 ++++- .../hbase/io/hfile/CombinedBlockCache.java | 8 +- .../hbase/io/hfile/DoubleBlockCache.java | 15 +- .../apache/hadoop/hbase/io/hfile/HFile.java | 3 +- .../hbase/io/hfile/HFileBlockIndex.java | 4 +- .../hadoop/hbase/io/hfile/HFileReaderV2.java | 78 +++++++-- .../hadoop/hbase/io/hfile/HFileReaderV3.java | 2 +- .../hadoop/hbase/io/hfile/LruBlockCache.java | 14 +- .../hbase/io/hfile/PrefetchExecutor.java | 122 +++++++++++++++ .../hbase/io/hfile/bucket/BucketCache.java | 16 +- .../hbase/io/hfile/slab/SingleSizeCache.java | 7 +- .../hadoop/hbase/io/hfile/slab/SlabCache.java | 9 +- .../hbase/util/CompoundBloomFilter.java | 2 +- .../hadoop/hbase/io/hfile/CacheTestUtils.java | 14 +- .../hbase/io/hfile/TestCacheOnWrite.java | 4 +- .../hbase/io/hfile/TestHFileBlockIndex.java | 3 +- .../io/hfile/TestHFileDataBlockEncoder.java | 2 +- .../hbase/io/hfile/TestLruBlockCache.java | 98 ++++++------ .../hadoop/hbase/io/hfile/TestPrefetch.java | 148 ++++++++++++++++++ .../io/hfile/bucket/TestBucketCache.java | 4 +- .../TestCacheOnWriteInSchema.java | 4 +- .../regionserver/TestHeapMemoryManager.java | 3 +- hbase-shell/src/main/ruby/hbase/admin.rb | 1 + 25 files changed, 517 insertions(+), 122 deletions(-) create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/PrefetchExecutor.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetch.java diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java index 33412bc87b7..0b7c382400e 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java @@ -88,6 +88,13 @@ public class HColumnDescriptor implements WritableComparable public static final String CACHE_INDEX_ON_WRITE = "CACHE_INDEX_ON_WRITE"; public static final String CACHE_BLOOMS_ON_WRITE = "CACHE_BLOOMS_ON_WRITE"; public static final String EVICT_BLOCKS_ON_CLOSE = "EVICT_BLOCKS_ON_CLOSE"; + /** + * Key for the PREFETCH_BLOCKS_ON_OPEN attribute. + * If set, all INDEX, BLOOM, and DATA blocks of HFiles belonging to this + * family will be loaded into the cache as soon as the file is opened. These + * loads will not count as cache misses. + */ + public static final String PREFETCH_BLOCKS_ON_OPEN = "PREFETCH_BLOCKS_ON_OPEN"; /** * Size of storefile/hfile 'blocks'. Default is {@link #DEFAULT_BLOCKSIZE}. @@ -207,6 +214,11 @@ public class HColumnDescriptor implements WritableComparable */ public static final boolean DEFAULT_COMPRESS_TAGS = true; + /* + * Default setting for whether to prefetch blocks into the blockcache on open. + */ + public static final boolean DEFAULT_PREFETCH_BLOCKS_ON_OPEN = false; + private final static Map DEFAULT_VALUES = new HashMap(); private final static Set RESERVED_KEYWORDS @@ -227,6 +239,7 @@ public class HColumnDescriptor implements WritableComparable DEFAULT_VALUES.put(CACHE_INDEX_ON_WRITE, String.valueOf(DEFAULT_CACHE_INDEX_ON_WRITE)); DEFAULT_VALUES.put(CACHE_BLOOMS_ON_WRITE, String.valueOf(DEFAULT_CACHE_BLOOMS_ON_WRITE)); DEFAULT_VALUES.put(EVICT_BLOCKS_ON_CLOSE, String.valueOf(DEFAULT_EVICT_BLOCKS_ON_CLOSE)); + DEFAULT_VALUES.put(PREFETCH_BLOCKS_ON_OPEN, String.valueOf(DEFAULT_PREFETCH_BLOCKS_ON_OPEN)); for (String s : DEFAULT_VALUES.keySet()) { RESERVED_KEYWORDS.add(new ImmutableBytesWritable(Bytes.toBytes(s))); } @@ -933,6 +946,25 @@ public class HColumnDescriptor implements WritableComparable return setValue(EVICT_BLOCKS_ON_CLOSE, Boolean.toString(value)); } + /** + * @return true if we should prefetch blocks into the blockcache on open + */ + public boolean shouldPrefetchBlocksOnOpen() { + String value = getValue(PREFETCH_BLOCKS_ON_OPEN); + if (value != null) { + return Boolean.valueOf(value).booleanValue(); + } + return DEFAULT_PREFETCH_BLOCKS_ON_OPEN; + } + + /** + * @param value true if we should prefetch blocks into the blockcache on open + * @return this (for chained invocation) + */ + public HColumnDescriptor setPrefetchBlocksOnOpen(boolean value) { + return setValue(PREFETCH_BLOCKS_ON_OPEN, Boolean.toString(value)); + } + /** * @see java.lang.Object#toString() */ diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java index 25ff76b4b29..597de525b1c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java @@ -51,10 +51,11 @@ public interface BlockCache { * @param caching Whether this request has caching enabled (used for stats) * @param repeat Whether this is a repeat lookup for the same block * (used to avoid double counting cache misses when doing double-check locking) + * @param updateCacheMetrics Whether to update cache metrics or not * @return Block or null if block is not in 2 cache. - * @see HFileReaderV2#readBlock(long, long, boolean, boolean, boolean, BlockType, DataBlockEncoding) */ - Cacheable getBlock(BlockCacheKey cacheKey, boolean caching, boolean repeat); + Cacheable getBlock(BlockCacheKey cacheKey, boolean caching, boolean repeat, + boolean updateCacheMetrics); /** * Evict block from cache. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java index 796a29064b9..741c9e28981 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java @@ -146,6 +146,13 @@ public class CacheConfig { public static final String SLAB_CACHE_OFFHEAP_PERCENTAGE_KEY = "hbase.offheapcache.percentage"; + /** + * Configuration key to prefetch all blocks of a given file into the block cache + * when the file is opened. + */ + public static final String PREFETCH_BLOCKS_ON_OPEN_KEY = + "hbase.rs.prefetchblocksonopen"; + // Defaults public static final boolean DEFAULT_CACHE_DATA_ON_READ = true; @@ -155,6 +162,7 @@ public class CacheConfig { public static final boolean DEFAULT_CACHE_BLOOMS_ON_WRITE = false; public static final boolean DEFAULT_EVICT_ON_CLOSE = false; public static final boolean DEFAULT_COMPRESSED_CACHE = false; + public static final boolean DEFAULT_PREFETCH_ON_OPEN = false; /** Local reference to the block cache, null if completely disabled */ private final BlockCache blockCache; @@ -185,6 +193,9 @@ public class CacheConfig { /** Whether data blocks should be stored in compressed form in the cache */ private final boolean cacheCompressed; + /** Whether data blocks should be prefetched into the cache */ + private final boolean prefetchOnOpen; + /** * Create a cache configuration using the specified configuration object and * family descriptor. @@ -205,7 +216,9 @@ public class CacheConfig { DEFAULT_CACHE_BLOOMS_ON_WRITE) || family.shouldCacheBloomsOnWrite(), conf.getBoolean(EVICT_BLOCKS_ON_CLOSE_KEY, DEFAULT_EVICT_ON_CLOSE) || family.shouldEvictBlocksOnClose(), - conf.getBoolean(CACHE_DATA_BLOCKS_COMPRESSED_KEY, DEFAULT_COMPRESSED_CACHE) + conf.getBoolean(CACHE_DATA_BLOCKS_COMPRESSED_KEY, DEFAULT_COMPRESSED_CACHE), + conf.getBoolean(PREFETCH_BLOCKS_ON_OPEN_KEY, + DEFAULT_PREFETCH_ON_OPEN) || family.shouldPrefetchBlocksOnOpen() ); } @@ -226,7 +239,8 @@ public class CacheConfig { DEFAULT_CACHE_BLOOMS_ON_WRITE), conf.getBoolean(EVICT_BLOCKS_ON_CLOSE_KEY, DEFAULT_EVICT_ON_CLOSE), conf.getBoolean(CACHE_DATA_BLOCKS_COMPRESSED_KEY, - DEFAULT_COMPRESSED_CACHE) + DEFAULT_COMPRESSED_CACHE), + conf.getBoolean(PREFETCH_BLOCKS_ON_OPEN_KEY, DEFAULT_PREFETCH_ON_OPEN) ); } @@ -242,12 +256,13 @@ public class CacheConfig { * @param cacheBloomsOnWrite whether blooms should be cached on write * @param evictOnClose whether blocks should be evicted when HFile is closed * @param cacheCompressed whether to store blocks as compressed in the cache + * @param prefetchOnOpen whether to prefetch blocks upon open */ CacheConfig(final BlockCache blockCache, final boolean cacheDataOnRead, final boolean inMemory, final boolean cacheDataOnWrite, final boolean cacheIndexesOnWrite, final boolean cacheBloomsOnWrite, final boolean evictOnClose, - final boolean cacheCompressed) { + final boolean cacheCompressed, final boolean prefetchOnOpen) { this.blockCache = blockCache; this.cacheDataOnRead = cacheDataOnRead; this.inMemory = inMemory; @@ -256,6 +271,7 @@ public class CacheConfig { this.cacheBloomsOnWrite = cacheBloomsOnWrite; this.evictOnClose = evictOnClose; this.cacheCompressed = cacheCompressed; + this.prefetchOnOpen = prefetchOnOpen; LOG.info(this); } @@ -267,7 +283,7 @@ public class CacheConfig { this(cacheConf.blockCache, cacheConf.cacheDataOnRead, cacheConf.inMemory, cacheConf.cacheDataOnWrite, cacheConf.cacheIndexesOnWrite, cacheConf.cacheBloomsOnWrite, cacheConf.evictOnClose, - cacheConf.cacheCompressed); + cacheConf.cacheCompressed, cacheConf.prefetchOnOpen); } /** @@ -303,7 +319,10 @@ public class CacheConfig { boolean shouldCache = isBlockCacheEnabled() && (cacheDataOnRead || category == BlockCategory.INDEX || - category == BlockCategory.BLOOM); + category == BlockCategory.BLOOM || + (prefetchOnOpen && + (category != BlockCategory.META && + category != BlockCategory.UNKNOWN))); return shouldCache; } @@ -371,6 +390,13 @@ public class CacheConfig { return isBlockCacheEnabled() && this.cacheCompressed; } + /** + * @return true if blocks should be prefetched into the cache on open, false if not + */ + public boolean shouldPrefetchOnOpen() { + return isBlockCacheEnabled() && this.prefetchOnOpen; + } + @Override public String toString() { if (!isBlockCacheEnabled()) { @@ -382,7 +408,8 @@ public class CacheConfig { ", cacheIndexesOnWrite=" + shouldCacheIndexesOnWrite() + ", cacheBloomsOnWrite=" + shouldCacheBloomsOnWrite() + ", cacheEvictOnClose=" + shouldEvictOnClose() + - ", cacheCompressed=" + shouldCacheCompressed(); + ", cacheCompressed=" + shouldCacheCompressed() + + ", prefetchOnOpen=" + shouldPrefetchOnOpen(); } // Static block cache reference and methods @@ -476,4 +503,4 @@ public class CacheConfig { } return GLOBAL_BLOCK_CACHE_INSTANCE; } -} \ No newline at end of file +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java index 6f23d1648cf..eb03fbd9add 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java @@ -31,7 +31,7 @@ import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache; * CombinedBlockCache is an abstraction layer that combines * {@link LruBlockCache} and {@link BucketCache}. The smaller lruCache is used * to cache bloom blocks and index blocks. The larger bucketCache is used to - * cache data blocks. {@link #getBlock(BlockCacheKey, boolean, boolean)}, boolean, boolean) reads + * cache data blocks. {@link #getBlock(BlockCacheKey, boolean, boolean, boolean)} reads * first from the smaller lruCache before looking for the block in the bucketCache. Blocks evicted * from lruCache are put into the bucket cache. * Metrics are the combined size and hits and misses of both caches. @@ -72,13 +72,13 @@ public class CombinedBlockCache implements BlockCache, HeapSize { @Override public Cacheable getBlock(BlockCacheKey cacheKey, boolean caching, - boolean repeat) { + boolean repeat, boolean updateCacheMetrics) { // TODO: is there a hole here, or just awkwardness since in the lruCache getBlock // we end up calling bucketCache.getBlock. if (lruCache.containsBlock(cacheKey)) { - return lruCache.getBlock(cacheKey, caching, repeat); + return lruCache.getBlock(cacheKey, caching, repeat, updateCacheMetrics); } - return bucketCache.getBlock(cacheKey, caching, repeat); + return bucketCache.getBlock(cacheKey, caching, repeat, updateCacheMetrics); } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/DoubleBlockCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/DoubleBlockCache.java index 274f847d4b3..da7a56e900f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/DoubleBlockCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/DoubleBlockCache.java @@ -91,22 +91,25 @@ public class DoubleBlockCache implements ResizableBlockCache, HeapSize { } @Override - public Cacheable getBlock(BlockCacheKey cacheKey, boolean caching, boolean repeat) { + public Cacheable getBlock(BlockCacheKey cacheKey, boolean caching, boolean repeat, + boolean updateCacheMetrics) { Cacheable cachedBlock; - if ((cachedBlock = onHeapCache.getBlock(cacheKey, caching, repeat)) != null) { - stats.hit(caching); + if ((cachedBlock = onHeapCache.getBlock(cacheKey, caching, repeat, + updateCacheMetrics)) != null) { + if (updateCacheMetrics) stats.hit(caching); return cachedBlock; - } else if ((cachedBlock = offHeapCache.getBlock(cacheKey, caching, repeat)) != null) { + } else if ((cachedBlock = offHeapCache.getBlock(cacheKey, caching, repeat, + updateCacheMetrics)) != null) { if (caching) { onHeapCache.cacheBlock(cacheKey, cachedBlock); } - stats.hit(caching); + if (updateCacheMetrics) stats.hit(caching); return cachedBlock; } - if (!repeat) stats.miss(caching); + if (!repeat && updateCacheMetrics) stats.miss(caching); return null; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java index 2b2f9d08e7a..aa204672d02 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java @@ -459,7 +459,8 @@ public class HFile { */ HFileBlock readBlock(long offset, long onDiskBlockSize, boolean cacheBlock, final boolean pread, final boolean isCompaction, - BlockType expectedBlockType, DataBlockEncoding expectedDataBlockEncoding) + final boolean updateCacheMetrics, BlockType expectedBlockType, + DataBlockEncoding expectedDataBlockEncoding) throws IOException; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java index 054a2841ede..f7b5b9d768b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java @@ -258,7 +258,7 @@ public class HFileBlockIndex { expectedBlockType = BlockType.DATA; } block = cachingBlockReader.readBlock(currentOffset, - currentOnDiskSize, shouldCache, pread, isCompaction, + currentOnDiskSize, shouldCache, pread, isCompaction, true, expectedBlockType, expectedDataBlockEncoding); } @@ -337,7 +337,7 @@ public class HFileBlockIndex { // Caching, using pread, assuming this is not a compaction. HFileBlock midLeafBlock = cachingBlockReader.readBlock( - midLeafBlockOffset, midLeafBlockOnDiskSize, true, true, false, + midLeafBlockOffset, midLeafBlockOnDiskSize, true, true, false, true, BlockType.LEAF_INDEX, null); ByteBuffer b = midLeafBlock.getBufferWithoutHeader(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java index 3fbebb39f76..209815af9cf 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java @@ -46,6 +46,8 @@ import org.apache.hadoop.io.WritableUtils; import org.htrace.Trace; import org.htrace.TraceScope; +import com.google.common.annotations.VisibleForTesting; + /** * {@link HFile} reader for version 2. */ @@ -116,7 +118,7 @@ public class HFileReaderV2 extends AbstractHFileReader { * @param hfs * @param conf */ - public HFileReaderV2(Path path, FixedFileTrailer trailer, + public HFileReaderV2(final Path path, final FixedFileTrailer trailer, final FSDataInputStreamWrapper fsdis, final long size, final CacheConfig cacheConf, final HFileSystem hfs, final Configuration conf) throws IOException { super(path, trailer, size, cacheConf, hfs, conf); @@ -177,6 +179,42 @@ public class HFileReaderV2 extends AbstractHFileReader { while ((b = blockIter.nextBlock()) != null) { loadOnOpenBlocks.add(b); } + + // Prefetch file blocks upon open if requested + if (cacheConf.shouldPrefetchOnOpen()) { + PrefetchExecutor.request(path, new Runnable() { + public void run() { + try { + long offset = 0; + long end = fileSize - getTrailer().getTrailerSize(); + HFileBlock prevBlock = null; + while (offset < end) { + if (Thread.interrupted()) { + break; + } + long onDiskSize = -1; + if (prevBlock != null) { + onDiskSize = prevBlock.getNextBlockOnDiskSizeWithHeader(); + } + HFileBlock block = readBlock(offset, onDiskSize, true, false, false, false, + null, null); + prevBlock = block; + offset += block.getOnDiskSizeWithHeader(); + } + } catch (IOException e) { + // IOExceptions are probably due to region closes (relocation, etc.) + if (LOG.isTraceEnabled()) { + LOG.trace("Exception encountered while prefetching " + path + ":", e); + } + } catch (Exception e) { + // Other exceptions are interesting + LOG.warn("Exception encountered while prefetching " + path + ":", e); + } finally { + PrefetchExecutor.complete(path); + } + } + }); + } } protected HFileContext createHFileContext(FSDataInputStreamWrapper fsdis, long fileSize, @@ -212,13 +250,13 @@ public class HFileReaderV2 extends AbstractHFileReader { } private HFileBlock getCachedBlock(BlockCacheKey cacheKey, boolean cacheBlock, boolean useLock, - boolean isCompaction, BlockType expectedBlockType, + boolean isCompaction, boolean updateCacheMetrics, BlockType expectedBlockType, DataBlockEncoding expectedDataBlockEncoding) throws IOException { // Check cache for block. If found return. if (cacheConf.isBlockCacheEnabled()) { BlockCache cache = cacheConf.getBlockCache(); - HFileBlock cachedBlock = - (HFileBlock) cache.getBlock(cacheKey, cacheBlock, useLock); + HFileBlock cachedBlock = (HFileBlock) cache.getBlock(cacheKey, cacheBlock, useLock, + updateCacheMetrics); if (cachedBlock != null) { validateBlockType(cachedBlock, expectedBlockType); @@ -297,7 +335,7 @@ public class HFileReaderV2 extends AbstractHFileReader { cacheBlock &= cacheConf.shouldCacheDataOnRead(); if (cacheConf.isBlockCacheEnabled()) { - HFileBlock cachedBlock = getCachedBlock(cacheKey, cacheBlock, false, false, + HFileBlock cachedBlock = getCachedBlock(cacheKey, cacheBlock, false, true, true, BlockType.META, null); if (cachedBlock != null) { // Return a distinct 'shallow copy' of the block, @@ -348,7 +386,8 @@ public class HFileReaderV2 extends AbstractHFileReader { @Override public HFileBlock readBlock(long dataBlockOffset, long onDiskBlockSize, final boolean cacheBlock, boolean pread, final boolean isCompaction, - BlockType expectedBlockType, DataBlockEncoding expectedDataBlockEncoding) + boolean updateCacheMetrics, BlockType expectedBlockType, + DataBlockEncoding expectedDataBlockEncoding) throws IOException { if (dataBlockIndexReader == null) { throw new IOException("Block index not loaded"); @@ -382,12 +421,13 @@ public class HFileReaderV2 extends AbstractHFileReader { // Try and get the block from the block cache. If the useLock variable is true then this // is the second time through the loop and it should not be counted as a block cache miss. HFileBlock cachedBlock = getCachedBlock(cacheKey, cacheBlock, useLock, isCompaction, - expectedBlockType, expectedDataBlockEncoding); + updateCacheMetrics, expectedBlockType, expectedDataBlockEncoding); if (cachedBlock != null) { validateBlockType(cachedBlock, expectedBlockType); if (cachedBlock.getBlockType().isData()) { - HFile.dataBlockReadCnt.incrementAndGet(); - + if (updateCacheMetrics) { + HFile.dataBlockReadCnt.incrementAndGet(); + } // Validate encoding type for data blocks. We include encoding // type in the cache key, and we expect it to match on a cache hit. if (cachedBlock.getDataBlockEncoding() != dataBlockEncoder.getDataBlockEncoding()) { @@ -422,7 +462,7 @@ public class HFileReaderV2 extends AbstractHFileReader { cacheConf.getBlockCache().cacheBlock(cacheKey, hfileBlock, cacheConf.isInMemory()); } - if (hfileBlock.getBlockType().isData()) { + if (updateCacheMetrics && hfileBlock.getBlockType().isData()) { HFile.dataBlockReadCnt.incrementAndGet(); } @@ -493,6 +533,7 @@ public class HFileReaderV2 extends AbstractHFileReader { } public void close(boolean evictOnClose) throws IOException { + PrefetchExecutor.cancel(path); if (evictOnClose && cacheConf.isBlockCacheEnabled()) { int numEvicted = cacheConf.getBlockCache().evictBlocksByHfileName(name); if (LOG.isTraceEnabled()) { @@ -644,7 +685,7 @@ public class HFileReaderV2 extends AbstractHFileReader { // figure out the size. seekToBlock = reader.readBlock(previousBlockOffset, seekToBlock.getOffset() - previousBlockOffset, cacheBlocks, - pread, isCompaction, BlockType.DATA, getEffectiveDataBlockEncoding()); + pread, isCompaction, true, BlockType.DATA, getEffectiveDataBlockEncoding()); // TODO shortcut: seek forward in this block to the last key of the // block. } @@ -680,7 +721,7 @@ public class HFileReaderV2 extends AbstractHFileReader { curBlock = reader.readBlock(curBlock.getOffset() + curBlock.getOnDiskSizeWithHeader(), curBlock.getNextBlockOnDiskSizeWithHeader(), cacheBlocks, pread, - isCompaction, null, getEffectiveDataBlockEncoding()); + isCompaction, true, null, getEffectiveDataBlockEncoding()); } while (!curBlock.getBlockType().isData()); return curBlock; @@ -844,7 +885,7 @@ public class HFileReaderV2 extends AbstractHFileReader { } block = reader.readBlock(firstDataBlockOffset, -1, cacheBlocks, pread, - isCompaction, BlockType.DATA, getEffectiveDataBlockEncoding()); + isCompaction, true, BlockType.DATA, getEffectiveDataBlockEncoding()); if (block.getOffset() < 0) { throw new IOException("Invalid block offset: " + block.getOffset()); } @@ -1139,7 +1180,7 @@ public class HFileReaderV2 extends AbstractHFileReader { } block = reader.readBlock(firstDataBlockOffset, -1, cacheBlocks, pread, - isCompaction, BlockType.DATA, getEffectiveDataBlockEncoding()); + isCompaction, true, BlockType.DATA, getEffectiveDataBlockEncoding()); if (block.getOffset() < 0) { throw new IOException("Invalid block offset: " + block.getOffset()); } @@ -1286,4 +1327,13 @@ public class HFileReaderV2 extends AbstractHFileReader { public HFileContext getFileContext() { return hfileContext; } + + /** + * Returns false if block prefetching was requested for this file and has + * not completed, true otherwise + */ + @VisibleForTesting + boolean prefetchComplete() { + return PrefetchExecutor.isCompleted(path); + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV3.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV3.java index 8c7d4284e8c..c110348fa26 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV3.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV3.java @@ -69,7 +69,7 @@ public class HFileReaderV3 extends HFileReaderV2 { * @param conf * Configuration */ - public HFileReaderV3(Path path, FixedFileTrailer trailer, final FSDataInputStreamWrapper fsdis, + public HFileReaderV3(final Path path, FixedFileTrailer trailer, final FSDataInputStreamWrapper fsdis, final long size, final CacheConfig cacheConf, final HFileSystem hfs, final Configuration conf) throws IOException { super(path, trailer, fsdis, size, cacheConf, hfs, conf); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java index 3456a80f1ed..dead1735f1c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java @@ -377,19 +377,21 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize { * @param caching true if the caller caches blocks on cache misses * @param repeat Whether this is a repeat lookup for the same block * (used to avoid double counting cache misses when doing double-check locking) + * @param updateCacheMetrics Whether to update cache metrics or not * @return buffer of specified cache key, or null if not in cache - * @see HFileReaderV2#readBlock(long, long, boolean, boolean, boolean, BlockType, DataBlockEncoding) */ @Override - public Cacheable getBlock(BlockCacheKey cacheKey, boolean caching, boolean repeat) { + public Cacheable getBlock(BlockCacheKey cacheKey, boolean caching, boolean repeat, + boolean updateCacheMetrics) { CachedBlock cb = map.get(cacheKey); if (cb == null) { - if (!repeat) stats.miss(caching); - if (victimHandler != null) - return victimHandler.getBlock(cacheKey, caching, repeat); + if (!repeat && updateCacheMetrics) stats.miss(caching); + if (victimHandler != null) { + return victimHandler.getBlock(cacheKey, caching, repeat, updateCacheMetrics); + } return null; } - stats.hit(caching); + if (updateCacheMetrics) stats.hit(caching); cb.access(count.incrementAndGet()); return cb.getBuffer(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/PrefetchExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/PrefetchExecutor.java new file mode 100644 index 00000000000..f03344338a5 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/PrefetchExecutor.java @@ -0,0 +1,122 @@ +package org.apache.hadoop.hbase.io.hfile; + +import java.util.Map; +import java.util.Random; +import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.Future; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.regex.Pattern; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HConstants; + +public class PrefetchExecutor { + + private static final Log LOG = LogFactory.getLog(PrefetchExecutor.class); + + /** Futures for tracking block prefetch activity */ + private static final Map> prefetchFutures = + new ConcurrentSkipListMap>(); + /** Executor pool shared among all HFiles for block prefetch */ + private static final ScheduledExecutorService prefetchExecutorPool; + /** Delay before beginning prefetch */ + private static final int prefetchDelayMillis; + /** Variation in prefetch delay times, to mitigate stampedes */ + private static final float prefetchDelayVariation; + static { + // Consider doing this on demand with a configuration passed in rather + // than in a static initializer. + Configuration conf = HBaseConfiguration.create(); + // 1s here for tests, consider 30s in hbase-default.xml + // Set to 0 for no delay + prefetchDelayMillis = conf.getInt("hbase.hfile.prefetch.delay", 1000); + prefetchDelayVariation = conf.getFloat("hbase.hfile.prefetch.delay.variation", 0.2f); + int prefetchThreads = conf.getInt("hbase.hfile.thread.prefetch", 4); + prefetchExecutorPool = new ScheduledThreadPoolExecutor(prefetchThreads, + new ThreadFactory() { + @Override + public Thread newThread(Runnable r) { + Thread t = new Thread(r); + t.setName("hfile-prefetch-" + System.currentTimeMillis()); + t.setDaemon(true); + return t; + } + }); + } + + private static final Random RNG = new Random(); + + // TODO: We want HFile, which is where the blockcache lives, to handle + // prefetching of file blocks but the Store level is where path convention + // knowledge should be contained + private static final Pattern prefetchPathExclude = + Pattern.compile( + "(" + + Path.SEPARATOR_CHAR + + HConstants.HBASE_TEMP_DIRECTORY.replace(".", "\\.") + + Path.SEPARATOR_CHAR + + ")|(" + + Path.SEPARATOR_CHAR + + HConstants.HREGION_COMPACTIONDIR_NAME.replace(".", "\\.") + + Path.SEPARATOR_CHAR + + ")"); + + public static void request(Path path, Runnable runnable) { + if (!prefetchPathExclude.matcher(path.toString()).find()) { + long delay; + if (prefetchDelayMillis > 0) { + delay = (long)((prefetchDelayMillis * (1.0f - (prefetchDelayVariation/2))) + + (prefetchDelayMillis * (prefetchDelayVariation/2) * RNG.nextFloat())); + } else { + delay = 0; + } + try { + if (LOG.isDebugEnabled()) { + LOG.debug("Prefetch requested for " + path + ", delay=" + delay + " ms"); + } + prefetchFutures.put(path, prefetchExecutorPool.schedule(runnable, delay, + TimeUnit.MILLISECONDS)); + } catch (RejectedExecutionException e) { + prefetchFutures.remove(path); + LOG.warn("Prefetch request rejected for " + path); + } + } + } + + public static void complete(Path path) { + prefetchFutures.remove(path); + if (LOG.isDebugEnabled()) { + LOG.debug("Prefetch completed for " + path); + } + } + + public static void cancel(Path path) { + Future future = prefetchFutures.get(path); + if (future != null) { + // ok to race with other cancellation attempts + future.cancel(true); + prefetchFutures.remove(path); + if (LOG.isDebugEnabled()) { + LOG.debug("Prefetch cancelled for " + path); + } + } + } + + public static boolean isCompleted(Path path) { + Future future = prefetchFutures.get(path); + if (future != null) { + return future.isDone(); + } + return true; + } + +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java index deacb198b22..36cd00bf5e3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java @@ -347,15 +347,17 @@ public class BucketCache implements BlockCache, HeapSize { * @param key block's cache key * @param caching true if the caller caches blocks on cache misses * @param repeat Whether this is a repeat lookup for the same block + * @param updateCacheMetrics Whether we should update cache metrics or not * @return buffer of specified cache key, or null if not in cache */ @Override - public Cacheable getBlock(BlockCacheKey key, boolean caching, boolean repeat) { + public Cacheable getBlock(BlockCacheKey key, boolean caching, boolean repeat, + boolean updateCacheMetrics) { if (!cacheEnabled) return null; RAMQueueEntry re = ramCache.get(key); if (re != null) { - cacheStats.hit(caching); + if (updateCacheMetrics) cacheStats.hit(caching); re.access(accessCount.incrementAndGet()); return re.getData(); } @@ -375,8 +377,10 @@ public class BucketCache implements BlockCache, HeapSize { Cacheable cachedBlock = bucketEntry.deserializerReference( deserialiserMap).deserialize(bb, true); long timeTaken = System.nanoTime() - start; - cacheStats.hit(caching); - cacheStats.ioHit(timeTaken); + if (updateCacheMetrics) { + cacheStats.hit(caching); + cacheStats.ioHit(timeTaken); + } bucketEntry.access(accessCount.incrementAndGet()); if (this.ioErrorStartTime > 0) { ioErrorStartTime = -1; @@ -392,7 +396,7 @@ public class BucketCache implements BlockCache, HeapSize { } } } - if(!repeat) cacheStats.miss(caching); + if (!repeat && updateCacheMetrics) cacheStats.miss(caching); return null; } @@ -1192,4 +1196,4 @@ public class BucketCache implements BlockCache, HeapSize { writerThread.join(); } } -} \ No newline at end of file +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/slab/SingleSizeCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/slab/SingleSizeCache.java index 7e713d60cbc..381c5c95231 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/slab/SingleSizeCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/slab/SingleSizeCache.java @@ -149,14 +149,15 @@ public class SingleSizeCache implements BlockCache, HeapSize { } @Override - public Cacheable getBlock(BlockCacheKey key, boolean caching, boolean repeat) { + public Cacheable getBlock(BlockCacheKey key, boolean caching, boolean repeat, + boolean updateCacheMetrics) { CacheablePair contentBlock = backingMap.get(key); if (contentBlock == null) { - if (!repeat) stats.miss(caching); + if (!repeat && updateCacheMetrics) stats.miss(caching); return null; } - stats.hit(caching); + if (updateCacheMetrics) stats.hit(caching); // If lock cannot be obtained, that means we're undergoing eviction. try { contentBlock.recentlyAccessed.set(System.nanoTime()); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/slab/SlabCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/slab/SlabCache.java index 8397538c52e..561c6f47c7c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/slab/SlabCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/slab/SlabCache.java @@ -245,19 +245,20 @@ public class SlabCache implements SlabItemActionWatcher, BlockCache, HeapSize { * * @return buffer of specified block name, or null if not in cache */ - public Cacheable getBlock(BlockCacheKey key, boolean caching, boolean repeat) { + public Cacheable getBlock(BlockCacheKey key, boolean caching, boolean repeat, + boolean updateCacheMetrics) { SingleSizeCache cachedBlock = backingStore.get(key); if (cachedBlock == null) { if (!repeat) stats.miss(caching); return null; } - Cacheable contentBlock = cachedBlock.getBlock(key, caching, false); + Cacheable contentBlock = cachedBlock.getBlock(key, caching, false, updateCacheMetrics); if (contentBlock != null) { - stats.hit(caching); + if (updateCacheMetrics) stats.hit(caching); } else if (!repeat) { - stats.miss(caching); + if (updateCacheMetrics) stats.miss(caching); } return contentBlock; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/CompoundBloomFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/CompoundBloomFilter.java index 8d13fd116eb..8e87132539a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/CompoundBloomFilter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/CompoundBloomFilter.java @@ -98,7 +98,7 @@ public class CompoundBloomFilter extends CompoundBloomFilterBase try { // We cache the block and use a positional read. bloomBlock = reader.readBlock(index.getRootBlockOffset(block), - index.getRootBlockDataSize(block), true, true, false, + index.getRootBlockDataSize(block), true, true, false, true, BlockType.BLOOM_CHUNK, null); } catch (IOException ex) { // The Bloom filter is broken, turn it off. diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java index cf115750496..5ef8cf0864d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java @@ -93,12 +93,12 @@ public class CacheTestUtils { } toBeTested.cacheBlock(ourBlock.blockName, ourBlock.block); Cacheable retrievedBlock = toBeTested.getBlock(ourBlock.blockName, - false, false); + false, false, true); if (retrievedBlock != null) { assertEquals(ourBlock.block, retrievedBlock); toBeTested.evictBlock(ourBlock.blockName); hits.incrementAndGet(); - assertNull(toBeTested.getBlock(ourBlock.blockName, false, false)); + assertNull(toBeTested.getBlock(ourBlock.blockName, false, false, true)); } else { miss.incrementAndGet(); } @@ -126,7 +126,7 @@ public class CacheTestUtils { HFileBlockPair[] blocks = generateHFileBlocks(numBlocks, blockSize); // Confirm empty for (HFileBlockPair block : blocks) { - assertNull(toBeTested.getBlock(block.blockName, true, false)); + assertNull(toBeTested.getBlock(block.blockName, true, false, true)); } // Add blocks @@ -139,7 +139,7 @@ public class CacheTestUtils { // MapMaker makes no guarantees when it will evict, so neither can we. for (HFileBlockPair block : blocks) { - HFileBlock buf = (HFileBlock) toBeTested.getBlock(block.blockName, true, false); + HFileBlock buf = (HFileBlock) toBeTested.getBlock(block.blockName, true, false, true); if (buf != null) { assertEquals(block.block, buf); } @@ -150,7 +150,7 @@ public class CacheTestUtils { for (HFileBlockPair block : blocks) { try { - if (toBeTested.getBlock(block.blockName, true, false) != null) { + if (toBeTested.getBlock(block.blockName, true, false, true) != null) { toBeTested.cacheBlock(block.blockName, block.block); if (!(toBeTested instanceof BucketCache)) { // BucketCache won't throw exception when caching already cached @@ -184,7 +184,7 @@ public class CacheTestUtils { @Override public void doAnAction() throws Exception { ByteArrayCacheable returned = (ByteArrayCacheable) toBeTested - .getBlock(key, false, false); + .getBlock(key, false, false, true); assertArrayEquals(buf, returned.buf); totalQueries.incrementAndGet(); } @@ -223,7 +223,7 @@ public class CacheTestUtils { final ByteArrayCacheable bac = new ByteArrayCacheable(buf); ByteArrayCacheable gotBack = (ByteArrayCacheable) toBeTested - .getBlock(key, true, false); + .getBlock(key, true, false, true); if (gotBack != null) { assertArrayEquals(gotBack.buf, bac.buf); } else { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java index 88b2ece0917..50a9b9f50ab 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java @@ -245,10 +245,10 @@ public class TestCacheOnWrite { // Flags: don't cache the block, use pread, this is not a compaction. // Also, pass null for expected block type to avoid checking it. HFileBlock block = reader.readBlock(offset, onDiskSize, false, true, - false, null, encodingInCache); + false, true, null, encodingInCache); BlockCacheKey blockCacheKey = new BlockCacheKey(reader.getName(), offset); - boolean isCached = blockCache.getBlock(blockCacheKey, true, false) != null; + boolean isCached = blockCache.getBlock(blockCacheKey, true, false, true) != null; boolean shouldBeCached = cowType.shouldBeCached(block.getBlockType()); if (shouldBeCached != isCached) { throw new AssertionError( diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockIndex.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockIndex.java index 8f50ef6b36d..f3e2f517bbd 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockIndex.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockIndex.java @@ -166,7 +166,8 @@ public class TestHFileBlockIndex { @Override public HFileBlock readBlock(long offset, long onDiskSize, boolean cacheBlock, boolean pread, boolean isCompaction, - BlockType expectedBlockType, DataBlockEncoding expectedDataBlockEncoding) + boolean updateCacheMetrics, BlockType expectedBlockType, + DataBlockEncoding expectedDataBlockEncoding) throws IOException { if (offset == prevOffset && onDiskSize == prevOnDiskSize && pread == prevPread) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileDataBlockEncoder.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileDataBlockEncoder.java index 0288e922d94..3f2c84b4a7f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileDataBlockEncoder.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileDataBlockEncoder.java @@ -83,7 +83,7 @@ public class TestHFileDataBlockEncoder { BlockCacheKey cacheKey = new BlockCacheKey("test", 0); blockCache.cacheBlock(cacheKey, cacheBlock); - HeapSize heapSize = blockCache.getBlock(cacheKey, false, false); + HeapSize heapSize = blockCache.getBlock(cacheKey, false, false, true); assertTrue(heapSize instanceof HFileBlock); HFileBlock returnedBlock = (HFileBlock) heapSize;; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestLruBlockCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestLruBlockCache.java index 2de760835ba..d0d27b38f61 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestLruBlockCache.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestLruBlockCache.java @@ -106,7 +106,7 @@ public class TestLruBlockCache { // Confirm empty for (CachedItem block : blocks) { - assertTrue(cache.getBlock(block.cacheKey, true, false) == null); + assertTrue(cache.getBlock(block.cacheKey, true, false, true) == null); } // Add blocks @@ -120,7 +120,7 @@ public class TestLruBlockCache { // Check if all blocks are properly cached and retrieved for (CachedItem block : blocks) { - HeapSize buf = cache.getBlock(block.cacheKey, true, false); + HeapSize buf = cache.getBlock(block.cacheKey, true, false, true); assertTrue(buf != null); assertEquals(buf.heapSize(), block.heapSize()); } @@ -139,7 +139,7 @@ public class TestLruBlockCache { // Check if all blocks are properly cached and retrieved for (CachedItem block : blocks) { - HeapSize buf = cache.getBlock(block.cacheKey, true, false); + HeapSize buf = cache.getBlock(block.cacheKey, true, false, true); assertTrue(buf != null); assertEquals(buf.heapSize(), block.heapSize()); } @@ -184,9 +184,9 @@ public class TestLruBlockCache { (maxSize * LruBlockCache.DEFAULT_ACCEPTABLE_FACTOR)); // All blocks except block 0 should be in the cache - assertTrue(cache.getBlock(blocks[0].cacheKey, true, false) == null); + assertTrue(cache.getBlock(blocks[0].cacheKey, true, false, true) == null); for(int i=1;i